chukwa-commits mailing list archives

From: asrab...@apache.org
Subject: svn commit: r817401 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
Date: Mon, 21 Sep 2009 20:25:48 GMT
Author: asrabkin
Date: Mon Sep 21 20:25:47 2009
New Revision: 817401

URL: http://svn.apache.org/viewvc?rev=817401&view=rev
Log:
CHUKWA-97.  Refactored FileTailers, added LWFTAdaptor.
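
In brief, this change moves the core tailing logic (argument parsing, offset tracking, extractRecords(), and the slurp() read loop) out of FileTailingAdaptor into a new lightweight base class, LWFTAdaptor; FileTailingAdaptor now extends it and keeps only the retry, rotation, and graceful-shutdown handling. The updated TestRawAdaptor registers the new adaptor with the agent by class name. A minimal driver along the same lines might look like the sketch below; the ChukwaAgent(Configuration) constructor and the agent package are assumptions (they are not shown in this diff), the class name and file path are hypothetical, and the add-command format is taken from the test changes in this revision.

import java.io.File;

import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.Configuration;

public class AddLWFTAdaptorExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("chukwaAgent.control.port", "0");    // ephemeral control port, as the tests do
    ChukwaAgent agent = new ChukwaAgent(conf);    // assumed constructor; not shown in this diff

    File toTail = new File("/tmp/example.log");   // hypothetical file to tail
    // Command format taken from the updated TestRawAdaptor:
    //   add <adaptor class> <data type> <file> <start offset>
    String adaptorId = agent.processAddCommand(
        "add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.LWFTAdaptor"
        + " raw " + toTail + " 0");
    System.out.println("registered adaptor " + adaptorId);
  }
}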

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=817401&r1=817400&r2=817401&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Sep 21 20:25:47 2009
@@ -58,13 +58,10 @@
 
   IMPROVEMENTS
 
-<<<<<<< .mine
     CHUKWA-388.  Clean up user interface color.  (Eric Yang)
 
-=======
     CHUKWA-387.  Summarize mode for dumpChunks should count bytes. (asrabkin)
 
->>>>>>> .r812501
     CHUKWA-379.  Refactor sender code. (asrabkin)
 
     CHUKWA-374.  Adaptor.getStatus() shouldn't throw exceptions. (asrabkin)
@@ -221,6 +218,8 @@
 
   NEW FEATURES
 
+    CHUKWA-97.  Refactored FileTailers, added LWFTAdaptor. (asrabkin)
+
     CHUKWA-236. Added migration script for moving database schema for Chukwa 0.1.1 to Chukwa 0.1.2. (Eric Yang)
 
     CHUKWA-78.  Added down sample SQL aggregation for job data, task data and utilization data. (Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=817401&r1=817400&r2=817401&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Mon Sep 21 20:25:47 2009
@@ -39,7 +39,7 @@
 class FileTailer extends Thread {
   static Logger log = Logger.getLogger(FileTailer.class);
 
-  private List<FileTailingAdaptor> adaptors;
+  private List<LWFTAdaptor> adaptors;
   private volatile boolean isRunning = true;
   ChunkQueue eq; // not private -- useful for file tailing adaptor classes
 
@@ -49,6 +49,7 @@
   int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2;
   int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
   private static Configuration conf = null;
+  public static final int MAX_SAMPLE_PERIOD = 60 * 1000;
 
   FileTailer() {
     if (conf == null) {
@@ -65,19 +66,19 @@
     eq = DataFactory.getInstance().getEventQueue();
 
     // iterations are much more common than adding a new adaptor
-    adaptors = new CopyOnWriteArrayList<FileTailingAdaptor>();
+    adaptors = new CopyOnWriteArrayList<LWFTAdaptor>();
 
     this.setDaemon(true);
     start();// start the file-tailing thread
   }
 
   // called by FileTailingAdaptor, only
-  void startWatchingFile(FileTailingAdaptor f) {
+  void startWatchingFile(LWFTAdaptor f) {
     adaptors.add(f);
   }
 
   // called by FileTailingAdaptor, only
-  void stopWatchingFile(FileTailingAdaptor f) {
+  void stopWatchingFile(LWFTAdaptor f) {
     adaptors.remove(f);
   }
 
@@ -86,11 +87,13 @@
       try {
         boolean shouldISleep = true;
         long startTime = System.currentTimeMillis();
-        for (FileTailingAdaptor f : adaptors) {
+        for (LWFTAdaptor f : adaptors) {
           boolean hasMoreData = f.tailFile(eq);
           shouldISleep &= !hasMoreData;
         }
         long timeToReadFiles = System.currentTimeMillis() - startTime;
+        if(timeToReadFiles > MAX_SAMPLE_PERIOD)
+          log.warn("took " + timeToReadFiles + " ms to check all files being tailed");
         if (timeToReadFiles < SAMPLE_PERIOD_MS || shouldISleep) {
           Thread.sleep(SAMPLE_PERIOD_MS);
         }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=817401&r1=817400&r2=817401&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Mon Sep 21 20:25:47 2009
@@ -37,72 +37,27 @@
  * file. Subclasses can alter this behavior by overriding extractRecords().
  * 
  */
-public class FileTailingAdaptor extends AbstractAdaptor {
+public class FileTailingAdaptor extends LWFTAdaptor {
 
-  static Logger log;
 
-  /**
-   * This is the maximum amount we'll read from any one file before moving on to
-   * the next. This way, we get quick response time for other files if one file
-   * is growing rapidly.
-   */
-  public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
-  public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
   public static int MAX_RETRIES = 300;
   public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
 
-  protected Configuration conf = null;
   private int attempts = 0;
   private long gracefulPeriodExpired = 0l;
   private boolean adaptorInError = false;
-  File toWatch;
-  /**
-   * next PHYSICAL offset to read
-   */
-  protected long fileReadOffset;
-  protected RandomAccessFile reader = null;
 
-  /**
-   * The logical offset of the first byte of the file
-   */
-  private long offsetOfFirstByte = 0;
-
-  private static FileTailer tailer;
-
-  static {
-    tailer = new FileTailer();
-    log = Logger.getLogger(FileTailingAdaptor.class);
-  }
+  protected RandomAccessFile reader = null;
 
   public void start(long bytes) {
-    
-    conf = control.getConfiguration();
-    MAX_READ_SIZE = conf.getInt(
-        "chukwaAgent.fileTailingAdaptor.maxReadSize",
-        DEFAULT_MAX_READ_SIZE);
-    log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: "
-        + MAX_READ_SIZE);
+    super.start(bytes);
+    log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
     this.attempts = 0;
 
     log.info("started file tailer on file " + toWatch
         + " with first byte at offset " + offsetOfFirstByte);
-
-    this.fileReadOffset = bytes;
-    tailer.startWatchingFile(this);
-  }
-  
-  @Override
-  public String parseArgs(String params) { 
-    Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
-    Matcher m = cmd.matcher(params);
-    if (m.matches()) {
-      offsetOfFirstByte = Long.parseLong(m.group(1));
-      toWatch = new File(m.group(2));
-    } else {
-      toWatch = new File(params.trim());
-    }
-    return toWatch.getAbsolutePath();
   }
+ 
 
   /**
    * Do one last tail, and then stop
@@ -178,22 +133,6 @@
     return fileReadOffset + offsetOfFirstByte;
   }
   
-  
-  /**
-   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
-   */
-  public String getCurrentStatus() {
-    return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
-    // can make this more efficient using a StringBuilder
-  }
-
-  public String toString() {
-    return "Tailer on " + toWatch;
-  }
-
-  public String getStreamName() {
-    return toWatch.getPath();
-  }
 
   /**
    * Looks at the tail of the associated file, adds some of it to event queue
@@ -263,8 +202,7 @@
           
           reader = newReader;
           fileReadOffset = 0L;
-          log.debug("Adaptor|" + adaptorID
-              + "| File size mismatched, rotating: "
+          log.debug("Adaptor|"+ adaptorID + "| File size mismatched, rotating: "
               + toWatch.getAbsolutePath());
         } else {
           try {
@@ -288,50 +226,7 @@
           offsetOfFirstByte = 0L;
           log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
         }
-
-        log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
-        reader.seek(fileReadOffset);
-
-        long bufSize = len - fileReadOffset;
-
-       if (bufSize > MAX_READ_SIZE) {
-          bufSize = MAX_READ_SIZE;
-          hasMoreData = true;
-        }
-        byte[] buf = new byte[(int) bufSize];
-
-        long curOffset = fileReadOffset;
-
-        int bufferRead = reader.read(buf);
-        assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
-            + " pointer is "
-            + reader.getFilePointer()
-            + " but offset is "
-            + fileReadOffset + bufSize;
-
-        int bytesUsed = extractRecords(dest,
-            fileReadOffset + offsetOfFirstByte, buf);
-
-        // === WARNING ===
-        // If we couldn't found a complete record AND
-        // we cannot read more, i.e bufferRead == MAX_READ_SIZE
-        // it's because the record is too BIG
-        // So log.warn, and drop current buffer so we can keep moving
-        // instead of being stopped at that point for ever
-        if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
-          log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, droping current buffer: startOffset="
-                  + curOffset
-                  + ", MAX_READ_SIZE="
-                  + MAX_READ_SIZE
-                  + ", for "
-                  + toWatch.getPath());
-          bytesUsed = buf.length;
-        }
-
-        fileReadOffset = fileReadOffset + bytesUsed;
-
-        log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
-            + fileReadOffset);
+        hasMoreData = slurp(len, reader);
 
       } else {
         // file has rotated and no detection
@@ -354,26 +249,5 @@
     return hasMoreData;
   }
 
-  /**
-   * Extract records from a byte sequence
-   * 
-   * @param eq the queue to stick the new chunk[s] in
-   * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
-   * @param buf the byte buffer to extract records from
-   * @return the number of bytes processed
-   * @throws InterruptedException
-   */
-  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
-      byte[] buf) throws InterruptedException {
-    if(buf.length == 0)
-      return 0;
-    
-    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
-        buffOffsetInFile + buf.length, buf, this);
-
-    eq.add(chunk);
-    return buf.length;
-  }
-
 
 }

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=817401&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Mon Sep 21 20:25:47 2009
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * A base class for file tailing adaptors.  
+ * Intended to mandate as little policy as possible, and to use as 
+ * few system resources as possible.
+ */
+public class LWFTAdaptor extends AbstractAdaptor {
+  
+  /**
+   * This is the maximum amount we'll read from any one file before moving on to
+   * the next. This way, we get quick response time for other files if one file
+   * is growing rapidly.
+   */
+  public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
+  public static final String MAX_READ_SIZE_OPT = 
+      "chukwaAgent.fileTailingAdaptor.maxReadSize";
+
+  public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
+  
+  static Logger log;
+  protected static FileTailer tailer;
+  
+  static {
+    tailer = new FileTailer();
+    log = Logger.getLogger(FileTailingAdaptor.class);
+  }
+  
+  
+  /**
+   * next PHYSICAL offset to read
+   */
+  protected long fileReadOffset;
+
+  /**
+   * The logical offset of the first byte of the file
+   */
+  protected long offsetOfFirstByte = 0;
+  protected Configuration conf = null;
+  
+  File toWatch;
+
+  @Override
+  public void start(long offset) {
+    conf = control.getConfiguration();
+    MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);
+    this.fileReadOffset = offset;    
+    tailer.startWatchingFile(this);
+  }
+  
+  /**
+   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
+   */
+  public String getCurrentStatus() {
+    return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
+  }
+
+  public String toString() {
+    return "Lightweight Tailer on " + toWatch;
+  }
+
+  public String getStreamName() {
+    return toWatch.getPath();
+  }
+
+  @Override
+  public String parseArgs(String params) { 
+    Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
+    Matcher m = cmd.matcher(params);
+    if (m.matches()) {
+      offsetOfFirstByte = Long.parseLong(m.group(1));
+      toWatch = new File(m.group(2));
+    } else {
+      toWatch = new File(params.trim());
+    }
+    return toWatch.getAbsolutePath();
+  }
+  
+
+  @Override
+  public void hardStop() throws AdaptorException {
+    shutdown(AdaptorShutdownPolicy.HARD_STOP);
+  }
+  
+  @Override
+  public long shutdown() throws AdaptorException {
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    tailer.stopWatchingFile(this);
+    return fileReadOffset + offsetOfFirstByte;
+  }
+  
+
+  /**
+   * Extract records from a byte sequence
+   * 
+   * @param eq the queue to stick the new chunk[s] in
+   * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
+   * @param buf the byte buffer to extract records from
+   * @return the number of bytes processed
+   * @throws InterruptedException
+   */
+  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+      byte[] buf) throws InterruptedException {
+    if(buf.length == 0)
+      return 0;
+    
+    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
+        buffOffsetInFile + buf.length, buf, this);
+
+    eq.add(chunk);
+    return buf.length;
+  }
+  
+  protected boolean slurp(long len, RandomAccessFile reader) throws IOException,
+  InterruptedException{
+    boolean hasMoreData = false;
+
+    log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
+    reader.seek(fileReadOffset);
+
+    long bufSize = len - fileReadOffset;
+
+   if (bufSize > MAX_READ_SIZE) {
+      bufSize = MAX_READ_SIZE;
+      hasMoreData = true;
+    }
+    byte[] buf = new byte[(int) bufSize];
+
+    long curOffset = fileReadOffset;
+
+    int bufferRead = reader.read(buf);
+    assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
+        + " pointer is "
+        + reader.getFilePointer()
+        + " but offset is "
+        + fileReadOffset + bufSize;
+
+    int bytesUsed = extractRecords(dest,
+        fileReadOffset + offsetOfFirstByte, buf);
+
+    // === WARNING ===
+    // If we couldn't found a complete record AND
+    // we cannot read more, i.e bufferRead == MAX_READ_SIZE
+    // it's because the record is too BIG
+    // So log.warn, and drop current buffer so we can keep moving
+    // instead of being stopped at that point for ever
+    if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
+      log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, droping current buffer: startOffset="
+              + curOffset
+              + ", MAX_READ_SIZE="
+              + MAX_READ_SIZE
+              + ", for "
+              + toWatch.getPath());
+      bytesUsed = buf.length;
+    }
+
+    fileReadOffset = fileReadOffset + bytesUsed;
+
+    log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
+        + fileReadOffset);
+    return hasMoreData;
+  }
+  
+  public synchronized boolean tailFile(ChunkReceiver eq)
+  throws InterruptedException {
+    boolean hasMoreData = false;
+    try {
+      long len = toWatch.length();
+      if(len < fileReadOffset) {
+        //file shrank; probably some data went missing.
+        handleShrunkenFile(len);
+        
+      } else if(len > fileReadOffset) {
+        RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
+        slurp(len, reader);
+        reader.close();
+      }
+    } catch(IOException e) {
+      log.warn("IOException in tailer", e);
+      deregisterAndStop(false);
+    }
+    
+    return hasMoreData;
+  }
+
+  private void handleShrunkenFile(long measuredLen) {
+    offsetOfFirstByte = measuredLen;
+    fileReadOffset = 0;
+  }
+
+}
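
As the FileTailingAdaptor javadoc above notes, subclasses tune record boundaries by overriding extractRecords(). Purely as an illustration (not part of this commit), a subclass of the new base class might cut chunks at line boundaries as sketched below; the class name is hypothetical, while the type and toWatch fields and the ChunkImpl constructor follow the code added above.

package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;

import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;

public class NewlineBoundedLWFTAdaptor extends LWFTAdaptor {

  @Override
  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
      throws InterruptedException {
    // Emit only up to the last complete newline so records never straddle chunks.
    int end = buf.length;
    while (end > 0 && buf[end - 1] != '\n')
      end--;
    if (end == 0)
      return 0;                        // no complete record yet; wait for more data

    byte[] complete = new byte[end];
    System.arraycopy(buf, 0, complete, 0, end);
    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
        buffOffsetInFile + end, complete, this);
    eq.add(chunk);
    return end;                        // bytes consumed; slurp() advances fileReadOffset by this
  }
}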

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java?rev=817401&r1=817400&r2=817401&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java Mon Sep 21 20:25:47 2009
@@ -37,8 +37,19 @@
     chunks = new ChunkCatcherConnector();
     chunks.start();
   }
+  
+  public void testRawAdaptor() throws Exception {
+    System.out.println("testing raw fta");
+    runTest("FileTailingAdaptor"); 
+  }
+
+
+  public void testLWRawAdaptor() throws Exception {
+    System.out.println("testing lightweight fta");
+    runTest("LWFTAdaptor"); 
+  }
 
-  public void testRawAdaptor() throws IOException, InterruptedException,
+  public void runTest(String name) throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
 
     // Remove any adaptor left over from previous run
@@ -49,10 +60,12 @@
 
     File testFile = makeTestFile("chukwaRawTest", 80);
     String adaptorId = agent
-        .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor"
+        .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor."
+            +"filetailer." + name
             + " raw " + testFile + " 0");
     assertNotNull(adaptorId);
-    Chunk c = chunks.waitForAChunk();
+    Chunk c = chunks.waitForAChunk(1000);
+    assertNotNull(c);
     assertEquals(testFile.length(), c.getData().length);
     assertTrue(c.getDataType().equals("raw"));
     assertTrue(c.getRecordOffsets().length == 1);
@@ -87,13 +100,5 @@
     return tmpOutput;
   }
 
-  public static void main(String[] args) {
-    try {
-      TestRawAdaptor tests = new TestRawAdaptor();
-      tests.testRawAdaptor();
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
 
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java?rev=817401&r1=817400&r2=817401&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java Mon Sep 21 20:25:47 2009
@@ -39,8 +39,9 @@
     chunks = new ChunkCatcherConnector();
     chunks.start();
   }
+  
 
-  public void testStartAtOffset() throws IOException, InterruptedException,
+  public void startAtOffset() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
@@ -49,7 +50,8 @@
     File testFile = makeTestFile();
     int startOffset = 0; // skip first line
     String adaptorId = agent
-        .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 "
+       .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor"+
+            "filetailer.CharFileTailingAdaptorUTF8 "
             + "lines " + startOffset + " " + testFile + " " + startOffset);
     assertTrue(adaptorId != null);
     System.out.println("getting a chunk...");


