From: cdouglas@apache.org
To: core-commits@hadoop.apache.org
Subject: svn commit: r723855 [5/23] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/hadoop-packaging/ src/contrib/chukwa/lib...
Date: Fri, 05 Dec 2008 20:30:21 -0000
Message-Id: <20081205203035.148862388A0C@eris.apache.org>

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Fri Dec 5 12:30:14 2008
@@ -23,6 +23,9 @@ import org.apache.hadoop.chukwa.datacollection.DataFactory; import org.apache.hadoop.chukwa.datacollection.ChunkQueue; +import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; +import org.apache.hadoop.conf.Configuration; +import org.mortbay.log.Log; /** * A shared thread used by all FileTailingAdaptors.
@@ -40,21 +43,32 @@ /** * How often to tail each file.
*/ - int SAMPLE_PERIOD_MS = 1000* 2; //FIXME: should be configurable + int DEFAULT_SAMPLE_PERIOD_MS = 1000* 2; + int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS; + private static Configuration conf = null; FileTailer() { - eq = DataFactory.getInstance().getEventQueue(); + if (conf == null) { + ChukwaAgent agent = ChukwaAgent.getAgent(); + if (agent != null) { + conf = agent.getConfiguration(); + if (conf != null) { + SAMPLE_PERIOD_MS= conf.getInt("chukwaAgent.adaptor.context.switch.time", DEFAULT_SAMPLE_PERIOD_MS); + } + } + } + eq = DataFactory.getInstance().getEventQueue(); - //iterations are much more common than adding a new adaptor - adaptors = new CopyOnWriteArrayList(); + //iterations are much more common than adding a new adaptor + adaptors = new CopyOnWriteArrayList(); - this.setDaemon(true); - start();//start the file-tailing thread + this.setDaemon(true); + start();//start the file-tailing thread } //called by FileTailingAdaptor, only void startWatchingFile(FileTailingAdaptor f) { - adaptors.add(f); + adaptors.add(f); } //called by FileTailingAdaptor, only @@ -64,7 +78,7 @@ public void run() { try{ - while(true) { + while(true) { boolean shouldISleep = true; long startTime = System.currentTimeMillis(); for(FileTailingAdaptor f: adaptors) { @@ -72,13 +86,12 @@ shouldISleep &= !hasMoreData; } long timeToReadFiles = System.currentTimeMillis() - startTime; - assert timeToReadFiles >= 0 : " time shouldn't go backwards"; - if(timeToReadFiles < SAMPLE_PERIOD_MS && shouldISleep) - Thread.sleep(SAMPLE_PERIOD_MS - timeToReadFiles+1); + if(timeToReadFiles < SAMPLE_PERIOD_MS || shouldISleep) { + Thread.sleep(SAMPLE_PERIOD_MS); + } } + } catch(InterruptedException e) { } - catch(InterruptedException e) - {} } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Fri Dec 5 12:30:14 2008 @@ -22,9 +22,13 @@ import org.apache.hadoop.chukwa.datacollection.ChunkReceiver; import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException; +import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; +import org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec; +import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import java.io.*; +import java.util.Timer; /** * An adaptor that repeatedly tails a specified file, sending the new bytes. @@ -42,8 +46,12 @@ * to the next. This way, we get quick response time for other files if one * file is growing rapidly. 
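The FileTailer hunk above replaces the hard-coded 2-second polling interval with a lookup of chukwaAgent.adaptor.context.switch.time from the agent's configuration, falling back to the old default. A minimal sketch of that lookup, loading chukwa-agent-conf.xml directly (the patch itself obtains the Configuration through ChukwaAgent.getAgent(); the resource name on the classpath here is an assumption):

    import org.apache.hadoop.conf.Configuration;

    public class SamplePeriodSketch {
        static final int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2;

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.addResource("chukwa-agent-conf.xml");   // assumed to be on the classpath
            // getInt() returns the supplied default when the property is absent,
            // so unconfigured agents keep the old 2 s period.
            int samplePeriodMs = conf.getInt(
                "chukwaAgent.adaptor.context.switch.time", DEFAULT_SAMPLE_PERIOD_MS);
            System.out.println("tail period = " + samplePeriodMs + " ms");
        }
    }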
*/ - public static final int MAX_READ_SIZE = 128 * 1024; - + public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024 ; + public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE ; + public static int MAX_RETRIES = 300; + protected static Configuration conf = null; + private int attempts = 0; + File toWatch; /** * next PHYSICAL offset to read @@ -51,6 +59,8 @@ protected long fileReadOffset; protected String type; private ChunkReceiver dest; + protected RandomAccessFile reader = null; + protected long adaptorID; /** * The logical offset of the first byte of the file @@ -64,21 +74,24 @@ log =Logger.getLogger(FileTailingAdaptor.class); } - public void start(String type, String params, long bytes, ChunkReceiver dest) { - //in this case params = filename + public void start(long adaptorID, String type, String params, long bytes, ChunkReceiver dest) { + //in this case params = filename log.info("started file tailer on file " + params); - this.type = type; - this.dest = dest; + this.adaptorID = adaptorID; + this.type = type; + this.dest = dest; + this.attempts = 0; - String[] words = params.split(" "); - if(words.length > 1) { - offsetOfFirstByte = Long.parseLong(words[0]); - toWatch = new File(params.substring(words[0].length() + 1)); - } - else - toWatch = new File(params); + String[] words = params.split(" "); + if(words.length > 1) { + offsetOfFirstByte = Long.parseLong(words[0]); + toWatch = new File(params.substring(words[0].length() + 1)); + } else { + toWatch = new File(params); + } - this.fileReadOffset= bytes - offsetOfFirstByte; + + this.fileReadOffset= bytes; tailer.startWatchingFile(this); } @@ -88,25 +101,38 @@ */ public long shutdown() throws AdaptorException { try{ - tailFile(tailer.eq); // get tail end of file. - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); + if(toWatch.exists()) { + int retry=0; + tailer.stopWatchingFile(this); + TerminatorThread lastTail = new TerminatorThread(this,tailer.eq); + lastTail.setDaemon(true); + lastTail.start(); + while(lastTail.isAlive() && retry < 60) { + try { + log.info("Retry:"+retry); + Thread.currentThread().sleep(1000); + retry++; + } catch(InterruptedException ex) { + } + } + } + } finally { + return fileReadOffset + offsetOfFirstByte; } - hardStop();//need to do this afterwards, so that offset stays visible during tailFile(). - return fileReadOffset + offsetOfFirstByte; + } /** * Stop tailing the file, effective immediately. */ public void hardStop() throws AdaptorException { - tailer.stopWatchingFile(this); + tailer.stopWatchingFile(this); } /** * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus() */ public String getCurrentStatus() { - return type + " " + offsetOfFirstByte+ " " + toWatch.getPath(); + return type.trim() + " " + offsetOfFirstByte+ " " + toWatch.getPath() + " " + fileReadOffset; // can make this more efficient using a StringBuilder } @@ -114,6 +140,10 @@ return "Tailer on " + toWatch; } + public String getStreamName() { + return toWatch.getPath(); + } + /** * Looks at the tail of the associated file, adds some of it to event queue * This method is not thread safe. 
Returns true if there's more data in the @@ -123,35 +153,119 @@ */ public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException { boolean hasMoreData = false; - try { - if(!toWatch.exists()) - return false; //no more data - - RandomAccessFile reader = new RandomAccessFile(toWatch, "r"); - long len = reader.length(); - if (len > fileReadOffset) { - reader.seek(fileReadOffset); - - long bufSize = len - fileReadOffset; - if (bufSize > MAX_READ_SIZE) { - bufSize = MAX_READ_SIZE; - hasMoreData = true; - } - byte[] buf = new byte[(int) bufSize]; - reader.read(buf); - assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: " - + " pointer is " - + reader.getFilePointer() - + " but offset is " + fileReadOffset + bufSize; - - int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf); - fileReadOffset = fileReadOffset + bytesUsed; - } - reader.close(); - } catch (IOException e) { - log.warn("failure reading " + toWatch, e); - } - return hasMoreData; + try { + if(!toWatch.exists() && attempts>MAX_RETRIES) { + log.warn("Adaptor|" + adaptorID +"| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming."); + ChukwaAgent agent = ChukwaAgent.getAgent(); + if (agent != null) { + agent.stopAdaptor(adaptorID, false); + } else { + log.info("Agent is null, running in default mode"); + } + tailer.stopWatchingFile(this); + return false; + } else if(!toWatch.exists()) { + log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts+" of "+MAX_RETRIES); + attempts++; + return false; //no more data + } + if (reader == null) + { + reader = new RandomAccessFile(toWatch, "r"); + log.info("Adaptor|" + adaptorID + "|Opening the file for the first time|seek|" + fileReadOffset); + } + + long len = 0L; + try { + RandomAccessFile newReader = new RandomAccessFile(toWatch,"r"); + len = reader.length(); + long newLength = newReader.length(); + if(newLength= len) { + reader.close(); + reader = newReader; + fileReadOffset=0L; + log.debug("Adaptor|" + adaptorID +"| File size mismatched, rotating: "+toWatch.getAbsolutePath()); + } else { + try { + newReader.close(); + } catch(IOException e) { + // do nothing. + } + } + } catch(IOException e) { + // do nothing, if file doesn't exist. + } + if (len >= fileReadOffset) { + if(offsetOfFirstByte>fileReadOffset) { + // If the file rotated, the recorded offsetOfFirstByte is greater than file size, + // reset the first byte position to beginning of the file. 
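The rotation handling added above reopens the watched file, compares the fresh length against the length seen through the cached reader, and resets the offsets when the file has shrunk. A condensed, stand-alone sketch of that check; field names follow the patch, but the exact guard condition is an assumption:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    class RotationCheckSketch {
        RandomAccessFile reader;      // cached handle from the previous pass
        long fileReadOffset;          // next physical offset to read

        /** Returns true if the watched file appears to have been rotated or truncated. */
        boolean detectRotation(File toWatch) throws IOException {
            RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
            long oldLen = reader.length();
            long newLen = newReader.length();
            if (newLen < oldLen && fileReadOffset >= oldLen) {
                // the file shrank underneath us: switch to the fresh handle and rewind
                reader.close();
                reader = newReader;
                fileReadOffset = 0L;
                return true;
            }
            newReader.close();
            return false;
        }
    }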
+ offsetOfFirstByte = 0L; + fileReadOffset=0; + } + + log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset ); + reader.seek(fileReadOffset); + + long bufSize = len - fileReadOffset; + + if (conf == null) + { + ChukwaAgent agent = ChukwaAgent.getAgent(); + if (agent != null) + { + conf = agent.getConfiguration(); + if (conf != null) + { + MAX_READ_SIZE= conf.getInt("chukwaAgent.fileTailingAdaptor.maxReadSize", DEFAULT_MAX_READ_SIZE); + log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE); + } + else + { + log.info("Conf is null, running in default mode"); + } + } + else + { + log.info("Agent is null, running in default mode"); + } + } + + if (bufSize > MAX_READ_SIZE) { + bufSize = MAX_READ_SIZE; + hasMoreData = true; + } + byte[] buf = new byte[(int) bufSize]; + + + long curOffset = fileReadOffset; + + reader.read(buf); + assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: " + + " pointer is " + + reader.getFilePointer() + + " but offset is " + fileReadOffset + bufSize; + + int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf); + fileReadOffset = fileReadOffset + bytesUsed; + + + log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"+ fileReadOffset); + + + } else { + // file has rotated and no detection + reader.close(); + reader=null; + fileReadOffset = 0L; + offsetOfFirstByte = 0L; + hasMoreData = true; + log.warn("Adaptor|" + adaptorID +"| file has rotated and no detection - reset counters to 0L"); + } + } catch (IOException e) { + log.warn("failure reading " + toWatch, e); + } + attempts=0; + return hasMoreData; } /** Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,38 @@ +package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer; + +import java.io.IOException; +import java.util.TimerTask; + +import org.apache.hadoop.chukwa.datacollection.ChunkReceiver; +import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException; +import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor; +import org.apache.hadoop.chukwa.inputtools.plugin.metrics.ExecHelper; +import org.apache.log4j.Logger; + +public class TerminatorThread extends Thread { + private static Logger log =Logger.getLogger(FileAdaptor.class); + private static FileTailingAdaptor adaptor = null; + private static ChunkReceiver eq = null; + + public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) { + this.adaptor = adaptor; + this.eq = eq; + } + + public void run() { + log.info("Terminator thread started."); + try { + while(adaptor.tailFile(eq)) { + log.info(""); + } + } catch (InterruptedException e) { + log.info("Unable to send data to collector for 60 seconds, force shutdown."); + } + log.info("Terminator finished."); + try { + adaptor.reader.close(); + } catch (IOException ex) { + + } + } +} Modified: 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java Fri Dec 5 12:30:14 2008 @@ -19,6 +19,7 @@ package org.apache.hadoop.chukwa.datacollection.agent; import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; +import org.apache.log4j.Logger; /** * Produces new unconfigured adaptors, given the class name of the appender type @@ -26,6 +27,7 @@ */ public class AdaptorFactory { + static Logger log = Logger.getLogger(ChukwaAgent.class); /** * Instantiate an adaptor that can be added by the {@link ChukwaAgent} * @param className the name of the {@link Adaptor} class to instantiate @@ -39,13 +41,32 @@ obj = Class.forName(className).newInstance(); if (Adaptor.class.isInstance(obj)){ return (Adaptor) obj; + } + else + return null; + } catch (Exception e1){ + log.warn("Error instantiating new adaptor by class name, " + + "attempting again, but with default chukwa package prepended, i.e. " + + "org.apache.hadoop.chukwa.datacollection.adaptor." + className + ". " + + e1); + try{ + //if failed, try adding default class prefix + Object obj2 = Class.forName( + "org.apache.hadoop.chukwa.datacollection.adaptor." + + className).newInstance(); + if (Adaptor.class.isInstance(obj2)){ + log.debug("Succeeded in finding class by adding default chukwa " + + "namespace prefix to class name profided"); + return (Adaptor) obj2; + } + else + return null; + } catch (Exception e2) { + System.out.println("Also error instantiating new adaptor by classname" + + "with prefix added" + e2); + return null; } - else return null; - } catch (Exception e){ - System.out.println("Error instantiating new adaptor by class" + e); - return null; } - } } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Fri Dec 5 12:30:14 2008 @@ -56,11 +56,14 @@ InputStream in = connection.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(in)); PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream())); + //out.println("You are connected to the chukwa agent on "+ InetAddress.getLocalHost().getCanonicalHostName()); + //out.flush(); String cmd = null; while((cmd = br.readLine()) != null) { processCommand(cmd, out); } - log.info("control connection closed"); + if (log.isDebugEnabled()) + { log.debug("control connection closed");} } catch(SocketException e ) { if(e.getMessage().equals("Socket Closed")) @@ -78,7 +81,8 @@ 
*/ public void processCommand(String cmd, PrintStream out) throws IOException { String[] words = cmd.split(" "); - log.info("command from " + connection.getRemoteSocketAddress() + ":"+ cmd); + if (log.isDebugEnabled()) + { log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);} if(words[0].equalsIgnoreCase("help")) { out.println("you're talking to the Chukwa agent. Commands available: "); @@ -88,6 +92,7 @@ out.println("list -- list running adaptors"); out.println("close -- close this connection"); out.println("stopagent -- stop the whole agent process"); + out.println("reloadCollectors -- reload the list of collectors"); out.println("help -- print this message"); out.println("\t Command names are case-blind."); } @@ -123,9 +128,15 @@ out.println("OK adaptor "+ num+ " stopped"); } } - else if(words[0].equalsIgnoreCase("list") ) { + else if(words[0].equalsIgnoreCase("reloadCollectors")) { + agent.getConnector().reloadConfiguration(); + out.println("OK reloadCollectors done"); + }else if(words[0].equalsIgnoreCase("list") ) { Map adaptorsByNumber = agent.getAdaptorList(); - System.out.println("number of adaptors: " + adaptorsByNumber.size()); + + if (log.isDebugEnabled()) + { log.debug("number of adaptors: " + adaptorsByNumber.size());} + synchronized(adaptorsByNumber) { for(Map.Entry a: adaptorsByNumber.entrySet()) { try{ @@ -168,7 +179,7 @@ } public String formatAdaptorStatus(Adaptor a) throws AdaptorException { - return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() + " " + agent.getOffset(a); + return a.getClass().getCanonicalName() + " " + a.getCurrentStatus(); } /** @@ -186,7 +197,8 @@ { try { Socket connection = s.accept(); - log.info("new connection from " + connection.getInetAddress()); + if (log.isDebugEnabled()) + { log.debug("new connection from " + connection.getInetAddress());} ListenThread l = new ListenThread(connection); l.setDaemon(true); l.start(); Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Fri Dec 5 12:30:14 2008 @@ -23,6 +23,7 @@ import org.apache.hadoop.chukwa.datacollection.connector.*; import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector; import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector; +import org.apache.hadoop.chukwa.util.PidFile; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; @@ -32,257 +33,331 @@ import java.io.*; /** - * The local agent daemon that runs on each machine. - * This class is designed to be embeddable, for use in testing. + * The local agent daemon that runs on each machine. This class is designed to + * be embeddable, for use in testing. 
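The reloadCollectors command added to processCommand() above can be exercised over the agent's control socket like any other command. A small client sketch; the port number is an assumption, so use whatever port the control socket reports at startup:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;

    public class AgentControlClientSketch {
        public static void main(String[] args) throws Exception {
            Socket s = new Socket("localhost", 9093);   // 9093 is an assumed control port
            PrintWriter out = new PrintWriter(s.getOutputStream(), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
            out.println("reloadCollectors");            // command added by this patch
            System.out.println(in.readLine());          // expected reply: "OK reloadCollectors done"
            out.println("close");
            s.close();
        }
    }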
* */ public class ChukwaAgent { - boolean DO_CHECKPOINT_RESTORE = false; - boolean WRITE_CHECKPOINTS = false; - - static String tags = ""; + boolean DO_CHECKPOINT_RESTORE = true; + //boolean WRITE_CHECKPOINTS = true; + static Logger log = Logger.getLogger(ChukwaAgent.class); + static ChukwaAgent agent = null; + private static PidFile pFile = null; + + public static ChukwaAgent getAgent() + { + return agent; + } + + Configuration conf = null; + Connector connector = null; - //doesn't need an equals(), comparator, etc - private static class Offset { - public Offset(long l, long id) { + // doesn't need an equals(), comparator, etc + private static class Offset + { + public Offset(long l, long id) + { offset = l; this.id = id; } + private volatile long id; private volatile long offset; } - public static class AlreadyRunningException extends Exception { + public static class AlreadyRunningException extends Exception + { private static final long serialVersionUID = 1L; - public AlreadyRunningException() { + public AlreadyRunningException() + { super("Agent already running; aborting"); } } - - + private final Map adaptorPositions; - //basically only used by the control socket thread. + // basically only used by the control socket thread. private final Map adaptorsByNumber; - File checkpointDir; //lock this object to indicate checkpoint in progress - File initialAdaptors; - String CHECKPOINT_BASE_NAME; //base filename for checkpoint files - int CHECKPOINT_INTERVAL_MS ; //min interval at which to write checkpoints - + private File checkpointDir; // lock this object to indicate checkpoint in + // progress + private File initialAdaptors; + private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files + private int CHECKPOINT_INTERVAL_MS; // min interval at which to write + // checkpoints + private static String tags = ""; private Timer checkpointer; - private volatile boolean needNewCheckpoint = false; //set to true if any event has happened - //that should cause a new checkpoint to be written - - - private long lastAdaptorNumber= 0; //ID number of the last adaptor to be started - private int checkpointNumber; //id number of next checkpoint. - //should be protected by grabbing lock on checkpointDir - + private volatile boolean needNewCheckpoint = false; // set to true if any + // event has happened + // that should cause a new checkpoint to be written + + private long lastAdaptorNumber = 0; // ID number of the last adaptor to be + // started + private int checkpointNumber; // id number of next checkpoint. + // should be protected by grabbing lock on checkpointDir private final AgentControlSocketListener controlSock; /** * @param args - * @throws AdaptorException + * @throws AdaptorException */ - public static void main(String[] args) throws AdaptorException { + public static void main(String[] args) throws AdaptorException + { + + pFile = new PidFile("Agent"); + Runtime.getRuntime().addShutdownHook(pFile); - try{ - System.out.println("usage: LocalAgent [-restore] [default collector URL]"); - ChukwaAgent agent = new ChukwaAgent(); - if(agent.anotherAgentIsRunning()) { - System.out.println("another agent is running (or port has been usurped). 
Bailing out now"); - } - - Connector connector; - - int uriArgNumber= 0; - if(args.length > 0) { - if(args[0].equals("-restore")) { - agent.DO_CHECKPOINT_RESTORE = true; + try + { + if (args.length > 0 && args[0].equals("-help")) { + System.out.println("usage: LocalAgent [-noCheckPoint]" + + "[default collector URL]"); + System.exit(0); + } + ChukwaAgent localAgent = new ChukwaAgent(); + + if (agent.anotherAgentIsRunning()) + { + System.out + .println("another agent is running (or port has been usurped). " + + "Bailing out now"); + System.exit(-1); + } + + int uriArgNumber = 0; + if (args.length > 0) + { + if (args[0].equalsIgnoreCase("-noCheckPoint")) + { + agent.DO_CHECKPOINT_RESTORE = false; uriArgNumber = 1; } - if(args[uriArgNumber].equals("local")) - connector = new ConsoleOutConnector(agent); + if (args[uriArgNumber].equals("local")) + agent.connector = new ConsoleOutConnector(agent); else { - if(!args[uriArgNumber].contains("://")) + if (!args[uriArgNumber].contains("://")) args[uriArgNumber] = "http://" + args[uriArgNumber]; - connector = new HttpConnector(agent, args[uriArgNumber]); + agent.connector = new HttpConnector(agent, args[uriArgNumber]); } - } - else - connector = new HttpConnector(agent); + } else + agent.connector = new HttpConnector(agent); - connector.start(); + agent.connector.start(); log.info("local agent started on port " + agent.getControlSock().portno); - } catch(AlreadyRunningException e){ - log.error("agent started already on this machine with same portno ; bailing out"); - System.out.println("agent started already on this machine with same portno ; bailing out"); - System.exit(0); //better safe than sorry - } catch(Exception e) { + } catch (AlreadyRunningException e) + { + log + .error("agent started already on this machine with same portno;" + + " bailing out"); + System.out + .println("agent started already on this machine with same portno;" + + " bailing out"); + System.exit(0); // better safe than sorry + } catch (Exception e) + { e.printStackTrace(); } } - private boolean anotherAgentIsRunning() { + + private boolean anotherAgentIsRunning() + { return !controlSock.isBound(); } + /** * @return the number of running adaptors inside this local agent */ - public int adaptorCount() { - return adaptorPositions.size(); + public int adaptorCount() + { + return adaptorsByNumber.size(); } public ChukwaAgent() throws AlreadyRunningException { + ChukwaAgent.agent = this; + readConfig(); - //almost always just reading this; so use a ConcurrentHM. - //since we wrapped the offset, it's not a structural mod. - adaptorPositions= new ConcurrentHashMap(); + // almost always just reading this; so use a ConcurrentHM. + // since we wrapped the offset, it's not a structural mod. 
+ adaptorPositions = new ConcurrentHashMap(); adaptorsByNumber = new HashMap(); - checkpointNumber=0; - try{ - if(DO_CHECKPOINT_RESTORE) + checkpointNumber = 0; + try + { + if (DO_CHECKPOINT_RESTORE) restoreFromCheckpoint(); - } catch(IOException e) { + } catch (IOException e) + { log.warn("failed to restart from checkpoint: ", e); } - - try { - if(initialAdaptors != null && initialAdaptors.exists()) + + try + { + if (initialAdaptors != null && initialAdaptors.exists()) readAdaptorsFile(initialAdaptors); - } catch(IOException e) { - log.warn("couldn't read user-specified file "+ initialAdaptors.getAbsolutePath()); + } catch (IOException e) + { + log.warn("couldn't read user-specified file " + + initialAdaptors.getAbsolutePath()); } - + controlSock = new AgentControlSocketListener(this); - try { - controlSock.tryToBind(); //do this synchronously; if it fails, we know another agent is running. - controlSock.start(); //this sets us up as a daemon + try + { + controlSock.tryToBind(); // do this synchronously; if it fails, we know + // another agent is running. + controlSock.start(); // this sets us up as a daemon log.info("control socket started on port " + controlSock.portno); - - if(CHECKPOINT_INTERVAL_MS > 0) { + + if (CHECKPOINT_INTERVAL_MS > 0) + { checkpointer = new Timer(); checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS); } - } catch(IOException e) { + } catch (IOException e) + { log.info("failed to bind to socket; aborting agent launch", e); throw new AlreadyRunningException(); } - } - //FIXME: should handle bad lines here + // FIXME: should handle bad lines here public long processCommand(String cmd) { String[] words = cmd.split(" "); - if(words[0].equalsIgnoreCase("add")) + if (words[0].equalsIgnoreCase("add")) { - //words should contain (space delimited): - // 0) command ("add") - // 1) AdaptorClassname - // 2) dataType (e.g. "hadoop_log") - // 3) params - // (e.g. for files, this is filename, - // but can be arbitrarily many space - // delimited agent specific params ) - // 4) offset + // words should contain (space delimited): + // 0) command ("add") + // 1) AdaptorClassname + // 2) dataType (e.g. "hadoop_log") + // 3) params + // (e.g. for files, this is filename, + // but can be arbitrarily many space + // delimited agent specific params ) + // 4) offset long offset; - try { - offset = Long.parseLong(words[words.length-1]); - } catch(NumberFormatException e) { + try + { + offset = Long.parseLong(words[words.length - 1]); + } catch (NumberFormatException e) + { log.warn("malformed line " + cmd); return -1L; } String adaptorName = words[1]; Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName); - if(adaptor == null) { - log.warn("don't recognize adaptor name " + adaptorName); + if (adaptor == null) + { + log.warn("Error creating adaptor from adaptor name " + adaptorName); return -1L; } - String dataType = words[2]; - + String streamName = ""; String params = ""; - if(words.length > 4){ //no argument - int begParams = adaptorName.length()+dataType.length()+6;//length("ADD x type ") = length(x) + 5, i.e. count letters & spaces - params = cmd.substring(begParams, cmd.length() - words[words.length-1].length() -1); + if (words.length > 4) + { // no argument + int begParams = adaptorName.length() + dataType.length() + 6; + // length("ADD x type ") = length(x) + 5, i.e. 
count letters & spaces + params = cmd.substring(begParams, cmd.length() + - words[words.length - 1].length() - 1); + streamName = params.substring(params.indexOf(" ") + 1, params.length()); } long adaptorID; - synchronized(adaptorsByNumber) { - adaptorID = ++lastAdaptorNumber; + synchronized (adaptorsByNumber) + { + for (Map.Entry a : adaptorsByNumber.entrySet()) + { + if (streamName.intern() == a.getValue().getStreamName().intern()) + { + log.warn(params + " already exist, skipping."); + return -1; + } + } + adaptorID = ++lastAdaptorNumber; adaptorsByNumber.put(adaptorID, adaptor); - adaptorPositions.put(adaptor, new Offset(offset,adaptorID)); - } - - try { - adaptor.start(dataType, params, offset, DataFactory.getInstance().getEventQueue()); - log.info("started a new adaptor, id = " +adaptorID); - return adaptorID ; - - } catch(AdaptorException e) { - log.warn("failed to start adaptor", e); - //FIXME: don't we need to clean up the adaptor maps here? + adaptorPositions.put(adaptor, new Offset(offset, adaptorID)); + try + { + adaptor.start(adaptorID, dataType, params, offset, DataFactory + .getInstance().getEventQueue()); + log.info("started a new adaptor, id = " + adaptorID); + return adaptorID; + + } catch (Exception e) + { + log.warn("failed to start adaptor", e); + // FIXME: don't we need to clean up the adaptor maps here? + } } - } - else + } else log.warn("only 'add' command supported in config files"); return -1; } /** - * Tries to restore from a checkpoint file in checkpointDir. - * There should usually only be one checkpoint present -- - * two checkpoints present implies a crash during - * writing the higher-numbered one. - * As a result, this method chooses the lowest-numbered file present. - * - * Lines in the checkpoint file are processed one at a time with processCommand(); - * + * Tries to restore from a checkpoint file in checkpointDir. There should + * usually only be one checkpoint present -- two checkpoints present implies a + * crash during writing the higher-numbered one. As a result, this method + * chooses the lowest-numbered file present. 
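For reference, the add lines parsed by processCommand() — whether they arrive over the control socket, from the initial_adaptors file, or from a checkpoint written back out by writeCheckpoint() — consist of the command word, the adaptor class, the data type, adaptor-specific params, and a trailing offset. The class name and path below are illustrative, not values taken from this commit:

    add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 hadoop_log /var/log/hadoop/namenode.log 0
    ADD org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 hadoop_log 0 /var/log/hadoop/namenode.log 53132

The second form is what a checkpoint line looks like after this patch: getCurrentStatus() now emits the data type, offsetOfFirstByte, path, and fileReadOffset, and the trailing number is the offset handed back to the adaptor on restore.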
+ * + * Lines in the checkpoint file are processed one at a time with + * processCommand(); + * * @return true if the restore succeeded * @throws IOException - */ + */ public boolean restoreFromCheckpoint() throws IOException { - synchronized(checkpointDir) + synchronized (checkpointDir) { - String[] checkpointNames = checkpointDir.list(new FilenameFilter() + String[] checkpointNames = checkpointDir.list(new FilenameFilter() { - public boolean accept(File dir, String name) { + public boolean accept(File dir, String name) + { return name.startsWith(CHECKPOINT_BASE_NAME); - } + } }); - if(checkpointNames.length == 0) + + if (checkpointNames == null) { + log.error("Unable to list directories in checkpoint dir"); + return false; + } + if (checkpointNames.length == 0) { - log.info("No checkpoints found in "+ checkpointDir); + log.info("No checkpoints found in " + checkpointDir); return false; } - if(checkpointNames.length > 2) - log.warn("expected at most two checkpoint files in " + checkpointDir + "; saw " + checkpointNames.length); - else if(checkpointNames.length == 0) + if (checkpointNames.length > 2) + log.warn("expected at most two checkpoint files in " + checkpointDir + + "; saw " + checkpointNames.length); + else if (checkpointNames.length == 0) return false; - String lowestName=null; - int lowestIndex=Integer.MAX_VALUE; - for(String n: checkpointNames) { - int index = Integer.parseInt(n.substring(CHECKPOINT_BASE_NAME.length())); - if(index < lowestIndex) { + String lowestName = null; + int lowestIndex = Integer.MAX_VALUE; + for (String n : checkpointNames) + { + int index = Integer + .parseInt(n.substring(CHECKPOINT_BASE_NAME.length())); + if (index < lowestIndex) + { lowestName = n; lowestIndex = index; } @@ -294,203 +369,244 @@ } return true; } + private void readAdaptorsFile(File checkpoint) throws FileNotFoundException, IOException { - BufferedReader br = new BufferedReader( new InputStreamReader(new FileInputStream(checkpoint))); - String cmd=null; - while((cmd = br.readLine()) != null) + BufferedReader br = new BufferedReader(new InputStreamReader( + new FileInputStream(checkpoint))); + String cmd = null; + while ((cmd = br.readLine()) != null) processCommand(cmd); br.close(); } /** * Called periodically to write checkpoints + * * @throws IOException */ - public void writeCheckpoint() throws IOException - { + public void writeCheckpoint() throws IOException { needNewCheckpoint = false; - synchronized(checkpointDir) { + synchronized (checkpointDir) { log.info("writing checkpoint " + checkpointNumber); - FileOutputStream fos = new FileOutputStream( - new File(checkpointDir, CHECKPOINT_BASE_NAME + checkpointNumber)); - PrintWriter out = new PrintWriter( new BufferedWriter( + FileOutputStream fos = new FileOutputStream(new File(checkpointDir, + CHECKPOINT_BASE_NAME + checkpointNumber)); + PrintWriter out = new PrintWriter(new BufferedWriter( new OutputStreamWriter(fos))); - for(Map.Entry stat: adaptorPositions.entrySet()) { - try{ + for (Map.Entry stat : adaptorPositions.entrySet()) { + try { Adaptor a = stat.getKey(); out.print("ADD " + a.getClass().getCanonicalName()); - out.print(" "); - out.print(a.getType()); - out.print(" " + a.getCurrentStatus() + " "); - out.println(stat.getValue().offset); - } catch(AdaptorException e) { + out.println(" " + a.getCurrentStatus()); + } catch (AdaptorException e) { e.printStackTrace(); - }//don't try to recover from bad adaptor yet + }// don't try to recover from bad adaptor yet } out.close(); - File lastCheckpoint = new File(checkpointDir, 
CHECKPOINT_BASE_NAME + (checkpointNumber-1)); - log.debug("hopefully removing old checkpoint file " + lastCheckpoint.getAbsolutePath()); + File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME + + (checkpointNumber - 1)); + log.debug("hopefully removing old checkpoint file " + + lastCheckpoint.getAbsolutePath()); lastCheckpoint.delete(); checkpointNumber++; } } - public void reportCommit(Adaptor src, long uuid) - { + public void reportCommit(Adaptor src, long uuid) { needNewCheckpoint = true; Offset o = adaptorPositions.get(src); - if(o != null) { - synchronized(o) { //order writes to offset, in case commits are processed out of order - if( uuid > o.offset) + if (o != null) + { + synchronized (o) + { // order writes to offset, in case commits are processed out of order + if (uuid > o.offset) o.offset = uuid; } - - log.info("got commit up to " + uuid + " on " + src+ " = "+ o.id); - } - else { - log.warn("got commit up to " + uuid + " for adaptor " +src + - " that doesn't appear to be running: " + adaptorsByNumber.size() + " total"); + + log.info("got commit up to " + uuid + " on " + src + " = " + o.id); + } else + { + log.warn("got commit up to " + uuid + " for adaptor " + src + + " that doesn't appear to be running: " + adaptorsByNumber.size() + + " total"); } } - class CheckpointTask extends TimerTask { - public void run() { - try{ - if(needNewCheckpoint ) { + class CheckpointTask extends TimerTask { + public void run() + { + try + { + if (needNewCheckpoint) + { writeCheckpoint(); } - } catch(IOException e) { + } catch (IOException e) + { log.warn("failed to write checkpoint", e); } } } - -//for use only by control socket. - Map getAdaptorList() { - return adaptorsByNumber; + + // for use only by control socket. + public Map getAdaptorList() { + return adaptorsByNumber; } + /** - * Stop the adaptor with given ID number. - * Takes a parameter to indicate whether the adaptor should - * force out all remaining data, or just exit abruptly. + * Stop the adaptor with given ID number. Takes a parameter to indicate + * whether the adaptor should force out all remaining data, or just exit + * abruptly. * - * If the adaptor is written correctly, its offset won't change after returning - * from shutdown. + * If the adaptor is written correctly, its offset won't change after + * returning from shutdown. * - * @param number the adaptor to stop - * @param gracefully if true, shutdown, if false, hardStop + * @param number + * the adaptor to stop + * @param gracefully + * if true, shutdown, if false, hardStop * @return the number of bytes synched at stop. 
-1 on error */ - public long stopAdaptor(long number, boolean gracefully) { + public long stopAdaptor(long number, boolean gracefully) { Adaptor toStop; long offset = -1; - - //at most one thread can get past this critical section with toStop != null - //so if multiple callers try to stop the same adaptor, all but one will fail - synchronized(adaptorsByNumber) { + + // at most one thread can get past this critical section with toStop != null + // so if multiple callers try to stop the same adaptor, all but one will + // fail + synchronized (adaptorsByNumber) { toStop = adaptorsByNumber.remove(number); } - if(toStop == null) { + if (toStop == null) { log.warn("trying to stop adaptor " + number + " that isn't running"); return offset; } - try { - if(gracefully ) { - - long bytesSentByAdaptor = toStop.shutdown(); //this can block - long unstableBytes = bytesSentByAdaptor -adaptorPositions.get(toStop).offset; - while(unstableBytes > 0 ) { - log.info("waiting for adaptor " + number + " to terminate " + - unstableBytes + " bytes are still uncommitted"); - Thread.sleep(2000); - unstableBytes = bytesSentByAdaptor -adaptorPositions.get(toStop).offset; - } + + try { + if (gracefully) { + offset = toStop.shutdown(); } - else - toStop.hardStop(); - Offset off = adaptorPositions.remove(toStop); //next checkpoint will have the remove - offset = off == null ? -1 : off.offset; - needNewCheckpoint = true; - - } catch(AdaptorException e) { + } catch (AdaptorException e) { log.error("adaptor failed to stop cleanly", e); - } catch(InterruptedException e) { - log.error("can't wait for adaptor to finish writing", e); + } finally { + needNewCheckpoint = true; } return offset; } - + + public Configuration getConfiguration() { + return conf; + } + + public Connector getConnector() { + return connector; + } + protected void readConfig() { - Configuration conf = new Configuration(); + conf = new Configuration(); + String chukwaHome = System.getenv("CHUKWA_HOME"); - if (chukwaHome == null){ + if (chukwaHome == null) { chukwaHome = "."; } - if(!chukwaHome.endsWith("/")) - chukwaHome = chukwaHome + "/"; - - conf.addResource(new Path("conf/chukwa-agent-conf.xml")); - CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name", "chukwa_checkpoint_"); - checkpointDir= new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome+ "/var/")); - CHECKPOINT_INTERVAL_MS= conf.getInt("chukwaAgent.checkpoint.interval", 5000); - DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled", false); - if(DO_CHECKPOINT_RESTORE) { - WRITE_CHECKPOINTS = true; - log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS); + + if (!chukwaHome.endsWith("/")) { + chukwaHome = chukwaHome + File.separator; + } + log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]"); + + String chukwaConf = System.getProperty("CHUKWA_CONF_DIR"); + if (chukwaConf == null) { + chukwaConf = chukwaHome + "conf" + File.separator; + } + if (!chukwaHome.endsWith("/")) { + chukwaHome = chukwaHome + File.separator; + } + if (!chukwaConf.endsWith("/")) { + chukwaConf = chukwaConf + File.separator; + } + log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]"); + + conf.addResource(new Path(chukwaConf + "chukwa-agent-conf.xml")); + DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled", + true); + CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name", + "chukwa_checkpoint_"); + checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome + + "/var/")); + 
CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval", + 5000); + if (!checkpointDir.exists()) + { + checkpointDir.mkdirs(); } - // String initialAdaptorsStr = conf.get("initial_adaptors_file"); - tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\""); - - initialAdaptors = new File(chukwaHome + "conf/initial_adaptors"); + + log.info("Config - chukwaHome: [" + chukwaHome + "]"); + log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]"); + log.info("Config - checkpointDir: [" + checkpointDir + "]"); + log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS + + "]"); + log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]"); + log.info("Config - tags: [" + tags + "]"); + + if (DO_CHECKPOINT_RESTORE) { + needNewCheckpoint = true; + log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS); + } + + initialAdaptors = new File(chukwaConf + "initial_adaptors"); } - + public void shutdown() { shutdown(false); } /** - * Triggers agent shutdown. - * For now, this method doesn't shut down adaptors explicitly. It probably should. + * Triggers agent shutdown. For now, this method doesn't shut down adaptors + * explicitly. It probably should. */ public void shutdown(boolean exit) { - if(checkpointer != null) + if (checkpointer != null) checkpointer.cancel(); - - controlSock.shutdown(); //make sure we don't get new requests - + controlSock.shutdown(); // make sure we don't get new requests try { - if(WRITE_CHECKPOINTS) - writeCheckpoint(); //write a last checkpoint here, before stopping adaptors - } catch(IOException e) { + if (needNewCheckpoint) + writeCheckpoint(); // write a last checkpoint here, before stopping + // adaptors + } catch (IOException e) { } - - synchronized(adaptorsByNumber) { //shut down each adaptor - for(Adaptor a: adaptorsByNumber.values()) { - try{ + + synchronized (adaptorsByNumber) { + // shut down each adaptor + for (Adaptor a : adaptorsByNumber.values()) { + try { a.hardStop(); - }catch(AdaptorException e) { - log.warn("failed to cleanly stop " + a,e); + } catch (AdaptorException e) + { + log.warn("failed to cleanly stop " + a, e); } } } - - if(exit) + if (exit) System.exit(0); } -/** - * Returns the last offset at which a given adaptor was checkpointed - * @param a the adaptor in question - * @return that adaptor's last-checkpointed offset - */ + + /** + * Returns the last offset at which a given adaptor was checkpointed + * + * @param a + * the adaptor in question + * @return that adaptor's last-checkpointed offset + */ public long getOffset(Adaptor a) { return adaptorPositions.get(a).offset; } + /** * Returns the control socket for this agent. 
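Taken together, the agent-side properties this patch reads (here and in the FileTailer/FileTailingAdaptor hunks earlier) can be summarized in one sketch. It resolves the configuration directory the same way readConfig() does; the defaults shown are the in-code fallbacks:

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class AgentTunablesSketch {
        public static void main(String[] args) {
            String home = System.getenv("CHUKWA_HOME");
            if (home == null) home = ".";
            if (!home.endsWith("/")) home += File.separator;
            String confDir = System.getProperty("CHUKWA_CONF_DIR");
            if (confDir == null) confDir = home + "conf" + File.separator;

            Configuration conf = new Configuration();
            conf.addResource(new Path(confDir + "chukwa-agent-conf.xml"));

            // properties read by this patch, with their in-code defaults
            System.out.println(conf.getInt("chukwaAgent.adaptor.context.switch.time", 2000));
            System.out.println(conf.getInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 128 * 1024));
            System.out.println(conf.getBoolean("chukwaAgent.checkpoint.enabled", true));
            System.out.println(conf.get("chukwaAgent.checkpoint.name", "chukwa_checkpoint_"));
            System.out.println(conf.get("chukwaAgent.checkpoint.dir", home + "/var/"));
            System.out.println(conf.getInt("chukwaAgent.checkpoint.interval", 5000));
        }
    }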
*/ @@ -499,7 +615,7 @@ } public static String getTags() { - return tags; + return tags; } } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Fri Dec 5 12:30:14 2008 @@ -18,8 +18,9 @@ package org.apache.hadoop.chukwa.datacollection.agent; -import java.util.*; -//import java.util.concurrent.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.datacollection.ChunkQueue; @@ -33,13 +34,14 @@ */ public class MemLimitQueue implements ChunkQueue { - static Logger log = Logger.getLogger(WaitingQueue.class); private Queue queue = new LinkedList(); private long dataSize = 0; private final long MAX_MEM_USAGE; + + public MemLimitQueue(int limit) { MAX_MEM_USAGE = limit; } @@ -47,15 +49,21 @@ /** * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk) */ - public void add(Chunk event) throws InterruptedException + public void add(Chunk chunk) throws InterruptedException { - assert event != null: "can't enqueue null chunks"; + assert chunk != null: "can't enqueue null chunks"; synchronized(this) { - while(event.getData().length + dataSize > MAX_MEM_USAGE) - this.wait(); - - dataSize += event.getData().length; - queue.add(event); + while(chunk.getData().length + dataSize > MAX_MEM_USAGE) + { + try + { + this.wait(); + log.info("MemLimitQueue is full [" + dataSize +"]"); + } + catch(InterruptedException e) {} + } + dataSize += chunk.getData().length; + queue.add(chunk); this.notifyAll(); } @@ -64,7 +72,7 @@ /** * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List, int) */ - public void collect(List events,int maxCount) throws InterruptedException + public void collect(List events,int maxSize) throws InterruptedException { synchronized(this) { //we can't just say queue.take() here, since we're holding a lock. 
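The MemLimitQueue changes here and in the collect() hunk that follows implement byte-based back-pressure: add() blocks while the buffered byte count exceeds the cap, and collect() now drains up to a byte budget rather than a chunk count. A stand-alone illustration of the same pattern; the class and field names below are illustrative, not Chukwa APIs:

    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    class ByteBoundedQueueSketch {
        private final Queue<byte[]> queue = new LinkedList<byte[]>();
        private long dataSize = 0;
        private final long maxMemUsage;

        ByteBoundedQueueSketch(long limitBytes) { this.maxMemUsage = limitBytes; }

        synchronized void add(byte[] data) throws InterruptedException {
            while (dataSize + data.length > maxMemUsage)
                wait();                                   // back-pressure on producers
            dataSize += data.length;
            queue.add(data);
            notifyAll();
        }

        synchronized void collect(List<byte[]> out, int maxBytes) throws InterruptedException {
            while (queue.isEmpty())
                wait();
            int size = 0;
            while (!queue.isEmpty() && size < maxBytes) { // bounded by bytes, not chunk count
                byte[] d = queue.remove();
                size += d.length;
                dataSize -= d.length;
                out.add(d);
            }
            notifyAll();
        }
    }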
@@ -72,10 +80,13 @@ this.wait(); } - int i = 0; - while(!queue.isEmpty() && (i++ < maxCount)) { + + int size = 0; + while(!queue.isEmpty() && (size < maxSize)) { Chunk e = this.queue.remove(); - dataSize -= e.getData().length; + int chunkSize = e.getData().length; + size += chunkSize; + dataSize -= chunkSize; events.add(e); } this.notifyAll(); Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Fri Dec 5 12:30:14 2008 @@ -22,53 +22,77 @@ import org.mortbay.jetty.nio.SelectChannelConnector; import org.mortbay.jetty.servlet.*; import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector; -import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter; +import org.apache.hadoop.chukwa.datacollection.writer.*; +import org.apache.hadoop.chukwa.util.PidFile; import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; public class CollectorStub { - - public static void main(String[] args) - { - + static int THREADS = 80; + private static PidFile pFile = null; + public static Server jettyServer = null; + public static void main(String[] args) { + + pFile=new PidFile("Collector"); + Runtime.getRuntime().addShutdownHook(pFile); try { System.out.println("usage: CollectorStub [portno] [pretend]"); - System.out.println("note: if no portno defined, defaults to value in chukwa-site.xml"); + System.out.println("note: if no portno defined, " + + "defaults to value in chukwa-site.xml"); ChukwaConfiguration conf = new ChukwaConfiguration(); int portNum = conf.getInt("chukwaCollector.http.port", 9999); - + THREADS = conf.getInt("chukwaCollector.http.threads", 80); + if(args.length != 0) portNum = Integer.parseInt(args[0]); + + //pick a writer. 
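The writer selection that follows, like the AdaptorFactory change earlier in this patch, relies on the same load-by-name idiom: Class.forName, an instance-of check against the expected interface, then newInstance. A generic sketch of that idiom; the Plugin interface and HelloPlugin class are invented for illustration:

    public class LoadByNameSketch {
        interface Plugin { void run(); }

        public static class HelloPlugin implements Plugin {   // hypothetical example class
            public void run() { System.out.println("hello"); }
        }

        static Plugin create(String className) {
            try {
                Object obj = Class.forName(className).newInstance();
                return Plugin.class.isInstance(obj) ? (Plugin) obj : null;
            } catch (Exception e) {
                return null;                                   // caller decides how to report
            }
        }

        public static void main(String[] args) {
            Plugin p = create(LoadByNameSketch.HelloPlugin.class.getName());
            if (p != null) p.run();
        }
    }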
if(args.length > 1) { if(args[1].equals("pretend")) ServletCollector.setWriter(new ConsoleWriter(true)); else if(args[1].equals("pretend-quietly")) ServletCollector.setWriter(new ConsoleWriter(false)); + else if(args[1].equals("-classname")) { + if(args.length < 3) + System.err.println("need to specify a writer class"); + else { + Class writerClass = Class.forName(args[2]); + if(writerClass != null && + ChukwaWriter.class.isAssignableFrom(writerClass)) + ServletCollector.setWriter( + (ChukwaWriter) writerClass.newInstance()); + else + System.err.println(args[2]+ "is not a ChukwaWriter"); + } + } else - System.out.println("WARNING: don't know what to do with command line arg "+ args[1]); + System.out.println("WARNING: unknown command line arg "+ args[1]); } + //set up jetty connector SelectChannelConnector jettyConnector = new SelectChannelConnector(); - jettyConnector.setLowResourcesConnections(20); - jettyConnector.setLowResourceMaxIdleTime(1000); + jettyConnector.setLowResourcesConnections(THREADS-10); + jettyConnector.setLowResourceMaxIdleTime(1500); jettyConnector.setPort(portNum); - Server server = new Server(portNum); - server.setConnectors(new Connector[]{ jettyConnector}); - org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool(); - pool.setMaxThreads(30); - server.setThreadPool(pool); - Context root = new Context(server,"/",Context.SESSIONS); + //set up jetty server + jettyServer = new Server(portNum); + + jettyServer.setConnectors(new Connector[]{ jettyConnector}); + org.mortbay.thread.BoundedThreadPool pool = + new org.mortbay.thread.BoundedThreadPool(); + pool.setMaxThreads(THREADS); + jettyServer.setThreadPool(pool); + //and add the servlet to it + Context root = new Context(jettyServer,"/",Context.SESSIONS); root.addServlet(new ServletHolder(new ServletCollector()), "/*"); - server.start(); - server.setStopAtShutdown(false); + jettyServer.start(); + jettyServer.setStopAtShutdown(false); System.out.println("started http collector on port number " + portNum); - } - catch(Exception e) - { - e.printStackTrace(); + } catch(Exception e) { + e.printStackTrace(); System.exit(0); } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Fri Dec 5 12:30:14 2008 @@ -18,7 +18,13 @@ package org.apache.hadoop.chukwa.datacollection.collector.servlet; -import java.io.*; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import javax.servlet.ServletConfig; import javax.servlet.ServletException; @@ -27,26 +33,30 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.ChunkImpl; import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter; +import 
org.apache.hadoop.chukwa.datacollection.writer.WriterException; import org.apache.log4j.Logger; public class ServletCollector extends HttpServlet { - static final boolean FANCY_DIAGNOSTICS = true; + static final boolean FANCY_DIAGNOSTICS = false; static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null; private static final long serialVersionUID = 6286162898591407111L; Logger log = Logger.getRootLogger();//.getLogger(ServletCollector.class); - - public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws IOException + public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws WriterException { writer = w; w.init(); } - + static long statTime = 0L; + static int numberHTTPConnection = 0; + static int numberchunks = 0; + public void init(ServletConfig servletConf) throws ServletException { @@ -56,6 +66,20 @@ return; } + + Timer statTimer = new Timer(); + statTimer.schedule(new TimerTask() + { + public void run() + { + log.info("stats:ServletCollector,numberHTTPConnection:" + numberHTTPConnection + + ",numberchunks:"+numberchunks); + statTime = System.currentTimeMillis(); + numberHTTPConnection = 0; + numberchunks = 0; + } + }, (1000), (60*1000)); + try { // read the application->pipeline settings from a config file in the format: @@ -81,56 +105,75 @@ if (writer == null) writer = new SeqFileWriter(); - } catch (IOException e) { + } catch (WriterException e) { throw new ServletException("Problem init-ing servlet", e); } } protected void accept(HttpServletRequest req, HttpServletResponse resp) - throws ServletException + throws ServletException { - ServletDiagnostics diagnosticPage = new ServletDiagnostics(); + numberHTTPConnection ++; + ServletDiagnostics diagnosticPage = new ServletDiagnostics(); + final long currentTime = System.currentTimeMillis(); try { - - final long currentTime = System.currentTimeMillis(); - log.debug("new post from " + req.getRemoteHost() + " at " + currentTime); + + log.debug("new post from " + req.getRemoteHost() + " at " + currentTime); java.io.InputStream in = req.getInputStream(); - + ServletOutputStream l_out = resp.getOutputStream(); final DataInputStream di = new DataInputStream(in); final int numEvents = di.readInt(); - // log.info("saw " + numEvents+ " in request"); + // log.info("saw " + numEvents+ " in request"); + + if(FANCY_DIAGNOSTICS) + { diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime); } - if(FANCY_DIAGNOSTICS) - diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime); - for (int i = 0; i < numEvents; i++){ - // TODO: pass new data to all registered stream handler methods for this chunk's stream + List events = new LinkedList(); + ChunkImpl logEvent = null; + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < numEvents; i++) + { + // TODO: pass new data to all registered stream handler + // methods for this chunk's stream // TODO: should really have some dynamic assignment of events to writers - ChunkImpl logEvent = ChunkImpl.read(di); + logEvent = ChunkImpl.read(di); + sb.append("ok:"); + sb.append(logEvent.getData().length); + sb.append(" bytes ending at offset "); + sb.append(logEvent.getSeqID()-1).append("\n"); - if(FANCY_DIAGNOSTICS) - diagnosticPage.sawChunk(logEvent, i); - - // write new data to data sync file - if(writer != null) { - writer.add(logEvent); //save() blocks until data is written - //this is where we ACK this connection - l_out.print("ok:"); - l_out.print(logEvent.getData().length); - 
l_out.print(" bytes ending at offset "); - l_out.println(logEvent.getSeqID()-1); - } - else - l_out.println("can't write: no writer"); + events.add(logEvent); + + if(FANCY_DIAGNOSTICS) + { diagnosticPage.sawChunk(logEvent, i); } } - if(FANCY_DIAGNOSTICS) - diagnosticPage.doneWithPost(); - resp.setStatus(200); - - } catch (IOException e) { - log.warn("IO error", e); + // write new data to data sync file + if(writer != null) + { + writer.add(events); + numberchunks += events.size(); + //this is where we ACK this connection + l_out.print(sb.toString()); + } + else + { + l_out.println("can't write: no writer"); + } + + + if(FANCY_DIAGNOSTICS) + { diagnosticPage.doneWithPost(); } + + resp.setStatus(200); + + } + catch(Throwable e) + { + log.warn("Exception talking to " +req.getRemoteHost() + " at " + currentTime , e); throw new ServletException(e); } } @@ -147,16 +190,30 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - PrintStream out = new PrintStream(resp.getOutputStream()); - resp.setStatus(200); - out.println("

<html><body><h2>Chukwa servlet running</h2>
"); - if(FANCY_DIAGNOSTICS) - ServletDiagnostics.printPage(out); - out.println(""); -// accept(req,resp); + + PrintStream out = new PrintStream(resp.getOutputStream()); + resp.setStatus(200); + + String pingAtt = req.getParameter("ping"); + if (pingAtt!=null) + { + out.println("Date:" + ServletCollector.statTime); + out.println("Now:" + System.currentTimeMillis()); + out.println("numberHTTPConnection:" + ServletCollector.numberHTTPConnection); + out.println("numberchunks:" + ServletCollector.numberchunks); + } + else + { + out.println("

<html><body><h2>Chukwa servlet running</h2>
"); + if(FANCY_DIAGNOSTICS) + ServletDiagnostics.printPage(out); + out.println(""); + } + + } - @Override + @Override public String getServletInfo() { return "Chukwa Servlet Collector"; @@ -165,10 +222,14 @@ @Override public void destroy() { - synchronized(writer) - { - writer.close(); - } + try + { + writer.close(); + } catch (WriterException e) + { + log.warn("Exception during close", e); + e.printStackTrace(); + } super.destroy(); } } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java Fri Dec 5 12:30:14 2008 @@ -31,6 +31,10 @@ static Logger log= Logger.getLogger(ServletDiagnostics.class); + + static int CHUNKS_TO_KEEP = 50; + static int CHUNKS_TO_DISPLAY = 50; + private static class PostStats { //statistics about a chunk public PostStats(String src, int count, long receivedTs) { @@ -75,7 +79,6 @@ static LinkedList lastPosts; PostStats curPost; - static int CHUNKS_TO_KEEP = 300; public void sawPost(String source, int chunks, long receivedTs) { @@ -96,28 +99,33 @@ long timeWindowOfSample = Long.MAX_VALUE; long now = System.currentTimeMillis(); - out.println("
    "); synchronized(lastPosts) { + int toSkip = lastPosts.size() - CHUNKS_TO_DISPLAY; + if(!lastPosts.isEmpty()) timeWindowOfSample = now - lastPosts.peek().receivedTs; for(PostStats stats: lastPosts) { - out.print("
  • "); - - out.print(stats.dataSize + " bytes from " + stats.src + " at timestamp " + stats.receivedTs); - out.println(" which was " + ((now - stats.receivedTs)/ 1000) + " seconds ago"); Long oldBytes = bytesFromHost.get(stats.src); long newBytes = stats.dataSize; if(oldBytes != null) newBytes += oldBytes; bytesFromHost.put(stats.src, newBytes); - out.println("
      "); - for(int i =0; i < stats.count; ++i) - out.println("
    1. "+ stats.lengths[i] + " bytes of type " + - stats.types[i] + ". Adaptor name ="+ stats.names[i] +"
    2. "); - out.println("
  • "); + + if( -- toSkip < 0) { //done skipping + out.print("
  • "); + + out.print(stats.dataSize + " bytes from " + stats.src + " at timestamp " + stats.receivedTs); + out.println(" which was " + ((now - stats.receivedTs)/ 1000) + " seconds ago"); + + out.println("
      "); + for(int i =0; i < stats.count; ++i) + out.println("
    1. "+ stats.lengths[i] + " bytes of type " + + stats.types[i] + ". Adaptor name ="+ stats.names[i] +"
    2. "); + out.println("
  • "); + } } } out.println("
"); @@ -137,7 +145,7 @@ public void doneWithPost() { synchronized(lastPosts) { if(lastPosts.size() > CHUNKS_TO_KEEP) - lastPosts.remove(); + lastPosts.removeFirst(); lastPosts.add(curPost); } } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java Fri Dec 5 12:30:14 2008 @@ -42,5 +42,6 @@ public void start(); - public void shutdown(); + public void shutdown(); + public void reloadConfiguration(); } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Fri Dec 5 12:30:14 2008 @@ -56,15 +56,18 @@ static Timer statTimer = null; static volatile int chunkCount = 0; - static final int MAX_EVENTS_PER_POST = 1000; - static final int MIN_POST_INTERVAL= 4 * 1000; + static final int MAX_SIZE_PER_POST = 2*1024*1024; + static final int MIN_POST_INTERVAL= 5 * 1000; static ChunkQueue chunkQueue; ChukwaAgent agent; String argDestination = null; private boolean stopMe = false; - + private boolean reloadConfiguration = false; + private Iterator collectors = null; + protected ChukwaSender connectorClient = null; + static{ statTimer = new Timer(); chunkQueue = DataFactory.getInstance().getEventQueue(); @@ -97,65 +100,104 @@ } public void run(){ - log.info("HttpConnector started at time:" + System.currentTimeMillis()); + log.info("HttpConnector started at time:" + System.currentTimeMillis()); + + Iterator destinations = null; + + // build a list of our destinations from collectors + try{ + destinations = DataFactory.getInstance().getCollectorURLs(); + } catch (IOException e){ + log.error("Failed to retreive list of collectors from " + + "conf/collectors file", e); + } + + connectorClient = new ChukwaHttpSender(); + + if (argDestination != null) + { + ArrayList tmp = new ArrayList(); + tmp.add(argDestination); + collectors = tmp.iterator(); + connectorClient.setCollectors(collectors); + log.info("using collector specified at agent runtime: " + argDestination); + } + else if (destinations != null && destinations.hasNext()) + { + collectors = destinations; + connectorClient.setCollectors(destinations); + log.info("using collectors from collectors file"); + } + else { + log.error("No collectors specified, exiting (and taking agent with us)."); + agent.shutdown(true);//error is unrecoverable, so stop hard. 
+ return; + } + + try { + long lastPost = System.currentTimeMillis(); + while(!stopMe) { + List newQueue = new ArrayList(); + try { + //get all ready chunks from the chunkQueue to be sent + chunkQueue.collect(newQueue,MAX_SIZE_PER_POST); //FIXME: should really do this by size + + } catch(InterruptedException e) { + System.out.println("thread interrupted during addChunks(ChunkQueue)"); + Thread.currentThread().interrupt(); + break; + } + int toSend = newQueue.size(); + List results = connectorClient.send(newQueue); + log.info("sent " +toSend + " chunks, got back " + results.size() + " acks"); + //checkpoint the chunks which were committed + for(ChukwaHttpSender.CommitListEntry cle : results) { + agent.reportCommit(cle.adaptor, cle.uuid); + chunkCount++; + } + + if (reloadConfiguration) + { + connectorClient.setCollectors(collectors); + log.info("Resetting colectors"); + reloadConfiguration = false; + } + + long now = System.currentTimeMillis(); + if( now - lastPost < MIN_POST_INTERVAL ) + Thread.sleep(now - lastPost); //wait for stuff to accumulate + lastPost = now; + } //end of try forever loop + log.info("received stop() command so exiting run() loop to shutdown connector"); + } catch(OutOfMemoryError e) { + log.warn("Bailing out",e); + System.exit(-1); + } catch(InterruptedException e) { + //do nothing, let thread die. + log.warn("Bailing out",e); + System.exit(-1); + }catch(java.io.IOException e) { + log.error("connector failed; shutting down agent"); + agent.shutdown(true); + } + } - Iterator destinations = null; - + @Override + public void reloadConfiguration() + { + reloadConfiguration = true; + Iterator destinations = null; + // build a list of our destinations from collectors try{ - destinations = DataFactory.getInstance().getCollectors(); + destinations = DataFactory.getInstance().getCollectorURLs(); } catch (IOException e){ log.error("Failed to retreive list of collectors from conf/collectors file", e); } - - ChukwaSender connectorClient = new ChukwaHttpSender(); - if (argDestination != null) { - - ArrayList tmp = new ArrayList(); - tmp.add(argDestination); - connectorClient.setCollectors(tmp.iterator()); - log.info("using collector specified at agent runtime: " + argDestination); - } else if (destinations != null && destinations.hasNext()) { - connectorClient.setCollectors(destinations); - log.info("using collectors from collectors file"); - } else { - log.error("No collectors specified, exiting (and taking agent with us)."); - agent.shutdown(true);//error is unrecoverable, so stop hard. 
- return; + if (destinations != null && destinations.hasNext()) + { + collectors = destinations; } - - try { - long lastPost = System.currentTimeMillis(); - while(!stopMe) { - List newQueue = new ArrayList(); - try { - //get all ready chunks from the chunkQueue to be sent - chunkQueue.collect(newQueue,MAX_EVENTS_PER_POST); //FIXME: should really do this by size - - } catch(InterruptedException e) { - System.out.println("thread interrupted during addChunks(ChunkQueue)"); - Thread.currentThread().interrupt(); - break; - } - int toSend = newQueue.size(); - List results = connectorClient.send(newQueue); - log.info("sent " +toSend + " chunks, got back " + results.size() + " acks"); - //checkpoint the chunks which were committed - for(ChukwaHttpSender.CommitListEntry cle : results) { - agent.reportCommit(cle.adaptor, cle.uuid); - chunkCount++; - } - long now = System.currentTimeMillis(); - if( now - lastPost < MIN_POST_INTERVAL ) - Thread.sleep(now - lastPost); //wait for stuff to accumulate - lastPost = now; - } //end of try forever loop - log.info("received stop() command so exiting run() loop to shutdown connector"); - } catch(InterruptedException e) { - //do nothing, let thread die. - }catch(java.io.IOException e) { - log.error("connector failed; shutting down agent"); - agent.shutdown(true); - } + } } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java Fri Dec 5 12:30:14 2008 @@ -19,11 +19,24 @@ package org.apache.hadoop.chukwa.datacollection.controller; -import java.net.*; -import java.io.*; -import java.util.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; +import org.apache.log4j.Logger; /** * A convenience library for applications to communicate to the {@link ChukwaAgent}. Can be used @@ -31,14 +44,16 @@ * use for handling log rations. 
*/ public class ChukwaAgentController { - - public class AddAdaptorTask extends TimerTask { - String adaptorName; - String type; - String params; - long offset; - long numRetries; - long retryInterval; + static Logger log = Logger.getLogger(ChukwaAgentController.class); + public class AddAdaptorTask extends TimerTask + { + + String adaptorName; + String type; + String params; + long offset; + long numRetries; + long retryInterval; AddAdaptorTask(String adaptorName, String type, String params, long offset, long numRetries, long retryInterval){ @@ -50,8 +65,18 @@ this.retryInterval = retryInterval; } @Override - public void run() { - add(adaptorName, type, params, offset, numRetries, retryInterval); + public void run() + { + try + { + log.info("Trying to resend the add command [" + adaptorName + "][" + offset + "][" + params +"] [" + numRetries+"]"); + add(adaptorName, type, params, offset, numRetries, retryInterval); + } + catch(Exception e) + { + log.warn("Exception in AddAdaptorTask.run", e); + e.printStackTrace(); + } } } @@ -148,7 +173,7 @@ Map pausedAdaptors; String hostname; int portno; - private Timer addFileTimer = new Timer(); + public ChukwaAgentController(){ portno = DEFAULT_PORT; @@ -218,6 +243,7 @@ System.out.println("Scheduling a agent connection retry for adaptor add() in another " + retryInterval + " milliseconds, " + numRetries + " retries remaining"); + Timer addFileTimer = new Timer(); addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params, offset, numRetries-1, retryInterval), retryInterval); } }else{ Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Fri Dec 5 12:30:14 2008 @@ -26,8 +26,10 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpException; @@ -39,6 +41,7 @@ import org.apache.commons.httpclient.methods.RequestEntity; import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.hadoop.chukwa.Chunk; +import org.apache.hadoop.chukwa.datacollection.DataFactory; import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.log4j.Logger; @@ -47,22 +50,23 @@ * Encapsulates all of the http setup and connection details needed for * chunks to be delivered to a collector. *

- * On error, tries the list of available collectors, pauses for a minute, and then repeats. + * On error, tries the list of available collectors, pauses for a minute, + * and then repeats. *

*

Will wait forever for collectors to come up.

*/ public class ChukwaHttpSender implements ChukwaSender{ static final int MAX_RETRIES_PER_COLLECTOR = 4; //fast retries, in http client - static final int SENDER_RETRIES = 3; + static final int SENDER_RETRIES = 14440; static final int WAIT_FOR_COLLECTOR_REBOOT = 20 * 1000; //FIXME: this should really correspond to the timer in RetryListOfCollectors - + static final int BLACK_LIST_TIME = 300 * 1000; static Logger log = Logger.getLogger(ChukwaHttpSender.class); static HttpClient client = null; static MultiThreadedHttpConnectionManager connectionManager = null; static String currCollector = null; - + protected static ConcurrentHashMap blackList = null; protected Iterator collectors; static @@ -115,11 +119,11 @@ } public ChukwaHttpSender(){ - //setup default collector ArrayList tmp = new ArrayList(); this.collectors = tmp.iterator(); - currCollector = "http://localhost:8080"; - log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: " + collectors.hasNext()); + ConcurrentHashMap tmpHash = new ConcurrentHashMap(); + this.blackList = tmpHash; + log.info("setting collectors to an empty iterator"); } @@ -135,21 +139,22 @@ * @param collectors */ public void setCollectors(Iterator collectors){ - this.collectors = collectors; - //setup a new destination from our list of collectors if one hasn't been set up + this.collectors = collectors; + this.blackList.clear(); + //setup a new destination from our list of collectors if one isn't set up if (currCollector == null){ if (collectors.hasNext()){ currCollector = collectors.next(); } else - log.error("No collectors to try in send(), not even trying to do doPost()"); + log.error("No collectors to try in setCollectors()"); } } /** - * grab all of the chunks currently in the chunkQueue, stores a copy of them locally, calculates - * their size, sets them up + * grab all of the chunks currently in the chunkQueue, stores a copy of them + * locally, calculates their size, sets them up * @return array of chunk id's which were ACKed by collector */ public List send(List toSend) throws InterruptedException, IOException{ @@ -183,14 +188,29 @@ //need to pick a destination here PostMethod method = new PostMethod(); try { + if(blackList.size()!=0) { + for(long time: blackList.keySet()) { + long now = new Date().getTime(); + if(now-time > BLACK_LIST_TIME) { + log.info(blackList.get(time)+" release from black list."); + blackList.remove(time); + } else if(currCollector.intern()==blackList.get(time)) { + currCollector = collectors.next(); + } + } + } doPost(method, postData, currCollector); - + // rotate POST to collectors do not work. All agent and collectors end up spending time to create TCP connections + // but unable to send any data. + // currCollector = collectors.next(); retries = SENDER_RETRIES; //reset count on success //if no exception was thrown from doPost, ACK that these chunks were sent return commitResults; } catch (Throwable e) { log.error("Http post exception", e); log.info("Checking list of collectors to see if another collector has been specified for rollover"); + blackList.put(new Date().getTime(), currCollector); + log.info("Black list collector: "+currCollector); if (collectors.hasNext()){ currCollector = collectors.next(); log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector); @@ -200,6 +220,9 @@ " ms (" + retries + "retries left)"); Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT); retries --; + // shuffle the list of collectors if all of them are not available. 
+ this.collectors = DataFactory.getInstance().getCollectorURLs(); + this.blackList.clear(); } else { log.error("No more collectors to try rolling over to; aborting"); throw new IOException("no collectors"); @@ -211,6 +234,11 @@ method.releaseConnection(); } } //end retry loop + if(currCollector==null) { + // reset the collector list, if ran out of collector to try. + this.collectors = DataFactory.getInstance().getCollectorURLs(); + this.blackList.clear(); + } return new ArrayList(); } @@ -230,25 +258,30 @@ return !(e instanceof java.net.ConnectException) && (exec < MAX_RETRIES_PER_COLLECTOR); } }); + + pars.setParameter(HttpMethodParams.SO_TIMEOUT , new Integer(30000)); + + + method.setParams(pars); method.setPath(dest); //send it across the network method.setRequestEntity(data); - log.info("HTTP post to " + dest+" length = "+ data.getContentLength()); + log.info(">>>>>> HTTP post to " + dest+" length = "+ data.getContentLength()); // Send POST request - client.setTimeout(8000); + //client.setTimeout(15*1000); int statusCode = client.executeMethod(method); if (statusCode != HttpStatus.SC_OK) { - log.error("HTTP post response statusCode: " +statusCode + ", statusLine: " + method.getStatusLine()); + log.error(">>>>>> HTTP post response statusCode: " +statusCode + ", statusLine: " + method.getStatusLine()); //do something aggressive here throw new HttpException("got back a failure from server"); } //implicitly "else" - log.info("got success back from the remote collector; response length "+ method.getResponseContentLength()); + log.info(">>>>>> HTTP Got success back from the remote collector; response length "+ method.getResponseContentLength()); //FIXME: should parse acks here InputStream rstream = null;
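The ChukwaHttpSender changes above drop the hard-coded http://localhost:8080 default and instead rotate through the collectors read from conf/collectors, benching any collector that fails a POST on a time-keyed black list for BLACK_LIST_TIME milliseconds. Below is a minimal, self-contained sketch of that failover pattern; the class and method names (FailoverList, pick, markFailed) are illustrative only and do not exist in the Chukwa source, which keeps this state inside ChukwaHttpSender itself.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical helper illustrating the rotate-and-black-list behaviour of the patched sender.
    class FailoverList {
      static final long BLACK_LIST_TIME = 300 * 1000;   // how long a failed collector stays benched
      private final List<String> urls;                   // all known collector URLs
      private Iterator<String> cursor;                   // current rotation position
      private final ConcurrentHashMap<Long, String> blackList =
          new ConcurrentHashMap<Long, String>();         // failure time -> collector URL

      FailoverList(List<String> urls) {
        this.urls = new ArrayList<String>(urls);
        this.cursor = this.urls.iterator();
      }

      /** Release collectors whose penalty expired, then return a usable one, or null if all are benched. */
      synchronized String pick() {
        long now = System.currentTimeMillis();
        for (Long failedAt : blackList.keySet()) {
          if (now - failedAt > BLACK_LIST_TIME) {
            blackList.remove(failedAt);                  // penalty served, collector usable again
          }
        }
        while (cursor.hasNext()) {
          String candidate = cursor.next();
          if (!blackList.containsValue(candidate)) {
            return candidate;
          }
        }
        cursor = urls.iterator();                        // exhausted the list; start a fresh pass next time
        return null;                                     // caller sleeps and retries, as the real sender does
      }

      /** Bench a collector that just failed a POST. */
      synchronized void markFailed(String collector) {
        blackList.put(System.currentTimeMillis(), collector);
      }
    }

Keying the black list by failure timestamp, as the patch does, makes expiry a single scan over the map; when every candidate is benched, the real sender sleeps WAIT_FOR_COLLECTOR_REBOOT, re-reads the list via DataFactory.getInstance().getCollectorURLs(), and clears the black list before trying again.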