flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject flume git commit: FLUME-2918. Speed up TaildirSource on directories with many files
Date Mon, 20 Jun 2016 08:56:45 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 1b9e58915 -> 7d1e683fb


FLUME-2918. Speed up TaildirSource on directories with many files

This patch greatly improves the performance of TaildirSource on
directories that contain a large number of files.

(Attila Simon via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7d1e683f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7d1e683f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7d1e683f

Branch: refs/heads/trunk
Commit: 7d1e683fbd7d261fff9fcf17ad78fd8469c64905
Parents: 1b9e589
Author: Mike Percy <mpercy@cloudera.com>
Authored: Mon Jun 20 01:09:07 2016 -0700
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Mon Jun 20 01:13:56 2016 -0700

----------------------------------------------------------------------
 .../taildir/ReliableTaildirEventReader.java     |  56 ++--
 .../apache/flume/source/taildir/TailFile.java   |   7 -
 .../flume/source/taildir/TaildirMatcher.java    | 278 +++++++++++++++++++
 .../flume/source/taildir/TaildirSource.java     |   3 +
 .../TaildirSourceConfigurationConstants.java    |   4 +
 .../source/taildir/TestTaildirMatcher.java      | 227 +++++++++++++++
 .../flume/source/taildir/TestTaildirSource.java |   2 +-
 7 files changed, 531 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
index 5b6d465..8128df4 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -20,18 +20,14 @@
 package org.apache.flume.source.taildir;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.regex.Pattern;
 
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -43,11 +39,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Table;
-import com.google.common.collect.Table.Cell;
 import com.google.gson.stream.JsonReader;
 
 @InterfaceAudience.Private
@@ -55,13 +49,14 @@ import com.google.gson.stream.JsonReader;
 public class ReliableTaildirEventReader implements ReliableEventReader {
   private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class);
 
-  private final Table<String, File, Pattern> tailFileTable;
+  private final List<TaildirMatcher> taildirCache;
   private final Table<String, String, String> headerTable;
 
   private TailFile currentFile = null;
   private Map<Long, TailFile> tailFiles = Maps.newHashMap();
   private long updateTime;
   private boolean addByteOffset;
+  private boolean cachePatternMatching;
   private boolean committed = true;
 
   /**
@@ -69,7 +64,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
    */
   private ReliableTaildirEventReader(Map<String, String> filePaths,
       Table<String, String, String> headerTable, String positionFilePath,
-      boolean skipToEnd, boolean addByteOffset) throws IOException {
+      boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching) throws IOException
{
     // Sanity checks
     Preconditions.checkNotNull(filePaths);
     Preconditions.checkNotNull(positionFilePath);
@@ -79,21 +74,17 @@ public class ReliableTaildirEventReader implements ReliableEventReader
{
           new Object[] { ReliableTaildirEventReader.class.getSimpleName(), filePaths });
     }
 
-    Table<String, File, Pattern> tailFileTable = HashBasedTable.create();
+    List<TaildirMatcher> taildirCache = Lists.newArrayList();
     for (Entry<String, String> e : filePaths.entrySet()) {
-      File f = new File(e.getValue());
-      File parentDir =  f.getParentFile();
-      Preconditions.checkState(parentDir.exists(),
-        "Directory does not exist: " + parentDir.getAbsolutePath());
-      Pattern fileNamePattern = Pattern.compile(f.getName());
-      tailFileTable.put(e.getKey(), parentDir, fileNamePattern);
+      taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching));
     }
-    logger.info("tailFileTable: " + tailFileTable.toString());
+    logger.info("taildirCache: " + taildirCache.toString());
     logger.info("headerTable: " + headerTable.toString());
 
-    this.tailFileTable = tailFileTable;
+    this.taildirCache = taildirCache;
     this.headerTable = headerTable;
     this.addByteOffset = addByteOffset;
+    this.cachePatternMatching = cachePatternMatching;
     updateTailFiles(skipToEnd);
 
     logger.info("Updating position from position file: " + positionFilePath);
