flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject flume git commit: FLUME-2911. Add include pattern option in SpoolDir source
Date Mon, 10 Oct 2016 15:29:22 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 27c725802 -> 7d5ceacac


FLUME-2911. Add include pattern option in SpoolDir source

* Documented what happens when ignorePattern and includePattern both
  match for a given file.
* Added two tests to simulate what happens when both ignorePattern and
  includePattern options are specified
* Refactored the ReliableSpoolingFileEventReader test and fixed code
  style violations

Closes #60

Reviewers: Bessenyei Balázs Donát, Denes Arvay, Attila Simon

(Andrea Rota via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7d5ceaca
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7d5ceaca
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7d5ceaca

Branch: refs/heads/trunk
Commit: 7d5ceacac49f5d15bf8f75e0209592c5524a3dda
Parents: 27c7258
Author: Andrea Rota <andrearota37354@gmail.com>
Authored: Thu Aug 4 10:09:16 2016 +0200
Committer: Mike Percy <mpercy@apache.org>
Committed: Mon Oct 10 17:19:13 2016 +0200

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   |  15 +-
 .../flume/source/SpoolDirectorySource.java      |   3 +
 ...olDirectorySourceConfigurationConstants.java |   4 +
 .../TestReliableSpoolingFileEventReader.java    | 191 +++++++++++++++----
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   9 +-
 5 files changed, 179 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/7d5ceaca/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index a0f929c..1e1d955 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -92,6 +92,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
   private final String completedSuffix;
   private final String deserializerType;
   private final Context deserializerContext;
+  private final Pattern includePattern;
   private final Pattern ignorePattern;
   private final File metaFile;
   private final boolean annotateFileName;
@@ -118,7 +119,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
    * Create a ReliableSpoolingFileEventReader to watch the given directory.
    */
   private ReliableSpoolingFileEventReader(File spoolDirectory,
-      String completedSuffix, String ignorePattern, String trackerDirPath,
+      String completedSuffix, String includePattern, String ignorePattern, String trackerDirPath,
       boolean annotateFileName, String fileNameHeader,
       boolean annotateBaseName, String baseNameHeader,
       String deserializerType, Context deserializerContext,
@@ -130,6 +131,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     // Sanity checks
     Preconditions.checkNotNull(spoolDirectory);
     Preconditions.checkNotNull(completedSuffix);
+    Preconditions.checkNotNull(includePattern);
     Preconditions.checkNotNull(ignorePattern);
     Preconditions.checkNotNull(trackerDirPath);
     Preconditions.checkNotNull(deserializerType);
@@ -183,6 +185,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     this.fileNameHeader = fileNameHeader;
     this.annotateBaseName = annotateBaseName;
     this.baseNameHeader = baseNameHeader;
+    this.includePattern = Pattern.compile(includePattern);
     this.ignorePattern = Pattern.compile(ignorePattern);
     this.deletePolicy = deletePolicy;
     this.inputCharset = Charset.forName(inputCharset);
@@ -250,6 +253,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
           String fileName = candidate.getFileName().toString();
           if (!fileName.endsWith(completedSuffix) &&
               !fileName.startsWith(".") &&
+              includePattern.matcher(fileName).matches() &&
               !ignorePattern.matcher(fileName).matches()) {
             candidateFiles.add(candidate.toFile());
           }
@@ -658,6 +662,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     private File spoolDirectory;
     private String completedSuffix =
         SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX;
+    private String includePattern = 
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_INCLUDE_PAT;
     private String ignorePattern =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
     private String trackerDirPath =
@@ -695,6 +701,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
       return this;
     }
 
+    public Builder includePattern(String includePattern) {
+      this.includePattern = includePattern;
+      return this;
+    }
+
     public Builder ignorePattern(String ignorePattern) {
       this.ignorePattern = ignorePattern;
       return this;
@@ -762,7 +773,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
 
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
-          ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
+          includePattern, ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
           annotateBaseName, baseNameHeader, deserializerType,
           deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy,
           consumeOrder, recursiveDirectorySearch);

http://git-wip-us.apache.org/repos/asf/flume/blob/7d5ceaca/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index c8c7cda..107a381 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -57,6 +57,7 @@ public class SpoolDirectorySource extends AbstractSource
   private boolean basenameHeader;
   private String basenameHeaderKey;
   private int batchSize;
+  private String includePattern;
   private String ignorePattern;
   private String trackerDirPath;
   private String deserializerType;
@@ -89,6 +90,7 @@ public class SpoolDirectorySource extends AbstractSource
       reader = new ReliableSpoolingFileEventReader.Builder()
           .spoolDirectory(directory)
           .completedSuffix(completedSuffix)
+          .includePattern(includePattern)
           .ignorePattern(ignorePattern)
           .trackerDirPath(trackerDirPath)
           .annotateFileName(fileHeader)
@@ -162,6 +164,7 @@ public class SpoolDirectorySource extends AbstractSource
         context.getString(DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY)
             .toUpperCase(Locale.ENGLISH));
 
