Mailing-List: contact chukwa-commits-help@hadoop.apache.org; run by ezmlm Reply-To: chukwa-dev@hadoop.apache.org Subject: svn commit: r892976 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache... 
Date: Mon, 21 Dec 2009 21:05:18 -0000 To: chukwa-commits@hadoop.apache.org From: asrabkin@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091221210519.4078023888EC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: asrabkin Date: Mon Dec 21 21:05:17 2009 New Revision: 892976 URL: http://svn.apache.org/viewvc?rev=892976&view=rev Log: CHUKWA-421. Use modification time to detect rotation. Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java Modified: hadoop/chukwa/trunk/CHANGES.txt hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java 
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java Modified: hadoop/chukwa/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/CHANGES.txt (original) +++ hadoop/chukwa/trunk/CHANGES.txt Mon Dec 21 21:05:17 2009 @@ -18,6 +18,8 @@ IMPROVEMENTS + CHUKWA-421. Use modification time to detect rotation. (asrabkin) + CHUKWA-432. PipelineableWriter becomes an abstract class. (asrabkin) CHUKWA-429. Update HDFS heatmap color with rainbow colors. (Eric Yang) Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java Mon Dec 21 21:05:17 2009 @@ -35,11 +35,10 @@ @Override public final void start(String adaptorID, String type, long offset, - ChunkReceiver dest, AdaptorManager c) throws AdaptorException { + ChunkReceiver dest) throws AdaptorException { this.adaptorID = adaptorID; this.type = type; this.dest=dest; - control = c; start(offset); } @@ -50,7 +49,8 @@ control.stopAdaptor(adaptorID, gracefully); } - public String parseArgs(String d, String s) { + public String parseArgs(String d, String s, AdaptorManager c) { + control = c; return parseArgs(s); } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java URL: 
http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java Mon Dec 21 21:05:17 2009 @@ -13,6 +13,7 @@ String innerClassName; String innerType; ChunkReceiver dest; + AdaptorManager manager; @Override public String getCurrentStatus() { @@ -30,14 +31,15 @@ * Note that the name of the inner class will get parsed out as a type */ @Override - public String parseArgs(String innerClassName, String params) { + public String parseArgs(String innerClassName, String params, AdaptorManager a) { + manager = a; Matcher m = p.matcher(params); this.innerClassName = innerClassName; String innerCoreParams; if(m.matches()) { innerType = m.group(1); inner = AdaptorFactory.createAdaptor(innerClassName); - innerCoreParams = inner.parseArgs(innerType,m.group(2)); + innerCoreParams = inner.parseArgs(innerType,m.group(2),a); return innerClassName + innerCoreParams; } else return null; @@ -64,10 +66,10 @@ */ @Override public void start(String adaptorID, String type, long offset, - ChunkReceiver dest, AdaptorManager c) throws AdaptorException { + ChunkReceiver dest) throws AdaptorException { String dummyAdaptorID = adaptorID; this.dest = dest; - inner.start(dummyAdaptorID, type, offset, this, c); + inner.start(dummyAdaptorID, type, offset, this); } @Override Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== 
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Mon Dec 21 21:05:17 2009 @@ -55,7 +55,7 @@ * @throws AdaptorException */ public void start(String adaptorID, String type, long offset, - ChunkReceiver dest, AdaptorManager c) throws AdaptorException; + ChunkReceiver dest) throws AdaptorException; /** * Return the adaptor's state Should not include class name or byte @@ -77,7 +77,7 @@ * * @return Stream name as a string, null if params are malformed */ - public String parseArgs(String datatype, String params); + public String parseArgs(String datatype, String params, AdaptorManager c); /** * Signals this adaptor to come to an orderly stop. The adaptor ought to push Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Mon Dec 21 21:05:17 2009 @@ -63,7 +63,7 @@ @Override public void start(String adaptorID, String type, long offset, - ChunkReceiver dest, AdaptorManager manager) throws AdaptorException { + ChunkReceiver dest) throws AdaptorException { try { String dummyAdaptorID = adaptorID; this.dest = dest; @@ -81,7 +81,7 @@ for(Chunk c:myBuffer.chunks) dest.add(c); - inner.start(dummyAdaptorID, innerType, offset, this, manager); + inner.start(dummyAdaptorID, innerType, offset, this); } catch(InterruptedException e) { throw new AdaptorException(e); } Modified: 
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Mon Dec 21 21:05:17 2009 @@ -48,21 +48,14 @@ */ int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2; int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS; - private static Configuration conf = null; +// private Configuration conf = null; public static final int MAX_SAMPLE_PERIOD = 60 * 1000; - FileTailer() { - if (conf == null) { - ChukwaAgent agent = ChukwaAgent.getAgent(); - if (agent != null) { - conf = agent.getConfiguration(); - if (conf != null) { - SAMPLE_PERIOD_MS = conf.getInt( - "chukwaAgent.adaptor.context.switch.time", - DEFAULT_SAMPLE_PERIOD_MS); - } - } - } + FileTailer(Configuration conf) { + // this.conf = conf; + SAMPLE_PERIOD_MS = conf.getInt( + "chukwaAgent.adaptor.context.switch.time", + DEFAULT_SAMPLE_PERIOD_MS); eq = DataFactory.getInstance().getEventQueue(); // iterations are much more common than adding a new adaptor Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (original) +++ 
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Mon Dec 21 21:05:17 2009 @@ -57,7 +57,7 @@ protected static FileTailer tailer; static { - tailer = new FileTailer(); + tailer = null; log = Logger.getLogger(FileTailingAdaptor.class); } @@ -77,9 +77,11 @@ @Override public void start(long offset) { - conf = control.getConfiguration(); - MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE); - this.fileReadOffset = offset; + synchronized(LWFTAdaptor.class) { + if(tailer == null) + tailer = new FileTailer(control.getConfiguration()); + } + this.fileReadOffset = offset - offsetOfFirstByte; tailer.startWatchingFile(this); } @@ -97,12 +99,15 @@ public String getStreamName() { return toWatch.getPath(); } - + @Override public String parseArgs(String params) { + conf = control.getConfiguration(); + MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE); + Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?"); Matcher m = cmd.matcher(params); - if (m.matches()) { + if (m.matches()) { //check for first-byte offset. If absent, assume we just got a path. 
offsetOfFirstByte = Long.parseLong(m.group(1)); toWatch = new File(m.group(2)); } else { @@ -228,6 +233,7 @@ } private void handleShrunkenFile(long measuredLen) { + log.info("file "+ toWatch +"shrank from " + fileReadOffset + " to " + measuredLen); offsetOfFirstByte = measuredLen; fileReadOffset = 0; } Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java?rev=892976&view=auto ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java (added) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java Mon Dec 21 21:05:17 2009 @@ -0,0 +1,164 @@ +package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.Collections; +import java.util.Queue; +import java.util.LinkedList; +import org.apache.hadoop.chukwa.datacollection.ChunkReceiver; + +/** + * Checkpoint state: + * date modified of most-recently tailed file, offset of first byte of that file, + * then regular FTA arts + * + */ +public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter { + + private static class FPair implements Comparable<FPair> { + File f; + long mod; + FPair(File f) { + this.f = f; + mod = f.lastModified(); + } + /** + * -1 implies this is LESS THAN o + */ + @Override + public int compareTo(FPair o) { + if(mod < o.mod) + return -1; + else if (mod > o.mod) + return 1; + //want toWatch to be last + else return (o.f.getName().compareTo(f.getName()));//shouldn't happen? 
+ } + } + + long prevFileLastModDate = 0; + LinkedList<FPair> fileQ = new LinkedList<FPair>(); + String fBaseName; + File cur; //this is the actual physical file being watched. + // in contrast, toWatch is the path specified by the user + boolean caughtUp = false; + /** + * Check for date-modified and offset; if absent assume we just got a name. + */ + @Override + public String parseArgs(String params) { + Pattern cmd = Pattern.compile("d:(\\d+)\\s+(\\d+)\\s+(.+)\\s?"); + Matcher m = cmd.matcher(params); + if (m.matches()) { + prevFileLastModDate = Long.parseLong(m.group(1)); + offsetOfFirstByte = Long.parseLong(m.group(2)); + toWatch = new File(m.group(3)).getAbsoluteFile(); + } else { + toWatch = new File(params.trim()).getAbsoluteFile(); + } + fBaseName = toWatch.getName(); + return toWatch.getAbsolutePath(); + } + + public String getCurrentStatus() { + return type.trim() + " d:" + prevFileLastModDate + " " + offsetOfFirstByte + " " + toWatch.getPath(); + } + + @Override + public boolean accept(File pathname) { + return pathname.getName().startsWith(fBaseName) && + ( pathname.getName().equals(fBaseName) || + pathname.lastModified() > prevFileLastModDate); + } + + + protected void mkFileQ() { + + File toWatchDir = toWatch.getParentFile(); + File[] candidates = toWatchDir.listFiles(this); + if(candidates == null) { + log.error(toWatchDir + " is not a directory"); + } else { + log.debug("saw " + candidates.length + " files matching pattern"); + fileQ = new LinkedList<FPair>(); + for(File f:candidates) + fileQ.add(new FPair(f)); + Collections.sort(fileQ); + } + } + + protected void advanceQ() { + FPair next = fileQ.poll(); + if(next != null) { + cur = next.f; + caughtUp = toWatch.equals(cur); + if(caughtUp && !fileQ.isEmpty()) + log.warn("expected rotation queue to be empty when caught up..."); + } + else { + cur = null; + caughtUp = true; + } + } + + @Override + public void start(long offset) { + mkFileQ(); //figure out what to watch + advanceQ(); + super.start(offset); + } + + 
@Override + public synchronized boolean tailFile(ChunkReceiver eq) + throws InterruptedException { + boolean hasMoreData = false; + try { + + if(caughtUp) { + //we're caught up and watching an unrotated file + mkFileQ(); //figure out what to watch + advanceQ(); + } + if(cur == null) //file we're watching doesn't exist + return false; + + log.debug("treating " + cur + " as " + toWatch); + + long len = cur.length(); + long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate; + if(len < fileReadOffset) { + log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len); + //no unseen changes to prev version, since mod date is older than last scan. + offsetOfFirstByte += fileReadOffset; + fileReadOffset = 0; + } else if(len > fileReadOffset) { + log.debug("slurping from " + cur+ " at offset " + fileReadOffset); + RandomAccessFile reader = new RandomAccessFile(cur, "r"); + slurp(len, reader); + reader.close(); + } else { + //we're either caught up or at EOF + if (!caughtUp) { + prevFileLastModDate = cur.lastModified(); + //Hit EOF on an already-rotated file. Move on! + offsetOfFirstByte += fileReadOffset; + fileReadOffset = 0; + advanceQ(); + log.debug("not caught up, and hit EOF. 
Moving forward in queue to " + cur); + } else + prevFileLastModDate = tsPreTail; + + } + + } catch(IOException e) { + log.warn("IOException in tailer", e); + deregisterAndStop(false); + } + + return hasMoreData; + } +} Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Mon Dec 21 21:05:17 2009 @@ -263,7 +263,7 @@ public void tryToBind() throws IOException { if(ALLOW_REMOTE) s = new ServerSocket(portno); - else { + else { //FIXME: is there a way to allow all local addresses? 
(including IPv6 local) s = new ServerSocket(); s.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] {127,0,0,1}), portno)); } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Mon Dec 21 21:05:17 2009 @@ -329,7 +329,7 @@ log.warn("Error creating adaptor of class " + adaptorClassName); return null; } - String coreParams = adaptor.parseArgs(dataType,params); + String coreParams = adaptor.parseArgs(dataType,params,this); if(coreParams == null) { log.warn("invalid params for adaptor: " + params); return null; @@ -354,7 +354,7 @@ needNewCheckpoint = true; try { adaptor.start(adaptorID, dataType, offset, DataFactory - .getInstance().getEventQueue(), this); + .getInstance().getEventQueue()); log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]"); ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size()); ChukwaAgent.agentMetrics.addedAdaptor.inc(); Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java Mon Dec 21 21:05:17 2009 
@@ -71,8 +71,8 @@ File file = new File(logFile); connector.start(); Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName); - adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath()); - adaptor.start("", recordType, 0l,queue, AdaptorManager.NULL ); + adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath(),AdaptorManager.NULL); + adaptor.start("", recordType, 0l,queue); adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED); connector.shutdown(); file.renameTo(new File(logFile + ".sav")); Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Mon Dec 21 21:05:17 2009 @@ -41,7 +41,7 @@ } public void start(String adaptorID, String type, long offset, - ChunkReceiver dest, AdaptorManager c) throws AdaptorException { + ChunkReceiver dest) throws AdaptorException { this.setName("MaxRateSender adaptor"); this.adaptorID = adaptorID; this.offset = offset; @@ -51,7 +51,7 @@ } @Override - public String parseArgs(String d, String s) { + public String parseArgs(String d, String s,AdaptorManager c) { return s; } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java Mon Dec 21 21:05:17 2009 @@ -109,6 
+109,12 @@ return tmpOutput; } + + public static File makeTestFile(String name, int size) throws IOException { + return makeTestFile(name, size, new File(System.getProperty("test.build.data", "/tmp"))); + + } + public static File makeTestFile(File baseDir) throws IOException { return makeTestFile("atemp",10, baseDir); } Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (original) +++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Mon Dec 21 21:05:17 2009 @@ -115,7 +115,7 @@ return false; } - private void createEmptyDir(File dir) { + public static void createEmptyDir(File dir) { if(!nukeDirContents(dir)) dir.mkdir(); assertTrue(dir.isDirectory() && dir.listFiles().length == 0); Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java (original) +++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java Mon Dec 21 21:05:17 2009 @@ -28,6 +28,7 @@ import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController; import 
org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector; +import static org.apache.hadoop.chukwa.util.TempFileUtil.*; public class TestLogRotate extends TestCase { ChunkCatcherConnector chunks; @@ -91,28 +92,5 @@ Thread.sleep(2000); } - private File makeTestFile(String name, int size) throws IOException { - File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"), - name); - FileOutputStream fos = new FileOutputStream(tmpOutput); - - PrintWriter pw = new PrintWriter(fos); - for (int i = 0; i < size; ++i) { - pw.print(i + " "); - pw.println("abcdefghijklmnopqrstuvwxyz"); - } - pw.flush(); - pw.close(); - return tmpOutput; - } - - public static void main(String[] args) { - try { - TestLogRotate tests = new TestLogRotate(); - tests.testLogRotate(); - } catch (Exception e) { - e.printStackTrace(); - } - } } Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java?rev=892976&view=auto ============================================================================== --- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java (added) +++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java Mon Dec 21 21:05:17 2009 @@ -0,0 +1,93 @@ +package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import org.apache.hadoop.chukwa.Chunk; +import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; +import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector; +import org.apache.hadoop.conf.Configuration; +import junit.framework.TestCase; +import 
org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor; +import org.apache.log4j.Level; + +public class TestRCheckAdaptor extends TestCase { + + ChunkCatcherConnector chunks; + + public TestRCheckAdaptor() { + chunks = new ChunkCatcherConnector(); + chunks.start(); + } + + public void testLogRotate() throws IOException, InterruptedException, + ChukwaAgent.AlreadyRunningException { + Configuration conf = new Configuration(); + conf.set("chukwaAgent.control.port", "0"); + conf.setInt("chukwaAgent.adaptor.context.switch.time", 100); + +// RCheckFTAdaptor.log.setLevel(Level.DEBUG); + File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck"); + TestDirTailingAdaptor.createEmptyDir(baseDir); + File tmpOutput = new File(baseDir, "rotateTest.1"); + PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput)); + pw.println("First"); + pw.close(); + Thread.sleep(1000);//to make sure mod dates are distinguishing. + tmpOutput = new File(baseDir, "rotateTest"); + pw = new PrintWriter(new FileOutputStream(tmpOutput)); + pw.println("Second"); + pw.close(); + + + ChukwaAgent agent = new ChukwaAgent(conf); + String adaptorID = agent.processAddCommand("add lr = filetailer.RCheckFTAdaptor test " + tmpOutput.getAbsolutePath() + " 0"); + assertNotNull(adaptorID); + + Chunk c = chunks.waitForAChunk(2000); + assertNotNull(c); + assertTrue(c.getData().length == 6); + assertTrue("First\n".equals(new String(c.getData()))); + c = chunks.waitForAChunk(2000); + assertNotNull(c); + assertTrue(c.getData().length == 7); + assertTrue("Second\n".equals(new String(c.getData()))); + + pw = new PrintWriter(new FileOutputStream(tmpOutput, true)); + pw.println("Third"); + pw.close(); + c = chunks.waitForAChunk(2000); + + assertNotNull(c); + assertTrue(c.getData().length == 6); + assertTrue("Third\n".equals(new String(c.getData()))); + Thread.sleep(1500); + + tmpOutput.renameTo(new File(baseDir, "rotateTest.2")); + pw = new PrintWriter(new 
FileOutputStream(tmpOutput, true)); + pw.println("Fourth"); + pw.close(); + c = chunks.waitForAChunk(2000); + + assertNotNull(c); + System.out.println("got " + new String(c.getData())); + assertTrue("Fourth\n".equals(new String(c.getData()))); + + Thread.sleep(1500); + + tmpOutput.renameTo(new File(baseDir, "rotateTest.3")); + Thread.sleep(400); + pw = new PrintWriter(new FileOutputStream(tmpOutput, true)); + pw.println("Fifth"); + pw.close(); + c = chunks.waitForAChunk(2000); + assertNotNull(c); + System.out.println("got " + new String(c.getData())); + assertTrue("Fifth\n".equals(new String(c.getData()))); + + agent.shutdown(); + Thread.sleep(2000); + } + +} Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java (original) +++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java Mon Dec 21 21:05:17 2009 @@ -50,6 +50,12 @@ System.out.println("testing lightweight fta"); runTest("LWFTAdaptor"); } + + + public void testRotAdaptor() throws Exception { + System.out.println("testing lightweight fta"); + runTest("LWFTAdaptor"); + } public void runTest(String name) throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
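
Illustrative note (not part of the commit): the ordering that RCheckFTAdaptor builds in accept(), mkFileQ() and FPair.compareTo() can be sketched on plain data, without touching the filesystem. In the sketch below the class RotationOrderSketch, the method readOrder, and the map of file names to timestamps are all hypothetical stand-ins; in the committed code the timestamps come from File.lastModified() and the candidates from File.listFiles(FileFilter). It assumes, as the diff suggests, that rotated files share the watched file's base name as a prefix.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the rotation-queue ordering; not the Chukwa implementation.
final class RotationOrderSketch {

  /**
   * Returns the names to read, oldest first.
   * modTimes: file name -> modification time (stand-in for File.lastModified()).
   * baseName: name of the file the user asked to watch.
   * prevLastMod: modification time recorded in the last checkpoint.
   */
  static List<String> readOrder(Map<String, Long> modTimes, String baseName, long prevLastMod) {
    List<String> names = new ArrayList<String>();
    for (Map.Entry<String, Long> e : modTimes.entrySet()) {
      String name = e.getKey();
      // Same test as accept(): keep the watched file itself, plus any
      // same-prefix file modified after the checkpointed timestamp.
      if (name.startsWith(baseName)
          && (name.equals(baseName) || e.getValue() > prevLastMod)) {
        names.add(name);
      }
    }
    // Same ordering as FPair.compareTo(): ascending modification time,
    // ties broken by reversed name order so the bare base name sorts last.
    names.sort((a, b) -> {
      int byTime = Long.compare(modTimes.get(a), modTimes.get(b));
      return byTime != 0 ? byTime : b.compareTo(a);
    });
    return names;
  }

  public static void main(String[] args) {
    Map<String, Long> modTimes = new HashMap<String, Long>();
    modTimes.put("rotateTest.1", 1000L);
    modTimes.put("rotateTest.2", 2000L);
    modTimes.put("rotateTest", 3000L);
    modTimes.put("other.log", 2500L); // different prefix, ignored
    System.out.println(readOrder(modTimes, "rotateTest", 0L));
  }
}
```

With a checkpoint timestamp of 0 every rotated file is replayed before the live file; with a later timestamp, files last modified at or before the checkpoint are skipped, which is how the adaptor avoids re-reading data it already shipped.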