chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r910627 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
Date Tue, 16 Feb 2010 18:03:35 GMT
Author: asrabkin
Date: Tue Feb 16 18:03:34 2010
New Revision: 910627

URL: http://svn.apache.org/viewvc?rev=910627&view=rev
Log:
CHUKWA-454. DirTailingAdaptor can filter files. Contributed by Gerrit Jansen van Vuuren.

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=910627&r1=910626&r2=910627&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Tue Feb 16 18:03:34 2010
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-454. DirTailingAdaptor can filter files. (Gerrit Jansen van Vuuren via asrabkin)
+
     CHUKWA-449. Utility to generate sequence file from log file. (Bill Graham via asrabkin)
 
     CHUKWA-440. Enable addon jar file for Demux from Distributed Cache. (Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java?rev=910627&r1=910626&r2=910627&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java Tue Feb 16 18:03:34 2010
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
-import java.util.regex.*;
 import java.io.File;
 import java.io.IOException;
+
 import org.apache.log4j.Logger;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
 
 /**
  *  Explore a whole directory hierarchy, looking for files to tail. 
@@ -29,9 +33,10 @@
  *   
  *  Offset param is used to track last finished scan.
  *  
- *  Mandatory first parameter is a directory.  Mandatory second parameter
- *  is the name of an adaptor to start.  
- *  
+ *  Mandatory first parameter is a directory with an optional unix style file
+ *  filter. Mandatory second parameter
+ *  is the name of an adaptor to start.
+ * 
  *  If the specified directory does not exist, the DirTailer will continue
  *  running, and will start tailing if the directory is later created.
  *
@@ -47,8 +52,8 @@
   String baseDirName; 
   long scanInterval;
   String adaptorName; //name of adaptors to start
-  
-  static Pattern cmd = Pattern.compile("(.+)\\s+(\\S+)");
+  IOFileFilter fileFilter;
+
   @Override
   public void start(long offset) throws AdaptorException {
     scanInterval = control.getConfiguration().getInt("adaptor.dirscan.intervalMs", 10000);
@@ -86,19 +91,23 @@
   private void scanDirHierarchy(File dir) throws IOException {
     if(!dir.exists())
       return;
-    if(!dir.isDirectory() ) {
-      //Don't start tailing if we would have gotten it on the last pass 
+    if(!dir.isDirectory()) {
+      //Don't start tailing if we would have gotten it on the last pass
       if(dir.lastModified() >= lastSweepStartTime) {
-        String newAdaptorID = control.processAddCommand(
-            "add " + adaptorName +" " + type + " " + dir.getCanonicalPath() + " 0");
-        log.info("DirTailingAdaptor " + adaptorID +  "  started new adaptor " + newAdaptorID);
-      } 
-    } else {
-      for(File f: dir.listFiles()) {
-        scanDirHierarchy(f);
+            String newAdaptorID = control.processAddCommand(
+                "add " + adaptorName +" " + type + " " + dir.getCanonicalPath() + " 0");
+            log.info("DirTailingAdaptor " + adaptorID +  "  started new adaptor " + newAdaptorID);
+       }
+      
+      } else {
+        log.info("Scanning directory: " + dir.getName());
+        
+        for(Object f: FileUtils.listFiles(dir, fileFilter, FileFilterUtils.trueFileFilter()))
{
+         scanDirHierarchy((File)f);
+        }
       }
-    }
   }
+  
 
   @Override
   public String getCurrentStatus() {
@@ -107,14 +116,23 @@
 
   @Override
   public String parseArgs(String status) {
-    Matcher m = cmd.matcher(status);
-    if(!m.matches() ) {
-      log.warn("bad syntax in DirTailingAdaptor args");
-      return null;
+    
+    String[] args = status.split(" ");
+     
+    if(args.length == 2){
+     baseDir = new File(args[0]);
+     fileFilter = FileFilterUtils.trueFileFilter();
+     adaptorName = args[1];
+    }else if(args.length == 3){
+     baseDir = new File(args[0]);
+     fileFilter = new WildcardFileFilter(args[1]);
+     adaptorName = args[2]; 
+    }else{
+     log.warn("bad syntax in DirTailingAdaptor args");
+     return null;
     }
-    baseDir = new File(m.group(1));
-    adaptorName = m.group(2);
-    return baseDir + " " + adaptorName; //both params mandatory
+    
+    return (args.length == 2)? baseDir + " " + adaptorName : baseDir + " " + fileFilter +
" " + adaptorName;  //both params mandatory
   }
 
   @Override
@@ -125,3 +143,4 @@
   }
 
 }
+

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=910627&r1=910626&r2=910627&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Tue Feb 16 18:03:34 2010
@@ -32,7 +32,87 @@
   ChukwaAgent agent;
   File baseDir;
   static final int SCAN_INTERVAL = 1000;
+  
+  /**
+   * This test is exactly the same as testDirTailed except that it applies filtering and<br/>
+   * creates a file that should not be read inorder to test the filter.
+   * @throws IOException
+   * @throws ChukwaAgent.AlreadyRunningException
+   * @throws InterruptedException
+   */
+  public void testDirTailerFiltering() throws IOException,
+  ChukwaAgent.AlreadyRunningException, InterruptedException {
+    
+    DirTailingAdaptor.log.setLevel(Level.DEBUG);
+    
+    Configuration conf = new Configuration();
+    baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
+    File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
+    createEmptyDir(checkpointDir);
+    
+    conf.setInt("adaptor.dirscan.intervalMs", SCAN_INTERVAL);
+    conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
+    conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
+    conf.setInt("chukwaAgent.control.port", 0);
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+    
+    agent = new ChukwaAgent(conf);
+    File emptyDir = new File(baseDir, "emptyDir2");
+    createEmptyDir(emptyDir);
+    
+    assertEquals(0, agent.adaptorCount());
+    //check filtering with empty directory
+    agent.processAddCommand("add emptydir2= DirTailingAdaptor raw " + emptyDir + " *file
filetailer.CharFileTailingAdaptorUTF8 0");
+    assertEquals(1, agent.adaptorCount());
 
+    File dirWithFile = new File(baseDir, "dir3");
+    dirWithFile.delete();
+    assertFalse("temp directory not empty",dirWithFile.exists());
+      
+    //this file should be found by the filter
+    dirWithFile.mkdir();
+    File inDir = File.createTempFile("atemp", "file", dirWithFile);
+    inDir.deleteOnExit();
+    //This file should not be found by the filter
+    File noreadFile = File.createTempFile("atemp", "noread", dirWithFile);
+    noreadFile.deleteOnExit();
+    
+    //apply filter *file
+    agent.processAddCommand("add dir3= DirTailingAdaptor raw " + dirWithFile + " *file filetailer.CharFileTailingAdaptorUTF8
0");
+    Thread.sleep(3000);
+    assertEquals(3, agent.adaptorCount());
+    agent.shutdown();
+
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
+    Thread.sleep(500); //wait a little bit to make sure new file ts is > last checkpoint
time.
+    File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
+    File aNewFile = File.createTempFile("new", "file", dirWithFile);
+    anOldFile.deleteOnExit();
+    aNewFile.deleteOnExit();
+    anOldFile.setLastModified(10);//just after epoch
+    agent = new ChukwaAgent(conf); //restart agent.
+    
+   Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
+   assertTrue(aNewFile.exists());
+   
+    //make sure we started tailing the new, not the old, file.
+    for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
+      System.out.println(adaptors.getKey() +": " + adaptors.getValue());
+      assertFalse(adaptors.getValue().contains("oldXYZ"));
+    }
+    //should be four adaptors: the DirTailer on emptyDir, the DirTailer on the full dir,
+    //and FileTailers for File inDir and file newfile and not the noread file.
+    assertEquals(4, agent.adaptorCount());
+    agent.shutdown();
+    
+    nukeDirContents(checkpointDir);//nuke dir
+    checkpointDir.delete();
+    emptyDir.delete();
+    nukeDirContents(dirWithFile);
+    dirWithFile.delete();
+  }
+
+  
   public void testDirTailer() throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {
     
@@ -64,7 +144,7 @@
     dirWithFile.mkdir();
     File inDir = File.createTempFile("atemp", "file", dirWithFile);
     inDir.deleteOnExit();
-    agent.processAddCommand("add dir2= DirTailingAdaptor raw " + dirWithFile + " filetailer.CharFileTailingAdaptorUTF8
0");
+    agent.processAddCommand("add dir2= DirTailingAdaptor raw " + dirWithFile + " *file filetailer.CharFileTailingAdaptorUTF8
0");
     Thread.sleep(3000);
     assertEquals(3, agent.adaptorCount());
     System.out.println("DirTailingAdaptor looks OK before restart");



Mime
View raw message