chukwa-commits mailing list archives

From: asrab...@apache.org
Subject: svn commit: r806332 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ src/test/org/apac...
Date: Thu, 20 Aug 2009 20:01:21 GMT
Author: asrabkin
Date: Thu Aug 20 20:01:21 2009
New Revision: 806332

URL: http://svn.apache.org/viewvc?rev=806332&view=rev
Log:
CHUKWA-380. FTA shouldn't emit empty chunks.

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Aug 20 20:01:21 2009
@@ -122,6 +122,8 @@
 
   BUG FIXES
 
+    CHUKWA-380. FTA shouldn't emit empty chunks. (asrabkin)
+
     CHUKWA-378. Disable TestArchive unit test. (asrabkin)
 
     CHUKWA-377. Revised xtrace adaptor code. (asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java Thu Aug 20 20:01:21 2009
@@ -103,9 +103,10 @@
           c.setRecordOffsets(carriageReturns);
         } // else we get default one record
 
-        dest.add(c);
+
         //We can't replay exec data, so we might as well commit to it now.
         control.reportCommit(ExecAdaptor.this, sendOffset);
+        dest.add(c);
       } catch (JSONException e) {
         log.warn(e);
       } catch (InterruptedException e) {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Thu Aug 20 20:01:21 2009
@@ -121,15 +121,6 @@
   
   static {
     tailer = new FileAdaptorTailer();
-    ChukwaAgent agent = ChukwaAgent.getAgent();
-    if (agent != null) {
-      Configuration conf = agent.getConfiguration();
-      if (conf != null) {
-        TIMEOUT_PERIOD = conf.getInt(
-            "chukwaAgent.adaptor.fileadaptor.timeoutperiod",
-            DEFAULT_TIMEOUT_PERIOD);
-      }
-    }
   }
   
   private long startTime = 0;
@@ -151,6 +142,9 @@
     log.info("adaptor id: " + adaptorID + " started file adaptor on file "
         + toWatch);
     this.startTime = System.currentTimeMillis();
+    TIMEOUT_PERIOD = control.getConfiguration().getInt(
+        "chukwaAgent.adaptor.fileadaptor.timeoutperiod",
+        DEFAULT_TIMEOUT_PERIOD);
     this.timeOut = startTime + TIMEOUT_PERIOD;
     
 

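For context, a minimal standalone sketch of the configuration lookup that the hunk above moves into start(). The org.apache.hadoop.conf.Configuration calls are real Hadoop API; the class name and the default value below are illustrative assumptions, not part of Chukwa.

import org.apache.hadoop.conf.Configuration;

public class FileAdaptorTimeoutSketch {
  // Illustrative default only; the real DEFAULT_TIMEOUT_PERIOD is defined in FileAdaptor.
  static final int DEFAULT_TIMEOUT_PERIOD = 30 * 1000; // milliseconds (assumed)

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("chukwaAgent.adaptor.fileadaptor.timeoutperiod", 100);

    // Same lookup the patch performs when the adaptor starts, with a fallback default.
    int timeoutPeriod = conf.getInt(
        "chukwaAgent.adaptor.fileadaptor.timeoutperiod", DEFAULT_TIMEOUT_PERIOD);
    long timeOut = System.currentTimeMillis() + timeoutPeriod;
    System.out.println("adaptor would time out at " + timeOut);
  }
}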
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Thu Aug 20 20:01:21 2009
@@ -51,7 +51,7 @@
   public static int MAX_RETRIES = 300;
   public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
 
-  protected static Configuration conf = null;
+  protected Configuration conf = null;
   private int attempts = 0;
   private long gracefulPeriodExpired = 0l;
   private boolean adaptorInError = false;
@@ -75,6 +75,13 @@
   }
 
   public void start(long bytes) {
+    
+    conf = control.getConfiguration();
+    MAX_READ_SIZE = conf.getInt(
+        "chukwaAgent.fileTailingAdaptor.maxReadSize",
+        DEFAULT_MAX_READ_SIZE);
+    log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: "
+        + MAX_READ_SIZE);
     this.attempts = 0;
 
     log.info("started file tailer on file " + toWatch
@@ -287,27 +294,7 @@
 
         long bufSize = len - fileReadOffset;
 
-        if (conf == null) {
-          ChukwaAgent agent = ChukwaAgent.getAgent();
-          if (agent != null) {
-            conf = agent.getConfiguration();
-            if (conf != null) {
-              MAX_READ_SIZE = conf.getInt(
-                  "chukwaAgent.fileTailingAdaptor.maxReadSize",
-                  DEFAULT_MAX_READ_SIZE);
-              log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: "
-                  + MAX_READ_SIZE);
-            } else {
-              log.info("Conf is null, running in default mode");
-              conf = new Configuration();
-            }
-          } else {
-            log.info("Agent is null, running in default mode");
-            conf = new Configuration();
-          }
-        }
-
-        if (bufSize > MAX_READ_SIZE) {
+       if (bufSize > MAX_READ_SIZE) {
           bufSize = MAX_READ_SIZE;
           hasMoreData = true;
         }
@@ -378,6 +365,9 @@
    */
   protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
       byte[] buf) throws InterruptedException {
+    if(buf.length == 0)
+      return 0;
+    
     ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
         buffOffsetInFile + buf.length, buf, this);
 

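A minimal, self-contained illustration of the guard added to extractRecords above: an empty buffer now yields no chunk at all, which is the core of CHUKWA-380. The Chunk type here is a hypothetical stand-in, not Chukwa's ChunkImpl/ChunkReceiver API.

import java.util.ArrayList;
import java.util.List;

public class EmptyBufferGuardSketch {
  // Hypothetical stand-in for a chunk of tailed file data.
  static class Chunk {
    final long seqId;
    final byte[] data;
    Chunk(long seqId, byte[] data) { this.seqId = seqId; this.data = data; }
  }

  // Simplified extractRecords: returns the number of bytes emitted.
  static int extractRecords(List<Chunk> dest, long buffOffsetInFile, byte[] buf) {
    if (buf.length == 0)
      return 0;                                   // never emit an empty chunk
    dest.add(new Chunk(buffOffsetInFile + buf.length, buf));
    return buf.length;
  }

  public static void main(String[] args) {
    List<Chunk> sink = new ArrayList<Chunk>();
    System.out.println(extractRecords(sink, 0, new byte[0]));           // 0; nothing queued
    System.out.println(extractRecords(sink, 0, "hello\n".getBytes()));  // 6; one chunk queued
    System.out.println(sink.size());                                    // 1
  }
}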
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Thu Aug 20 20:01:21 2009
@@ -73,15 +73,16 @@
     anOldFile.setLastModified(10);//just after epoch
     agent = new ChukwaAgent(conf); //restart agent.
     
-    //should be four adaptors: the DirTailer on emptyDir, the DirTailer on the full dir,
-    //and FileTailers for File inDir and file newfile
-    assertEquals(4, agent.adaptorCount());
-    
+
     //make sure we started tailing the new, not the old, file.
     for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
       System.out.println(adaptors.getKey() +": " + adaptors.getValue());
       assertFalse(adaptors.getValue().contains("oldXYZ"));
     }
+    //should be four adaptors: the DirTailer on emptyDir, the DirTailer on the full dir,
+    //and FileTailers for File inDir and file newfile
+    assertEquals(4, agent.adaptorCount());
+    
     nukeDirContents(checkpointDir);//nuke dir
     checkpointDir.delete();
     emptyDir.delete();

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java Thu Aug 20 20:01:21 2009
@@ -60,7 +60,7 @@
     assertEquals(0, agent.adaptorCount());
     String lsID = agent.processAddCommand(
       "add exec= org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor Listing 100
/bin/sleep 1 0");
-    Thread.sleep( 25*1000); //RAISE THIS to test longer
+    Thread.sleep( 5*1000); //RAISE THIS to test longer
     System.out.println("stopped ok"); 
   }
 

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java Thu Aug 20 20:01:21 2009
@@ -39,6 +39,7 @@
   
   public TestFileAdaptor() throws IOException {
     baseDir = new File(System.getProperty("test.build.data", "/tmp"));
+    conf.setInt("chukwaAgent.control.port", 0);
     conf.set("chukwaAgent.checkpoint.dir", baseDir.getCanonicalPath());
     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
     conf.setInt("chukwaAgent.adaptor.fileadaptor.timeoutperiod", 100);
@@ -74,7 +75,8 @@
 
    agent.processAddCommand("add test = FileAdaptor raw " +testFile.getCanonicalPath() + " 0");
     assertEquals(1, agent.adaptorCount());
-    Chunk c = chunks.waitForAChunk();
+    Chunk c = chunks.waitForAChunk(5000);
+    assertNotNull(c);
     String dat = new String(c.getData());
     assertTrue(dat.startsWith("0 abcdefghijklmnopqrstuvwxyz"));
     assertTrue(dat.endsWith("9 abcdefghijklmnopqrstuvwxyz\n"));
@@ -84,7 +86,7 @@
   
   public void testRepeatedly() throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {
-    int tests = 100; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
+    int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
 
     ChukwaAgent agent = new ChukwaAgent(conf);
     for(int i=0; i < tests; ++i) {
@@ -94,7 +96,8 @@
       assertEquals(0, agent.adaptorCount());
      agent.processAddCommand("add test = FileAdaptor raw " +testFile.getCanonicalPath() + " 0");
       assertEquals(1, agent.adaptorCount());
-      Chunk c = chunks.waitForAChunk();
+      Chunk c = chunks.waitForAChunk(5000);
+      assertNotNull(c);
       String dat = new String(c.getData());
       assertTrue(dat.startsWith("0 abcdefghijklmnopqrstuvwxyz"));
       assertTrue(dat.endsWith("9 abcdefghijklmnopqrstuvwxyz\n"));

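Several of the tests in this commit set chukwaAgent.control.port to 0. As a hedged aside on what that buys (plain java.net, nothing Chukwa-specific): binding to port 0 lets the OS pick a free ephemeral port, so test agents do not collide on a fixed control port such as 9093.

import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortSketch {
  public static void main(String[] args) throws IOException {
    ServerSocket control = new ServerSocket(0); // port 0 = any free ephemeral port
    try {
      System.out.println("bound on port " + control.getLocalPort());
    } finally {
      control.close();
    }
  }
}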
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java Thu Aug 20 20:01:21 2009
@@ -37,14 +37,6 @@
       Configuration conf = new ChukwaConfiguration();
       conf.set("chukwaAgent.control.port", "0");
       agent = new ChukwaAgent(conf);
-      // Remove any adaptor left over from previous run
-      
- 
-      ChukwaAgentController cli = new ChukwaAgentController("localhost", agent.getControllerPort());
-      cli.removeAll();
-      // sleep for some time to make sure we don't get chunk from existing
-      // streams
-      Thread.sleep(5000);
 
       FileTailingAdaptor.GRACEFUL_PERIOD = 30 * 1000;
 

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java Thu Aug 20 20:01:21 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.conf.Configuration;
 
 public class TestRawAdaptor extends TestCase {
   ChunkCatcherConnector chunks;
@@ -40,32 +41,38 @@
   public void testRawAdaptor() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
 
-    ChukwaAgent agent = new ChukwaAgent();
     // Remove any adaptor left over from previous run
-    ChukwaConfiguration cc = new ChukwaConfiguration();
-    int portno = cc.getInt("chukwaAgent.control.port", 9093);
-    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
-    cli.removeAll();
-    // sleep for some time to make sure we don't get chunk from existing streams
-    Thread.sleep(5000);
+    Configuration conf = new Configuration();
+    conf.set("chukwaAgent.control.port", "0");
+    conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
+    ChukwaAgent agent = new ChukwaAgent(conf);
+
     File testFile = makeTestFile("chukwaRawTest", 80);
     String adaptorId = agent
         .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor"
             + " raw " + testFile + " 0");
     assertNotNull(adaptorId);
     Chunk c = chunks.waitForAChunk();
-    while (!c.getDataType().equals("raw")) {
-      c = chunks.waitForAChunk();
-    }
+    assertEquals(testFile.length(), c.getData().length);
     assertTrue(c.getDataType().equals("raw"));
     assertTrue(c.getRecordOffsets().length == 1);
     assertTrue(c.getSeqID() == testFile.length());
+    
+    c = chunks.waitForAChunk(1000);
+    assertNull(c);
+    
     agent.stopAdaptor(adaptorId, false);
     agent.shutdown();
-    Thread.sleep(2000);
   }
 
-  private File makeTestFile(String name, int size) throws IOException {
+  /**
+   * 
+   * @param name
+   * @param size size in lines
+   * @return
+   * @throws IOException
+   */
+  public static File makeTestFile(String name, int size) throws IOException {
     File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
         name);
     FileOutputStream fos = new FileOutputStream(tmpOutput);

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java Thu Aug 20 20:01:21 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.conf.Configuration;
 import junit.framework.TestCase;
 
 public class TestStartAtOffset extends TestCase {
@@ -41,14 +42,10 @@
 
   public void testStartAtOffset() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
-    ChukwaAgent agent = new ChukwaAgent();
-    // Remove any adaptor left over from previous run
-    ChukwaConfiguration cc = new ChukwaConfiguration();
-    int portno = cc.getInt("chukwaAgent.control.port", 9093);
-    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
-    cli.removeAll();
-    // sleep for some time to make sure we don't get chunk from existing streams
-    Thread.sleep(5000);
+    Configuration conf = new Configuration();
+    conf.set("chukwaAgent.control.port", "0");
+    conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
+    ChukwaAgent agent = new ChukwaAgent(conf);
     File testFile = makeTestFile();
     int startOffset = 0; // skip first line
     String adaptorId = agent
@@ -75,19 +72,14 @@
     assertTrue(c.getDataType().equals("lines"));
     agent.stopAdaptor(adaptorId, false);
     agent.shutdown();
-    Thread.sleep(2000);
   }
 
   public void testStartAfterOffset() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
-    ChukwaAgent agent = new ChukwaAgent();
-    // Remove any adaptor left over from previous run
-    ChukwaConfiguration cc = new ChukwaConfiguration();
-    int portno = cc.getInt("chukwaAgent.control.port", 9093);
-    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
-    cli.removeAll();
-    // sleep for some time to make sure we don't get chunk from existing streams
-    Thread.sleep(5000);
+    Configuration conf = new Configuration();
+    conf.set("chukwaAgent.control.port", "0");
+    conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
+    ChukwaAgent agent = new ChukwaAgent(conf);
     File testFile = makeTestFile();
     int startOffset = 0;
     String adaptorId = agent
@@ -120,7 +112,6 @@
     assertTrue(c.getDataType().equals("lines"));
     agent.stopAdaptor(adaptorId, false);
     agent.shutdown();
-    Thread.sleep(2000);
   }
 
   private File makeTestFile() throws IOException {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java?rev=806332&r1=806331&r2=806332&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java Thu Aug 20 20:01:21 2009
@@ -30,12 +30,14 @@
   
   class Interruptor extends TimerTask {
     Thread targ;
+    volatile boolean deactivate = false;
     Interruptor(Thread t) {
       targ =t;
     }
     
-    public void run() {
-      targ.interrupt();
+    public synchronized void run() {
+      if(!deactivate)
+        targ.interrupt();
     }
   };
 
@@ -44,12 +46,21 @@
     tm = new Timer();
   }
 
-  public Chunk waitForAChunk(long ms) throws InterruptedException {
+  public Chunk waitForAChunk(long ms) {
     
     ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+    Interruptor i = new Interruptor(Thread.currentThread());
     if(ms > 0)
-      tm.schedule(new Interruptor(Thread.currentThread()), ms);
-    eq.collect(chunks, 1);
+      tm.schedule(i, ms);
+    try {
+      eq.collect(chunks, 1);
+      synchronized(i) {
+        i.deactivate = true;
+      }
+    } catch(InterruptedException e) {
+      Thread.interrupted();
+      return null;
+    }
     return chunks.get(0);
   }
   

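The revised waitForAChunk(ms) above returns null on timeout instead of propagating InterruptedException. Below is a self-contained sketch of the same Timer/interrupt pattern; the class and the BlockingQueue standing in for Chukwa's chunk queue are hypothetical, while the java.util and java.util.concurrent calls are real.

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TimedWaitSketch {
  // Interrupts the waiting thread unless the wait already completed.
  static class Interruptor extends TimerTask {
    private final Thread target;
    volatile boolean deactivate = false;
    Interruptor(Thread t) { target = t; }
    public synchronized void run() {
      if (!deactivate)
        target.interrupt();
    }
  }

  private final Timer timer = new Timer();
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

  public String waitForAnItem(long ms) {
    Interruptor i = new Interruptor(Thread.currentThread());
    if (ms > 0)
      timer.schedule(i, ms);
    try {
      String item = queue.take();                  // blocks until something arrives
      synchronized (i) { i.deactivate = true; }    // arrived in time; suppress the interrupt
      return item;
    } catch (InterruptedException e) {
      Thread.interrupted();                        // clear the interrupt flag
      return null;                                 // timed out: caller sees null
    }
  }

  public static void main(String[] args) {
    TimedWaitSketch sketch = new TimedWaitSketch();
    System.out.println(sketch.waitForAnItem(500)); // prints null after roughly 500 ms
    sketch.timer.cancel();
  }
}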