@@ -238,12 +229,10 @@ public class ReliableTaildirEventReader implements ReliableEventReader
{
     updateTime = System.currentTimeMillis();
     List<Long> updatedInodes = Lists.newArrayList();
 
-    for (Cell<String, File, Pattern> cell : tailFileTable.cellSet()) {
-      Map<String, String> headers = headerTable.row(cell.getRowKey());
-      File parentDir = cell.getColumnKey();
-      Pattern fileNamePattern = cell.getValue();
+    for (TaildirMatcher taildir : taildirCache) {
+      Map<String, String> headers = headerTable.row(taildir.getFileGroup());
 
-      for (File f : getMatchFiles(parentDir, fileNamePattern)) {
+      for (File f : taildir.getMatchingFiles()) {
         long inode = getInode(f);
         TailFile tf = tailFiles.get(inode);
         if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
@@ -274,21 +263,6 @@ public class ReliableTaildirEventReader implements ReliableEventReader
{
     return updateTailFiles(false);
   }
 
-  private List<File> getMatchFiles(File parentDir, final Pattern fileNamePattern) {
-    FileFilter filter = new FileFilter() {
-      public boolean accept(File f) {
-        String fileName = f.getName();
-        if (f.isDirectory() || !fileNamePattern.matcher(fileName).matches()) {
-          return false;
-        }
-        return true;
-      }
-    };
-    File[] files = parentDir.listFiles(filter);
-    ArrayList<File> result = Lists.newArrayList(files);
-    Collections.sort(result, new TailFile.CompareByLastModifiedTime());
-    return result;
-  }
 
   private long getInode(File file) throws IOException {
     long inode = (long) Files.getAttribute(file.toPath(), "unix:ino");
@@ -313,6 +287,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader
{
     private String positionFilePath;
     private boolean skipToEnd;
     private boolean addByteOffset;
+    private boolean cachePatternMatching;
 
     public Builder filePaths(Map<String, String> filePaths) {
       this.filePaths = filePaths;
@@ -339,8 +314,13 @@ public class ReliableTaildirEventReader implements ReliableEventReader
{
       return this;
     }
 
+    public Builder cachePatternMatching(boolean cachePatternMatching) {
+      this.cachePatternMatching = cachePatternMatching;
+      return this;
+    }
+
     public ReliableTaildirEventReader build() throws IOException {
-      return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
addByteOffset);
+      return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
addByteOffset, cachePatternMatching);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
index eabd357..cb36e41 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
@@ -212,13 +212,6 @@ public class TailFile {
     }
   }
 
-  public static class CompareByLastModifiedTime implements Comparator<File> {
-    @Override
-    public int compare(File f1, File f2) {
-      return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified());
-    }
-  }
-
   private class LineResult {
     final boolean lineSepInclude;
     final byte[] line;

http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
new file mode 100644
index 0000000..245aef5
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.taildir;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/**
+ * Identifies and caches the files matched by a single file pattern for <code>TAILDIR</code>
source.
+ * <p></p>
+ * Since file patterns only apply to the fileNames and not the parent directories, this
implementation
+ * checks the parent directory for modification (additional or removed files update modification
time of parent dir)
+ * If no modification happened to the parent dir, the underlying files could only
have been written to, so there is no need
+ * to rerun the pattern matching on fileNames.
+ * <p></p>
+ * This implementation provides lazy caching or no caching. Instances of this class keep
the result
+ * file list from the last successful execution
+ * of {@linkplain #getMatchingFiles()} function invocation,
+ * and may serve the content without hitting the FileSystem for performance optimization.
+ * <p></p>
+ * <b>IMPORTANT:</b> It is assumed that the hosting system provides at least
second granularity for both
+ * <code>System.currentTimeMillis()</code> and <code>File.lastModified()</code>.
Also that system clock is used
+ * for file system timestamps. If it is not the case then configure it as uncached.
+ * Class is solely for package only usage. Member functions are not thread safe.
+ *
+ * @see TaildirSource
+ * @see ReliableTaildirEventReader
+ * @see TaildirSourceConfigurationConstants
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TaildirMatcher {
+  private static final Logger logger = LoggerFactory.getLogger(TaildirMatcher.class);
+  private static final FileSystem FS = FileSystems.getDefault();
+
+  // flag from configuration to switch off caching completely
+  private final boolean cachePatternMatching;
+  // id from configuration
+  private final String fileGroup;
+  // plain string of the desired files from configuration
+  private final String filePattern;
+
+  // directory monitored for changes
+  private final File parentDir;
+  // cached instance for filtering files based on filePattern
+  private final DirectoryStream.Filter<Path> fileFilter;
+
+  // system time in milliseconds, stores the last modification time of the
+  // parent directory seen by the last check, rounded to seconds
+  // initial value is used in first check only when it will be replaced instantly (system
time is positive)
+  private long lastSeenParentDirMTime = -1;
+  // system time in milliseconds, time of the last check, rounded to seconds
+  // initial value is used in first check only when it will be replaced instantly (system
time is positive)
+  private long lastCheckedTime = -1;
+  // cached content, files which matched the pattern within the parent directory
+  private List<File> lastMatchedFiles = Lists.newArrayList();
+
+  /**
+   * Package accessible constructor. From configuration context it represents a single <code>filegroup</code>
+   * and encapsulates the corresponding <code>filePattern</code>.
+   * <code>filePattern</code> consists of two parts: first part has to be a valid
path to
+   * an existing parent directory, second part has to be a
+   * valid regex {@link java.util.regex.Pattern} that matches any non-hidden file names within
parent directory.
+   * A valid example for filePattern is <code>/dir0/dir1/.*</code> given <code>/dir0/dir1</code>
+   * is an existing directory structure readable by the running user.
+   * <p></p>
+   * An instance of this class is created for each fileGroup
+   *
+   * @param fileGroup arbitrary name of the group given by the config
+   * @param filePattern parent directory plus regex pattern. No wildcards are allowed in
directory name
+   * @param cachePatternMatching default true, recommended in every setup especially with
huge parent directories.
+   *                         Don't set when local system clock is not used for stamping mtime
(eg: remote filesystems)
+   * @see TaildirSourceConfigurationConstants
+   */
+  TaildirMatcher(String fileGroup, String filePattern, boolean cachePatternMatching) {
+    // store whatever came from configuration
+    this.fileGroup = fileGroup;
+    this.filePattern = filePattern;
+    this.cachePatternMatching = cachePatternMatching;
+
+    // calculate final members
+    File f = new File(filePattern);
+    this.parentDir = f.getParentFile();
+    String regex = f.getName();
+    final PathMatcher matcher = FS.getPathMatcher("regex:" + regex);
+    this.fileFilter = new DirectoryStream.Filter<Path>() {
+      @Override
+      public boolean accept(Path entry) throws IOException {
+        return matcher.matches(entry.getFileName()) && !Files.isDirectory(entry);
+      }
+    };
+
+    // sanity check
+    Preconditions.checkState(parentDir.exists(),
+        "Directory does not exist: " + parentDir.getAbsolutePath());
+  }
+
+  /**
+   * Lists those files within the parentDir that match regex pattern passed in during object
instantiation.
+   * Designed for frequent periodic invocation {@link org.apache.flume.source.PollableSourceRunner}.
+   * <p></p>
+   * Based on the modification of the parentDir this function may trigger cache recalculation
by calling
+   * {@linkplain #getMatchingFilesNoCache()} or
+   * return the value stored in {@linkplain #lastMatchedFiles}.
+   * Parentdir is allowed to be a symbolic link.
+   * <p></p>
+   * Files returned by this call are weakly consistent (see {@link DirectoryStream}).
+   * It does not freeze the directory while iterating,
+   * so it may (or may not) reflect updates to the directory that occur during the call,
+   * In which case next call
+   * will return those files (as mtime is increasing it won't hit cache but trigger recalculation).
+   * It is guaranteed that invocation reflects every change which was observable at the time
of invocation.
+   * <p></p>
+   * Matching file list recalculation is triggered when caching was turned off or
+   * if mtime is greater than the previously seen mtime
+   * (including the case of cache hasn't been calculated before).
+   * Additionally if a constantly updated directory was configured as parentDir
+   * then multiple changes to the parentDir may happen
+   * within the same second so in such case (assuming at least second granularity of reported
mtime)
+   * it is impossible to tell whether a change of the dir happened before the check or after
+   * (unless the check happened after that second).
+   * Having said that implementation also stores system time of the previous invocation and
previous invocation has to
+   * happen strictly after the current mtime to avoid further cache refresh
+   * (because then it is guaranteed that previous invocation resulted in valid cache content).
+   * If system clock hasn't passed the second of
+   * the current mtime then logic expects more changes as well
+   * (since it cannot be sure that there won't be any further changes still in that second
+   * and it would like to avoid data loss in first place)
+   * hence it recalculates matching files. If system clock finally
+   * passed actual mtime then a subsequent invocation guarantees that it picked up every
+   * change from the passed second so
+   * any further invocations can be served from cache associated with that second (given
mtime is not updated again).
+   *
+   * @return List of files matching the pattern sorted by last modification time. No recursion.
No directories.
+   * If nothing matches then returns an empty list. If I/O issue occurred then returns the
list collected to the point
+   * when exception was thrown.
+   *
+   * @see #getMatchingFilesNoCache()
+   */
+  List<File> getMatchingFiles() {
+    long now = TimeUnit.SECONDS.toMillis(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
+    long currentParentDirMTime = parentDir.lastModified();
+    List<File> result;
+
+    // calculate matched files if
+    // - we don't want to use cache (recalculate every time) OR
+    // - directory was clearly updated after the last check OR
+    // - last mtime change wasn't already checked for sure (system clock hasn't passed that
second yet)
+    if (!cachePatternMatching ||
+        lastSeenParentDirMTime < currentParentDirMTime ||
+        !(currentParentDirMTime < lastCheckedTime)) {
+      lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache());
+      lastSeenParentDirMTime = currentParentDirMTime;
+      lastCheckedTime = now;
+    }
+
+    return lastMatchedFiles;
+  }
+
+  /**
+   * Provides the actual files within the parentDir that
+   * match the regex pattern. Each invocation uses {@link DirectoryStream}
+   * to identify matching files.
+   *
+   * Files returned by this call are weakly consistent (see {@link DirectoryStream}). It
does not freeze the directory while iterating,
+   * so it may (or may not) reflect updates to the directory that occur during the call.
In which case next call
+   * will return those files.
+   *
+   * @return List of files matching the pattern unsorted. No recursion. No directories.
+   * If nothing matches then returns an empty list. If I/O issue occurred then returns the
list collected to the point
+   * when exception was thrown.
+   *
+   * @see DirectoryStream
+   * @see DirectoryStream.Filter
+   */
+  private List<File> getMatchingFilesNoCache() {
+    List<File> result = Lists.newArrayList();
+    try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(),
fileFilter)) {
+      for (Path entry : stream) {
+        result.add(entry.toFile());
+      }
+    } catch (IOException e) {
+      logger.error("I/O exception occurred while listing parent directory. Files already
matched will be returned. " +
+          parentDir.toPath(), e);
+    }
+    return result;
+  }
+
+  /**
+   * Utility function to sort matched files based on last modification time.
+   * Sorting itself uses only a snapshot of last modification times captured before the sorting
to keep the
+   * number of stat system calls to the required minimum.
+   *
+   * @param files list of files in any order
+   * @return sorted list
+   */
+  private static List<File> sortByLastModifiedTime(List<File> files) {
+    final HashMap<File, Long> lastModificationTimes = new HashMap<File, Long>(files.size());
+    for (File f: files) {
+      lastModificationTimes.put(f, f.lastModified());
+    }
+    Collections.sort(files, new Comparator<File>() {
+      @Override
+      public int compare(File o1, File o2) {
+        return lastModificationTimes.get(o1).compareTo(lastModificationTimes.get(o2));
+      }
+    });
+
+    return files;
+  }
+
+  @Override
+  public String toString() {
+    return "{" +
+        "filegroup='" + fileGroup + '\'' +
+        ", filePattern='" + filePattern + '\'' +
+        ", cached=" + cachePatternMatching +
+        '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TaildirMatcher that = (TaildirMatcher) o;
+
+    return fileGroup.equals(that.fileGroup);
+
+  }
+
+  @Override
+  public int hashCode() {
+    return fileGroup.hashCode();
+  }
+
+  public String getFileGroup() {
+    return fileGroup;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
index 8816327..dfb5b29 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -78,6 +78,7 @@ public class TaildirSource extends AbstractSource implements
   private int checkIdleInterval = 5000;
   private int writePosInitDelay = 5000;
   private int writePosInterval;
+  private boolean cachePatternMatching;
 
   private List<Long> existingInodes = new CopyOnWriteArrayList<Long>();
   private List<Long> idleInodes = new CopyOnWriteArrayList<Long>();
@@ -94,6 +95,7 @@ public class TaildirSource extends AbstractSource implements
           .positionFilePath(positionFilePath)
           .skipToEnd(skipToEnd)
           .addByteOffset(byteOffsetHeader)
+          .cachePatternMatching(cachePatternMatching)
           .build();
     } catch (IOException e) {
       throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
@@ -166,6 +168,7 @@ public class TaildirSource extends AbstractSource implements
     byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);
     idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
     writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);
+    cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING, DEFAULT_CACHE_PATTERN_MATCHING);
 
     backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT
             , PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);

