flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject flume git commit: FLUME-2955. Add file path to the header in TaildirSource
Date Wed, 20 Jul 2016 18:21:46 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 358bb6700 -> 1ca0765aa


FLUME-2955. Add file path to the header in TaildirSource

Allow for adding a file path to the header dynamically. This is
particularly useful when the filegroup path contains a regular expression.

(tinawenqiao via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1ca0765a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1ca0765a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1ca0765a

Branch: refs/heads/trunk
Commit: 1ca0765aae795a41a43e39324f5f1c8bae57b751
Parents: 358bb67
Author: wenqiao <315524513@qq.com>
Authored: Wed Jul 20 11:12:40 2016 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Wed Jul 20 11:18:49 2016 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  3 ++
 .../taildir/ReliableTaildirEventReader.java     | 33 +++++++++++++++++---
 .../flume/source/taildir/TaildirSource.java     |  8 +++++
 .../TaildirSourceConfigurationConstants.java    |  8 +++++
 .../flume/source/taildir/TestTaildirSource.java | 32 +++++++++++++++++--
 5 files changed, 78 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index d8bfebf..105a036 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1134,6 +1134,8 @@ cachePatternMatching                true                           Listing direc
                                                                    containing thousands of files. Caching the list of matching files can improve performance.
                                                                    The order in which files are consumed will also be cached.
                                                                    Requires that the file system keeps track of modification times with at least a 1-second granularity.
+fileHeader                          false                          Whether to add a header storing the absolute path filename.
+fileHeaderKey                       file                           Header key to use when appending absolute path filename to event header.
 =================================== ============================== ===================================================
 
 Example for agent named a1:
@@ -1151,6 +1153,7 @@ Example for agent named a1:
   a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
   a1.sources.r1.headers.f2.headerKey1 = value2
   a1.sources.r1.headers.f2.headerKey2 = value2-2
+  a1.sources.r1.fileHeader = true
 
 Twitter 1% firehose Source (experimental)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
index 1409f25..8838320 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -57,13 +57,16 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
   private boolean addByteOffset;
   private boolean cachePatternMatching;
   private boolean committed = true;
+  private final boolean annotateFileName;
+  private final String fileNameHeader;
 
   /**
    * Create a ReliableTaildirEventReader to watch the given directory.
    */
   private ReliableTaildirEventReader(Map<String, String> filePaths,
       Table<String, String, String> headerTable, String positionFilePath,
-      boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching) throws IOException {
+      boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching,
+      boolean annotateFileName, String fileNameHeader) throws IOException {
     // Sanity checks
     Preconditions.checkNotNull(filePaths);
     Preconditions.checkNotNull(positionFilePath);
@@ -84,6 +87,8 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
     this.headerTable = headerTable;
     this.addByteOffset = addByteOffset;
     this.cachePatternMatching = cachePatternMatching;
+    this.annotateFileName = annotateFileName;
+    this.fileNameHeader = fileNameHeader;
     updateTailFiles(skipToEnd);
 
     logger.info("Updating position from position file: " + positionFilePath);
@@ -193,9 +198,14 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
     }
 
     Map<String, String> headers = currentFile.getHeaders();
-    if (headers != null && !headers.isEmpty()) {
+    if (annotateFileName || (headers != null && !headers.isEmpty())) {
       for (Event event : events) {
-        event.getHeaders().putAll(headers);
+        if (headers != null && !headers.isEmpty()) {
+          event.getHeaders().putAll(headers);
+        }
+        if (annotateFileName) {
+          event.getHeaders().put(fileNameHeader, currentFile.getPath());
+        }
       }
     }
     committed = false;
@@ -287,6 +297,10 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
     private boolean skipToEnd;
     private boolean addByteOffset;
     private boolean cachePatternMatching;
+    private Boolean annotateFileName =
+            TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER;
+    private String fileNameHeader =
+            TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
 
     public Builder filePaths(Map<String, String> filePaths) {
       this.filePaths = filePaths;
@@ -318,9 +332,20 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
       return this;
     }
 
+    public Builder annotateFileName(boolean annotateFileName) {
+      this.annotateFileName = annotateFileName;
+      return this;
+    }
+
+    public Builder fileNameHeader(String fileNameHeader) {
+      this.fileNameHeader = fileNameHeader;
+      return this;
+    }
+
     public ReliableTaildirEventReader build() throws IOException {
       return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
-                                            addByteOffset, cachePatternMatching);
+                                            addByteOffset, cachePatternMatching,
+                                            annotateFileName, fileNameHeader);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
index eae1b1a..a107a01 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -84,6 +84,8 @@ public class TaildirSource extends AbstractSource implements
   private List<Long> idleInodes = new CopyOnWriteArrayList<Long>();
   private Long backoffSleepIncrement;
   private Long maxBackOffSleepInterval;
+  private boolean fileHeader;
+  private String fileHeaderKey;
 
   @Override
   public synchronized void start() {
@@ -96,6 +98,8 @@ public class TaildirSource extends AbstractSource implements
           .skipToEnd(skipToEnd)
           .addByteOffset(byteOffsetHeader)
           .cachePatternMatching(cachePatternMatching)
+          .annotateFileName(fileHeader)
+          .fileNameHeader(fileHeaderKey)
           .build();
     } catch (IOException e) {
       throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
@@ -176,6 +180,10 @@ public class TaildirSource extends AbstractSource implements
         PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
     maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
         PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
+    fileHeader = context.getBoolean(FILENAME_HEADER,
+            DEFAULT_FILE_HEADER);
+    fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
+            DEFAULT_FILENAME_HEADER_KEY);
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());

http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
index 2c49540..f2347f3 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
@@ -55,4 +55,12 @@ public class TaildirSourceConfigurationConstants {
    */
   public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching";
   public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true;
+
+  /** Header in which to put absolute path filename. */
+  public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
+  public static final String DEFAULT_FILENAME_HEADER_KEY = "file";
+
+  /** Whether to include absolute path filename in a header. */
+  public static final String FILENAME_HEADER = "fileHeader";
+  public static final boolean DEFAULT_FILE_HEADER = false;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
index e090b74..097ee0b 100644
--- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
@@ -41,13 +41,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS;
-import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants
-                  .FILE_GROUPS_PREFIX;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX;
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.HEADERS_PREFIX;
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -288,4 +290,30 @@ public class TestTaildirSource {
     assertArrayEquals("Files not consumed in expected order", expected.toArray(),
                       consumedOrder.toArray());
   }
+
+  @Test
+  public void testPutFilenameHeader() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    Files.write("f1\n", f1, Charsets.UTF_8);
+
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "fg");
+    context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*");
+    context.put(FILENAME_HEADER, "true");
+    context.put(FILENAME_HEADER_KEY, "path");
+
+    Configurables.configure(source, context);
+    source.start();
+    source.process();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event e = channel.take();
+    txn.commit();
+    txn.close();
+
+    assertNotNull(e.getHeaders().get("path"));
+    assertEquals(f1.getAbsolutePath(),
+            e.getHeaders().get("path"));
+  }
 }


Mime
View raw message