+    includePattern = context.getString(INCLUDE_PAT, DEFAULT_INCLUDE_PAT);
     ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
     trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/7d5ceaca/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 5859aa2..a065dc0 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -59,6 +59,10 @@ public class SpoolDirectorySourceConfigurationConstants {
   @Deprecated
   public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;
 
+  /** Pattern of files to include */
+  public static final String INCLUDE_PAT = "includePattern";
+  public static final String DEFAULT_INCLUDE_PAT = "^.*$"; // any file
+
   /** Pattern of files to ignore */
   public static final String IGNORE_PAT = "ignorePattern";
   public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect

http://git-wip-us.apache.org/repos/asf/flume/blob/7d5ceaca/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 59fb1c3..b257999 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -17,30 +17,15 @@
  */
 package org.apache.flume.client.avro;
 
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-import junit.framework.Assert;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.SystemUtils;
-import org.apache.flume.Event;
-import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
-import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
-import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -52,13 +37,34 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.flume.Event;
+import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
+import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
+import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+import junit.framework.Assert;
+
 public class TestReliableSpoolingFileEventReader {
 
-  private static final Logger logger =
-      LoggerFactory.getLogger(TestReliableSpoolingFileEventReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(TestReliableSpoolingFileEventReader.class);
 
-  private static final File WORK_DIR = new File("target/test/work/" +
-      TestReliableSpoolingFileEventReader.class.getSimpleName());
+  private static final File WORK_DIR = new File(
+      "target/test/work/" + TestReliableSpoolingFileEventReader.class.getSimpleName());
+
+  private static final File TRACKER_DIR = new File(WORK_DIR,
+      SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR);
 
   @Before
   public void setup() throws IOException, InterruptedException {
@@ -111,6 +117,62 @@ public class TestReliableSpoolingFileEventReader {
     }
   }
 
+  private void processEventsWithReader(ReliableEventReader reader, int nEvents) throws IOException
{
+    List<Event> events;
+    do {
+      events = reader.readEvents(nEvents);
+      reader.commit();
+    } while (!events.isEmpty());
+  }
+
+  /**
+   * Verify that the given dir contains exactly the given files
+   * 
+   * @param dir
+   *          the directory to check
+   * @param files
+   *          the files that should be contained in dir
+   * @return true only if the dir contains exactly the same files given, false
+   *         otherwise
+   */
+  private boolean checkLeftFilesInDir(File dir, String[] files) {
+
+    List<File> actualFiles = listFiles(dir);
+    Set<String> expectedFiles = new HashSet<String>(Arrays.asList(files));
+    
+    // Verify if the number of files in the dir is the expected
+    if (actualFiles.size() != expectedFiles.size()) {
+      return false;
+    }
+    
+    // Then check files by name
+    for (File f : actualFiles) {
+      expectedFiles.remove(f.getName());
+    }
+
+    return expectedFiles.isEmpty();
+  }
+
+  @Test
+  public void testIncludePattern() throws IOException {
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+        .spoolDirectory(WORK_DIR)
+        .includePattern("^file2$")
+        .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .build();
+    
+    String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
+    Assert.assertTrue("Expected " + beforeFiles.length + " files in working dir",
+        checkLeftFilesInDir(WORK_DIR, beforeFiles));
+
+    processEventsWithReader(reader, 10);
+
+    String[] afterFiles = { "file0", "file1", "file3", "emptylineFile" };
+    Assert.assertTrue("Expected " + afterFiles.length + " files left in working dir",
+        checkLeftFilesInDir(WORK_DIR, afterFiles));
+    Assert.assertTrue("Expected no files left in tracker dir", checkLeftFilesInDir(TRACKER_DIR,
new String[0]));
+  }
+
   @Test
   public void testIgnorePattern() throws IOException {
     ReliableEventReader reader =
@@ -120,22 +182,71 @@ public class TestReliableSpoolingFileEventReader {
             .deletePolicy(DeletePolicy.IMMEDIATE.toString())
             .build();
 
-    List<File> before = listFiles(WORK_DIR);
-    Assert.assertEquals("Expected 5, not: " + before, 5, before.size());
+    String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
+    Assert.assertTrue("Expected " + beforeFiles.length + " files in working dir",
+        checkLeftFilesInDir(WORK_DIR, beforeFiles));
 
-    List<Event> events;
-    do {
-      events = reader.readEvents(10);
-      reader.commit();
-    } while (!events.isEmpty());
+    processEventsWithReader(reader, 10);
 
-    List<File> after = listFiles(WORK_DIR);
-    Assert.assertEquals("Expected 1, not: " + after, 1, after.size());
-    Assert.assertEquals("file2", after.get(0).getName());
-    List<File> trackerFiles = listFiles(new File(WORK_DIR,
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR));
-    Assert.assertEquals("Expected 0, not: " + trackerFiles, 0,
-        trackerFiles.size());
+    String[] files = { "file2" };
+    Assert.assertTrue("Expected " + files.length + " files left in working dir", checkLeftFilesInDir(WORK_DIR,
files));
+    Assert.assertTrue("Expected no files left in tracker dir", checkLeftFilesInDir(TRACKER_DIR,
new String[0]));
+  }
+
+  @Test
+  public void testIncludeExcludePatternNoConflict() throws IOException {
+
+    // Expected behavior mixing include/exclude conditions:
+    // - file0, file1, file3: not deleted as matching ignore pattern and not
+    // matching include pattern
+    // - file2: deleted as not matching ignore pattern and matching include
+    // pattern
+    // - emptylineFile: not deleted as not matching ignore pattern but not
+    // matching include pattern as well
+    
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+        .spoolDirectory(WORK_DIR)
+        .ignorePattern("^file[013]$")
+        .includePattern("^file2$")
+        .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .build(); 
+
+    String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
+    Assert.assertTrue("Expected " + beforeFiles.length + " files in working dir",
+        checkLeftFilesInDir(WORK_DIR, beforeFiles));
+
+    processEventsWithReader(reader, 10);
+
+    String[] files = { "file0", "file1", "file3", "emptylineFile" };
+    Assert.assertTrue("Expected " + files.length + " files left in working dir", checkLeftFilesInDir(WORK_DIR,
files));
+    Assert.assertTrue("Expected no files left in tracker dir", checkLeftFilesInDir(TRACKER_DIR,
new String[0]));
+  }
+
+  @Test
+  public void testIncludeExcludePatternConflict() throws IOException {
+
+    // This test will stress what happens when both ignore and include options
+    // are specified and the two patterns match at the same time.
+    // Expected behavior:
+    // - file2: not deleted as both include and ignore patterns match (safety
+    // measure: ignore always wins on conflict)
+    
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+        .spoolDirectory(WORK_DIR)
+        .ignorePattern("^file2$")
+        .includePattern("^file2$")
+        .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .build();
+    
+    String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
+    Assert.assertTrue("Expected " + beforeFiles.length + " files in working dir",
+        checkLeftFilesInDir(WORK_DIR, beforeFiles));
+
+    processEventsWithReader(reader, 10);
+
+    String[] files = { "file0", "file1", "file2", "file3", "emptylineFile" };
+    Assert.assertTrue("Expected " + files.length + " files left in working dir", checkLeftFilesInDir(WORK_DIR,
files));
+    Assert.assertTrue("Expected no files left in tracker dir", checkLeftFilesInDir(TRACKER_DIR,
new String[0]));
   }
 
   @Test
@@ -522,17 +633,17 @@ public class TestReliableSpoolingFileEventReader {
         throw new IOException(ex);
       }
     }
-  }    
+  }
   /* Create expected results out of the files created in the setup method. */
   private void createExpectedFromFilesInSetup(Collection<String> expected) {
     expected.add("");
-    for (int i = 0; i < 4; i++) {      
-      for (int j = 0; j < i; j++) {        
+    for (int i = 0; i < 4; i++) {
+      for (int j = 0; j < i; j++) {
         expected.add("file" + i + "line" + j);
-      }      
+      }
     }
   }
-  
+
   private static List<File> listFiles(File dir) {
     List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter() {
       @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/7d5ceaca/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index ab71d38..1cf4100 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1018,7 +1018,14 @@ fileHeader                false           Whether to add a header storing
the ab
 fileHeaderKey             file            Header key to use when appending absolute path
filename to event header.
 basenameHeader            false           Whether to add a header storing the basename of
the file.
 basenameHeaderKey         basename        Header Key to use when appending  basename of file
to event header.
-ignorePattern             ^$              Regular expression specifying which files to ignore
(skip)
+includePattern            ^.*$            Regular expression specifying which files to include.
+                                          It can be used together with ``ignorePattern``.
+                                          If a file matches both ``ignorePattern`` and ``includePattern``
regex,
+                                          the file is ignored.
+ignorePattern             ^$              Regular expression specifying which files to ignore
(skip).
+                                          It can be used together with ``includePattern``.
+                                          If a file matches both ``ignorePattern`` and ``includePattern``
regex,
+                                          the file is ignored.
 trackerDir                .flumespool     Directory to store metadata related to processing
of files.
                                           If this path is not an absolute path, then it is
interpreted as relative to the spoolDir.
 consumeOrder              oldest          In which order files in the spooling directory
will be consumed ``oldest``,


Mime
View raw message