chukwa-commits mailing list archives

From asrab...@apache.org
Subject svn commit: r785462 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/test/org/apache/hadoop/chukwa/datacollection/adaptor/
Date Wed, 17 Jun 2009 01:02:08 GMT
Author: asrabkin
Date: Wed Jun 17 01:02:07 2009
New Revision: 785462

URL: http://svn.apache.org/viewvc?rev=785462&view=rev
Log:
CHUKWA-185. Ability to tail a whole directory
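
The new adaptor takes two mandatory parameters: a directory to watch and the name of a
per-file adaptor to start for each file it finds. As a rough usage sketch, mirroring the
new TestDirTailingAdaptor added below (the /tmp/app-logs path and the disabled
checkpointing are illustrative, not part of this commit):

import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.Configuration;

public class DirTailingUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("chukwaAgent.checkpoint.enabled", false); // keep the demo stateless
    ChukwaAgent agent = new ChukwaAgent(conf);

    // add <adaptor class> <data type> <directory> <adaptor to start per file> <offset>
    agent.processAddCommand(
        "add DirTailingAdaptor raw /tmp/app-logs filetailer.CharFileTailingAdaptorUTF8 0");
  }
}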

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=785462&r1=785461&r2=785462&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Wed Jun 17 01:02:07 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-185. Ability to tail a whole directory. (asrabkin)
+
     CHUKWA-280. Added end to end test to detect iostat overflow. (Eric Yang)
 
     CHUKWA-194.  Backfilling tools.  (Jerome Boulon via asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java?rev=785462&r1=785461&r2=785462&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java Wed Jun 17 01:02:07 2009
@@ -39,8 +39,8 @@
     this.adaptorID = adaptorID;
     this.type = type;
     this.dest=dest;
-    start(status, offset);
     control = c;
+    start(status, offset);
   }
   
   public abstract void start(String status, long offset) throws AdaptorException;

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java?rev=785462&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java Wed Jun 17 01:02:07 2009
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.util.regex.*;
+import java.io.File;
+import java.io.IOException;
+import org.apache.log4j.Logger;
+
+/**
+ *  Explore a whole directory hierarchy, looking for files to tail. 
+ *   DirTailingAdaptor will not try to start tailing a file more than once,
+ *   if the file hasn't been modified in the interim.  
+ *   
+ *  Offset param is used to track last finished scan.
+ *  
+ *  Mandatory first parameter is a directory.  Mandatory second parameter
+ *  is the name of an adaptor to start.  
+ *
+ */
+public class DirTailingAdaptor extends AbstractAdaptor implements Runnable {
+  
+  static Logger log = Logger.getLogger(DirTailingAdaptor.class); 
+  
+  Thread scanThread = new Thread(this);
+  long lastSweepStartTime;
+  volatile boolean continueScanning=true;
+  File baseDir;
+  long scanInterval;
+  String adaptorName;
+  
+  
+
+  static Pattern cmd = Pattern.compile("(.+)\\s+(\\S+)");
+  @Override
+  public void start(String status, long offset) throws AdaptorException {
+    scanInterval = control.getConfiguration().getInt("adaptor.dirscan.intervalMs", 10000);
+    Matcher m = cmd.matcher(status);
+    if(!m.matches() )
+      throw new AdaptorException("bad syntax for DirTailer");
+    else if (m.groupCount() < 2)
+      throw new AdaptorException("bad syntax for DirTailer");
+    baseDir = new File(m.group(1));
+    adaptorName = m.group(2);
+    
+    scanThread.start();
+    lastSweepStartTime = offset;
+  }
+  
+  public void run() {
+    try {
+      while(continueScanning) {
+        try {
+          long sweepStartTime = System.currentTimeMillis();
+          scanDirHierarchy(baseDir);
+          lastSweepStartTime=sweepStartTime;
+          control.reportCommit(this, lastSweepStartTime);
+          Thread.sleep(scanInterval);
+        } catch(IOException e) {
+          log.warn(e);
+        }
+      }
+    } catch(InterruptedException e) {
+    }
+  }
+  
+  /*
+   * Coded recursively.  Base case is a single non-dir file.
+   */
+  private void scanDirHierarchy(File dir) throws IOException {
+    if(!dir.isDirectory() ) {
+      //Don't start tailing if we would have gotten it on the last pass 
+      if(dir.lastModified() >= lastSweepStartTime) {
+        control.processAddCommand(
+            "add " + adaptorName +" " + type + " " + dir.getCanonicalPath() + " 0");
+      }
+    } else {
+      for(File f: dir.listFiles()) {
+        scanDirHierarchy(f);
+      }
+    }
+  }
+
+  @Override
+  public String getCurrentStatus() throws AdaptorException {
+    try {
+      return type + " " + baseDir.getCanonicalPath()+ " " + adaptorName+ " " + lastSweepStartTime;
+    } catch(IOException e) {
+      throw new AdaptorException(e);
+    }
+  }
+
+  @Override
+  public String getStreamName() {
+    return "dir scan of " + baseDir;
+  }
+
+
+  @Deprecated
+  public long shutdown() throws AdaptorException {
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
+  }
+
+  @Deprecated
+  public void hardStop() throws AdaptorException {
+    shutdown(AdaptorShutdownPolicy.HARD_STOP);
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    continueScanning = false;
+    
+    return lastSweepStartTime;
+  }
+
+}
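
As a configuration note: start() above reads the sweep period from the agent setting
adaptor.dirscan.intervalMs, defaulting to 10000 ms, and every qualifying file found
during a sweep is handed back to the agent as an ordinary add command for the configured
per-file adaptor. A minimal sketch of overriding the interval (the 2000 ms value is
purely illustrative):

import org.apache.hadoop.conf.Configuration;

public class DirScanIntervalSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Sweep every 2 seconds instead of the 10-second default used in start().
    conf.setInt("adaptor.dirscan.intervalMs", 2000);
    // Hand this Configuration to the ChukwaAgent hosting the DirTailingAdaptor.
    System.out.println(conf.getInt("adaptor.dirscan.intervalMs", 10000));
  }
}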

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java?rev=785462&r1=785461&r2=785462&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java Wed Jun 17 01:02:07 2009
@@ -35,6 +35,18 @@
   Adaptor getAdaptor(String id);
   String processAddCommand(String cmd);
   Map<String, String> getAdaptorList();
+  
+  /**
+   * Called to update the Agent status table.
+   * 
+   * Most adaptors should not call this. It is designed for adaptors that do
+   * some sort of local operation that needs checkpointing, but that doesn't
+   * emit chunks.  For instance, DirTailingAdaptor uses it to track sweeps. 
+   *  
+   * @param src the adaptor in question
+   * @param uuid the number to record as checkpoint.  Must be monotonically increasing.
+   */
+  public void reportCommit(Adaptor src, long uuid);
 
   static AdaptorManager NULL = new AdaptorManager() {
 
@@ -67,6 +79,10 @@
     public long stopAdaptor(String id, boolean gracefully) {
       return 0;
     }
+    
+    @Override
+    public void reportCommit(Adaptor a, long l) {
+    }
   };
   
 }
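
To make the reportCommit() contract above concrete, here is a hedged sketch of a
hypothetical adaptor that does periodic local work, emits no chunks, and checkpoints the
start time of its last completed pass, following the same pattern as DirTailingAdaptor
in this commit. The class name, sleep period, and status format are assumptions, not
part of the commit:

package org.apache.hadoop.chukwa.datacollection.adaptor;

public class PeriodicCheckpointingAdaptor extends AbstractAdaptor
    implements Runnable {

  volatile boolean running = true;
  long lastPassStartTime;

  @Override
  public void start(String status, long offset) throws AdaptorException {
    lastPassStartTime = offset;   // resume from the checkpointed value
    new Thread(this).start();
  }

  public void run() {
    try {
      while (running) {
        long passStart = System.currentTimeMillis();
        // ... do some local, non-chunk-emitting work here ...
        lastPassStartTime = passStart;
        // Record a monotonically increasing checkpoint in the agent status table.
        control.reportCommit(this, lastPassStartTime);
        Thread.sleep(10000);
      }
    } catch (InterruptedException e) {
    }
  }

  @Override
  public String getCurrentStatus() throws AdaptorException {
    return type + " " + lastPassStartTime;
  }

  @Override
  public String getStreamName() {
    return "periodic local work";
  }

  @Deprecated
  public long shutdown() throws AdaptorException {
    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
  }

  @Deprecated
  public void hardStop() throws AdaptorException {
    shutdown(AdaptorShutdownPolicy.HARD_STOP);
  }

  @Override
  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
      throws AdaptorException {
    running = false;
    return lastPassStartTime;
  }
}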

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=785462&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Wed Jun 17 01:02:07 2009
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.conf.*;
+import junit.framework.TestCase;
+
+public class TestDirTailingAdaptor extends TestCase {
+
+  ChukwaAgent agent;
+  File baseDir;
+
+  public void testDirTailer() throws IOException,
+  ChukwaAgent.AlreadyRunningException, InterruptedException {
+    
+    Configuration conf = new Configuration();
+    baseDir = new File(System.getProperty("test.build.data", "/tmp"));
+    conf.set("chukwaAgent.checkpoint.dir", baseDir.getCanonicalPath());
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+    
+    agent = new ChukwaAgent(conf);
+    File emptyDir = new File(baseDir, "emptyDir");
+    if(emptyDir.exists())
+      emptyDir.delete();
+    emptyDir.mkdir();
+    assertEquals(0, agent.adaptorCount());
+    agent.processAddCommand("add DirTailingAdaptor raw " + emptyDir + " filetailer.CharFileTailingAdaptorUTF8 0");
+    assertEquals(1, agent.adaptorCount());
+
+    File dirWithFile = new File(baseDir, "dir2");
+    dirWithFile.delete();
+    assertFalse( "temp directory not empty",dirWithFile.exists());
+      
+    dirWithFile.mkdir();
+    File inDir = File.createTempFile("atemp", "file", dirWithFile);
+    inDir.deleteOnExit();
+    agent.processAddCommand("add DirTailingAdaptor raw " + dirWithFile + " filetailer.CharFileTailingAdaptorUTF8 0");
+    Thread.sleep(3000);
+    assertEquals(3, agent.adaptorCount());
+    System.out.println("DirTailingAdaptor looks OK before restart");
+    agent.shutdown();
+
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
+
+    File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
+    File aNewFile = File.createTempFile("new", "file", dirWithFile);
+    anOldFile.deleteOnExit();
+    aNewFile.deleteOnExit();
+    anOldFile.setLastModified(10);//just after epoch
+    agent = new ChukwaAgent(conf); //restart agent.
+    assertEquals(4, agent.adaptorCount());
+    
+    //make sure we started tailing the new, not the old, file.
+    for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
+      assertFalse(adaptors.getValue().contains("oldXYZ"));
+    }
+  }
+
+}