http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
index 6165276..b0c934d 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
@@ -49,4 +49,8 @@ public class TaildirSourceConfigurationConstants {
   public static final String BYTE_OFFSET_HEADER = "byteOffsetHeader";
   public static final String BYTE_OFFSET_HEADER_KEY = "byteoffset";
   public static final boolean DEFAULT_BYTE_OFFSET_HEADER = false;
+
+  /** Whether to cache the list of files matching the specified file patterns till parent
directory is modified. */
+  public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching";
+  public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
new file mode 100644
index 0000000..4bff841
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.taildir;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class TestTaildirMatcher {
+  private File tmpDir;
+  private Map<String, File> files;
+  private boolean isCachingNeeded = true;
+
+  final String msgAlreadyExistingFile = "a file was not found but it was created before matcher";
+  final String msgAfterNewFileCreated = "files which were created after last check are not
found";
+  final String msgAfterAppend = "a file was not found although it was just appended within
the dir";
+  final String msgEmptyDir = "empty dir should return an empty list";
+  final String msgNoMatch = "no match should return an empty list";
+  final String msgSubDirs = "only files on the same level as the pattern should be returned";
+  final String msgNoChange = "file wasn't touched after last check cannot be found";
+  final String msgAfterDelete = "file was returned even after it was deleted";
+
+  /**
+   * Append a line to the specified file within tmpDir.
+   * If file doesn't exist it will be created.
+   */
+  private void append(String fileName) throws IOException {
+    File f;
+    if(!files.containsKey(fileName)){
+      f = new File(tmpDir, fileName);
+      files.put(fileName, f);
+    }else{
+      f = files.get(fileName);
+    }
+    Files.append(fileName + "line\n", f, Charsets.UTF_8);
+  }
+
+  /**
+   * Translate a list of files to list of filename strings.
+   */
+  private static List<String> filesToNames(List<File> origList){
+    Function<File, String> file2nameFn = new Function<File, String>() {
+      @Override
+      public String apply(File input) {
+        return input.getName();
+      }
+    };
+    return Lists.transform(origList, file2nameFn);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    files = Maps.newHashMap();
+    tmpDir = Files.createTempDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    for (File f : tmpDir.listFiles()) {
+      if (f.isDirectory()) {
+        for (File sdf : f.listFiles()) {
+          sdf.delete();
+        }
+      }
+      f.delete();
+    }
+    tmpDir.delete();
+    files = null;
+  }
+
+  @Test
+  public void getMatchingFiles() throws Exception {
+    append("file0");
+    append("file1");
+
+    TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator
+ "file.*", isCachingNeeded);
+    List<String> files = filesToNames(tm.getMatchingFiles());
+    assertEquals(msgAlreadyExistingFile, 2, files.size());
+    assertTrue(msgAlreadyExistingFile, files.contains("file1"));
+
+    append("file1");
+    files = filesToNames(tm.getMatchingFiles());
+    assertEquals(msgAfterNewFileCreated, 2, files.size());
+    assertTrue(msgAfterNewFileCreated, files.contains("file0"));
+    assertTrue(msgAfterNewFileCreated, files.contains("file1"));
+
+    append("file2");
+    append("file3");
+    files = filesToNames(tm.getMatchingFiles());
+    assertEquals(msgAfterAppend, 4, files.size());
+    assertTrue(msgAfterAppend, files.contains("file0"));
+    assertTrue(msgAfterAppend, files.contains("file1"));
+    assertTrue(msgAfterAppend, files.contains("file2"));
+    assertTrue(msgAfterAppend, files.contains("file3"));
+
+    this.files.get("file0").delete();
+    files = filesToNames(tm.getMatchingFiles());
+    assertEquals(msgAfterDelete, 3, files.size());
+    assertFalse(msgAfterDelete, files.contains("file0"));
+    assertTrue(msgNoChange, files.contains("file1"));
+    assertTrue(msgNoChange, files.contains("file2"));
+    assertTrue(msgNoChange, files.contains("file3"));
+  }
+
+  /**
+   * Same scenario as the caching variant, but the matcher is constructed with the caching
+   * flag set to {@code false}: pre-existing files, a second {@code append} to a listed file,
+   * files added afterwards, and a deleted file.
+   */
+  @Test
+  public void getMatchingFilesNoCache() throws Exception {
+    append("file0");
+    append("file1");
+
+    TaildirMatcher tm = new TaildirMatcher("f1",
+        tmpDir.getAbsolutePath() + File.separator + "file.*", false);
+    List<String> files = filesToNames(tm.getMatchingFiles());
+    assertEquals(msgAlreadyExistingFile, 2, files.size());
+    assertTrue(msgAlreadyExistingFile, files.contains("file0"));
+    assertTrue(msgAlreadyExistingFile, files.contains("file1"));
+
+    // Second append to a name that is already listed: the set of matches must not change.
+    append("file1");
+    files = filesToNames(tm.getMatchingFiles());
+    assertEquals(msgAfterAppend, 2, files.size());
+    assertTrue(msgAfterAppend, files.contains("file0"));
+    assertTrue(msgAfterAppend, files.contains("file1"));
+
+    // Two names not seen before: both must show up in the next listing.
+    append("file2");
+    append("file3");
+    files = filesToNames(tm.getMatchingFiles());
+    assertEquals(msgAfterNewFileCreated, 4, files.size());
+    assertTrue(msgAfterNewFileCreated, files.contains("file0"));
+    assertTrue(msgAfterNewFileCreated, files.contains("file1"));
+    assertTrue(msgAfterNewFileCreated, files.contains("file2"));
+    assertTrue(msgAfterNewFileCreated, files.contains("file3"));
+
+    // The local list shadows the field, hence the explicit this.files.
+    this.files.get("file0").delete();
+    files = filesToNames(tm.getMatchingFiles());
+    assertEquals(msgAfterDelete, 3, files.size());
+    assertFalse(msgAfterDelete, files.contains("file0"));
+    assertTrue(msgNoChange, files.contains("file1"));
+    assertTrue(msgNoChange, files.contains("file2"));
+    assertTrue(msgNoChange, files.contains("file3"));
+  }
+
+  /**
+   * A matcher with the catch-all pattern {@code .*} over the still-empty {@code tmpDir} must
+   * return a non-null, empty list.
+   */
+  @Test
+  public void testEmtpyDirMatching() throws Exception {
+    TaildirMatcher tm = new TaildirMatcher("empty",
+        tmpDir.getAbsolutePath() + File.separator + ".*", isCachingNeeded);
+    List<File> files = tm.getMatchingFiles();
+    assertNotNull(msgEmptyDir, files);
+    assertTrue(msgEmptyDir, files.isEmpty());
+  }
+
+  /**
+   * A matcher whose file name pattern matches nothing in {@code tmpDir} must return a
+   * non-null, empty list.
+   */
+  @Test
+  public void testNoMatching() throws Exception {
+    TaildirMatcher tm = new TaildirMatcher("nomatch",
+        tmpDir.getAbsolutePath() + File.separator + "abracadabra_nonexisting", isCachingNeeded);
+    List<File> files = tm.getMatchingFiles();
+    assertNotNull(msgNoMatch, files);
+    assertTrue(msgNoMatch, files.isEmpty());
+  }
+
+  /**
+   * Constructing a matcher for a path whose parent directory is not expected to exist must
+   * fail with {@link IllegalStateException}.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testNonExistingDir() {
+    // Only the constructor's exception matters here, so the instance is not kept.
+    new TaildirMatcher("exception", "/abracadabra/doesntexist/.*", isCachingNeeded);
+  }
+
+  /**
+   * With one regular file, one subdirectory and one file inside that subdirectory, a
+   * catch-all matcher on {@code tmpDir} must list only the top-level regular file.
+   */
+  @Test
+  public void testDirectoriesAreNotListed() throws Exception {
+    // Fail fast if the fixture cannot be created; otherwise the size check below would
+    // report a misleading result.
+    File recursiveDir = new File(tmpDir, "recursiveDir");
+    assertTrue("could not create outerFile", new File(tmpDir, "outerFile").createNewFile());
+    assertTrue("could not create recursiveDir", recursiveDir.mkdir());
+    assertTrue("could not create innerFile", new File(recursiveDir, "innerFile").createNewFile());
+
+    TaildirMatcher tm = new TaildirMatcher("f1",
+        tmpDir.getAbsolutePath() + File.separator + ".*", isCachingNeeded);
+    List<String> files = filesToNames(tm.getMatchingFiles());
+
+    assertEquals(msgSubDirs, 1, files.size());
+    assertTrue(msgSubDirs, files.contains("outerFile"));
+  }
+
+  /**
+   * Two matchers over the same directory must each return only the files whose names match
+   * their own pattern: {@code [ab].log} selects {@code a.log} and {@code b.log} but not
+   * {@code a.log.1}; {@code c.log.*} selects both {@code c.log.yyyy.MM-0x} files.
+   */
+  @Test
+  public void testRegexFileNameFiltering() throws IOException {
+    append("a.log");
+    append("a.log.1");
+    append("b.log");
+    append("c.log.yyyy.MM-01");
+    append("c.log.yyyy.MM-02");
+
+    // Tail a.log and b.log
+    TaildirMatcher tm1 = new TaildirMatcher("ab",
+        tmpDir.getAbsolutePath() + File.separator + "[ab].log", isCachingNeeded);
+    // Tail files that start with c.log
+    TaildirMatcher tm2 = new TaildirMatcher("c",
+        tmpDir.getAbsolutePath() + File.separator + "c.log.*", isCachingNeeded);
+
+    List<String> files1 = filesToNames(tm1.getMatchingFiles());
+    List<String> files2 = filesToNames(tm2.getMatchingFiles());
+
+    assertEquals(2, files1.size());
+    assertEquals(2, files2.size());
+    // Make sure we got every file
+    assertTrue("Regex pattern for ab should have matched a.log file",
+        files1.contains("a.log"));
+    assertFalse("Regex pattern for ab should NOT have matched a.log.1 file",
+        files1.contains("a.log.1"));
+    assertTrue("Regex pattern for ab should have matched b.log file",
+        files1.contains("b.log"));
+    assertTrue("Regex pattern for c should have matched c.log.yyyy.MM-01 file",
+        files2.contains("c.log.yyyy.MM-01"));
+    assertTrue("Regex pattern for c should have matched c.log.yyyy.MM-02 file",
+        files2.contains("c.log.yyyy.MM-02"));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
index f9e614c..f6289cd 100644
--- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
@@ -77,7 +77,7 @@ public class TestTaildirSource {
   }
 
   @Test
-  public void testRegexFileNameFiltering() throws IOException {
+  public void testRegexFileNameFilteringEndToEnd() throws IOException {
     File f1 = new File(tmpDir, "a.log");
     File f2 = new File(tmpDir, "a.log.1");
     File f3 = new File(tmpDir, "b.log");


Mime
View raw message