flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roshann...@apache.org
Subject flume git commit: FLUME-2498. Implement Taildir Source
Date Tue, 18 Aug 2015 02:47:52 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 32ef64dd6 -> 757a560db


FLUME-2498.  Implement Taildir Source

(Satoshi Iijima via Roshan Naik)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/757a560d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/757a560d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/757a560d

Branch: refs/heads/trunk
Commit: 757a560db73c2e6fbec56deea4c753a45ccf9032
Parents: 32ef64d
Author: Roshan Naik <roshan@hortonworks.com>
Authored: Mon Aug 17 19:34:03 2015 -0700
Committer: Roshan Naik <roshan@hortonworks.com>
Committed: Mon Aug 17 19:43:19 2015 -0700

----------------------------------------------------------------------
 .../flume/conf/source/SourceConfiguration.java  |  10 +-
 .../apache/flume/conf/source/SourceType.java    |  10 +-
 flume-ng-dist/pom.xml                           |   4 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  55 ++-
 flume-ng-sources/flume-taildir-source/pom.xml   |  60 +++
 .../taildir/ReliableTaildirEventReader.java     | 347 +++++++++++++
 .../apache/flume/source/taildir/TailFile.java   | 163 ++++++
 .../flume/source/taildir/TaildirSource.java     | 331 +++++++++++++
 .../TaildirSourceConfigurationConstants.java    |  52 ++
 .../source/taildir/TestTaildirEventReader.java  | 492 +++++++++++++++++++
 .../flume/source/taildir/TestTaildirSource.java | 283 +++++++++++
 flume-ng-sources/pom.xml                        |   1 +
 pom.xml                                         |   6 +
 13 files changed, 1811 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
index 899d805..068bd69 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
@@ -207,7 +207,15 @@ public class SourceConfiguration extends ComponentConfiguration {
      *
      * @see org.apache.flume.source.jms.JMSSource
      */
-    JMS("org.apache.flume.conf.source.jms.JMSSourceConfiguration");
+    JMS("org.apache.flume.conf.source.jms.JMSSourceConfiguration"),
+
+    /**
+     * TAILDIR Source
+     *
+     * @see org.apache.flume.source.taildir.TaildirSource
+     */
+    TAILDIR("org.apache.flume.source.taildir.TaildirSourceConfiguration")
+    ;
 
     private String srcConfigurationName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
index 4144faa..4f4073a 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
@@ -103,7 +103,15 @@ public enum SourceType {
    *
    * @see org.apache.flume.source.jms.JMSSource
    */
-  JMS("org.apache.flume.source.jms.JMSSource");
+  JMS("org.apache.flume.source.jms.JMSSource"),
+
+  /**
+   * Taildir Source
+   *
+   * @see org.apache.flume.source.taildir.TaildirSource
+   */
+  TAILDIR("org.apache.flume.source.taildir.TaildirSource")
+  ;
 
   private final String sourceClassName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 218c6b8..7fdf36a 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -189,6 +189,10 @@
       <artifactId>flume-thrift-source</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sources</groupId>
+      <artifactId>flume-taildir-source</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.flume.flume-ng-clients</groupId>
       <artifactId>flume-ng-log4jappender</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 7ddcc48..897a2ca 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -994,7 +994,7 @@ trackerDir            .flumespool     Directory to store metadata related to pro
 consumeOrder          oldest          In which order files in the spooling directory will be consumed ``oldest``,
                                       ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified
                                       time of the files will be used to compare the files. In case of a tie, the file
-                                      with smallest laxicographical order will be consumed first. In case of ``random`` any
+                                      with smallest lexicographical order will be consumed first. In case of ``random`` any
                                       file will be picked randomly. When using ``oldest`` and ``youngest`` the whole
                                       directory will be scanned to pick the oldest/youngest file, which might be slow if there
                                       are a large number of files, while using ``random`` may cause old files to be consumed
@@ -1090,6 +1090,59 @@ Property Name               Default             Description
 deserializer.maxBlobLength  100000000           The maximum number of bytes to read and buffer for a given request
 ==========================  ==================  =======================================================================
 
+Taildir Source
+~~~~~~~~~~~~~~~~~~~~~~~~~
+.. note:: **This source is provided as a preview feature. It does not work on Windows.** This source requires Java version 1.7 or later.
+
+Watches the specified files and tails them in near real-time once new lines appended to each file are detected.
+If new lines are still being written, this source will retry reading them, waiting for the write to complete.
+
+This source is reliable and will not miss data even when the tailing files rotate.
+It periodically writes the last read position of each file to the given position file in JSON format.
+If Flume is stopped or down for some reason, it can restart tailing from the position recorded in the existing position file.
+
+In another use case, this source can also start tailing from an arbitrary position in each file using the given position file.
+When there is no position file at the specified path, it will start tailing from the first line of each file by default.
+
+Files will be consumed in order of their modification time. The file with the oldest modification time will be consumed first.
+
+This source does not rename, delete, or otherwise modify the files being tailed.
+Currently this source does not support tailing binary files. It reads text files line by line.
+
+=================================== ============================== ===================================================
+Property Name                       Default                        Description
+=================================== ============================== ===================================================
+**channels**                        --
+**type**                            --                             The component type name, needs to be ``TAILDIR``.
+**filegroups**                      --                             Space-separated list of file groups. Each file group indicates a set of files to be tailed.
+**filegroups.<filegroupName>**      --                             Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.
+positionFile                        ~/.flume/taildir_position.json File in JSON format to record the inode, the absolute path and the last position of each tailing file.
+headers.<filegroupName>.<headerKey> --                             Header value to set for the given header key. Multiple headers can be specified for one file group.
+byteOffsetHeader                    false                          Whether to add the byte offset of a tailed line to a header called 'byteoffset'.
+skipToEnd                           false                          Whether to skip to EOF for files not recorded in the position file.
+idleTimeout                         120000                         Time (ms) to close inactive files. If new lines are appended to a closed file, this source will automatically re-open it.
+writePosInterval                    3000                           Interval time (ms) to write the last position of each file to the position file.
+batchSize                           100                            Max number of lines to read and send to the channel at a time. Using the default is usually fine.
+backoffSleepIncrement               1000                           The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.
+maxBackoffSleep                     5000                           The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
+=================================== ============================== ===================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.sources.r1.type = TAILDIR
+  a1.sources.r1.channels = c1
+  a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
+  a1.sources.r1.filegroups = f1 f2
+  a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
+  a1.sources.r1.headers.f1.headerKey1 = value1
+  a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
+  a1.sources.r1.headers.f2.headerKey1 = value2
+  a1.sources.r1.headers.f2.headerKey2 = value2-2
+
 Twitter 1% firehose Source (experimental)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-sources/flume-taildir-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml b/flume-ng-sources/flume-taildir-source/pom.xml
new file mode 100644
index 0000000..09063fb
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-sources</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.7.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sources</groupId>
+  <artifactId>flume-taildir-source</artifactId>
+  <name>Flume Taildir Source</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.3.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
new file mode 100644
index 0000000..951b786
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.taildir;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Pattern;
+
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.client.avro.ReliableEventReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import com.google.common.collect.Table.Cell;
+import com.google.gson.stream.JsonReader;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReliableTaildirEventReader implements ReliableEventReader {
+  private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class);
+
+  private final Table<String, File, Pattern> tailFileTable;
+  private final Table<String, String, String> headerTable;
+
+  private TailFile currentFile = null;
+  private Map<Long, TailFile> tailFiles = Maps.newHashMap();
+  private long updateTime;
+  private boolean addByteOffset;
+  private boolean committed = true;
+
+  /**
+   * Create a ReliableTaildirEventReader to watch the given directory.
+   */
+  private ReliableTaildirEventReader(Map<String, String> filePaths,
+      Table<String, String, String> headerTable, String positionFilePath,
+      boolean skipToEnd, boolean addByteOffset) throws IOException {
+    // Sanity checks
+    Preconditions.checkNotNull(filePaths);
+    Preconditions.checkNotNull(positionFilePath);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Initializing {} with directory={}, metaDir={}",
+          new Object[] { ReliableTaildirEventReader.class.getSimpleName(), filePaths });
+    }
+
+    Table<String, File, Pattern> tailFileTable = HashBasedTable.create();
+    for (Entry<String, String> e : filePaths.entrySet()) {
+      File f = new File(e.getValue());
+      File parentDir =  f.getParentFile();
+      Preconditions.checkState(parentDir.exists(),
+        "Directory does not exist: " + parentDir.getAbsolutePath());
+      Pattern fileNamePattern = Pattern.compile(f.getName());
+      tailFileTable.put(e.getKey(), parentDir, fileNamePattern);
+    }
+    logger.info("tailFileTable: " + tailFileTable.toString());
+    logger.info("headerTable: " + headerTable.toString());
+
+    this.tailFileTable = tailFileTable;
+    this.headerTable = headerTable;
+    this.addByteOffset = addByteOffset;
+    updateTailFiles(skipToEnd);
+
+    logger.info("Updating position from position file: " + positionFilePath);
+    loadPositionFile(positionFilePath);
+  }
+
+  /**
+   * Load a position file which has the last read position of each file.
+   * If the position file exists, update tailFiles mapping.
+   */
+  public void loadPositionFile(String filePath) {
+    Long inode, pos;
+    String path;
+    FileReader fr = null;
+    JsonReader jr = null;
+    try {
+      fr = new FileReader(filePath);
+      jr = new JsonReader(fr);
+      jr.beginArray();
+      while (jr.hasNext()) {
+        inode = null;
+        pos = null;
+        path = null;
+        jr.beginObject();
+        while (jr.hasNext()) {
+          switch (jr.nextName()) {
+          case "inode":
+            inode = jr.nextLong();
+            break;
+          case "pos":
+            pos = jr.nextLong();
+            break;
+          case "file":
+            path = jr.nextString();
+            break;
+          }
+        }
+        jr.endObject();
+
+        for (Object v : Arrays.asList(inode, pos, path)) {
+          Preconditions.checkNotNull(v, "Detected missing value in position file. "
+              + "inode: " + inode + ", pos: " + pos + ", path: " + path);
+        }
+        TailFile tf = tailFiles.get(inode);
+        if (tf != null && tf.updatePos(path, inode, pos)) {
+          tailFiles.put(inode, tf);
+        } else {
+          logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos);
+        }
+      }
+      jr.endArray();
+    } catch (FileNotFoundException e) {
+      logger.info("File not found: " + filePath + ", not updating position");
+    } catch (IOException e) {
+      logger.error("Failed loading positionFile: " + filePath, e);
+    } finally {
+      try {
+        if (fr != null) fr.close();
+        if (jr != null) jr.close();
+      } catch (IOException e) {
+        logger.error("Error: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  public Map<Long, TailFile> getTailFiles() {
+    return tailFiles;
+  }
+
+  public void setCurrentFile(TailFile currentFile) {
+    this.currentFile = currentFile;
+  }
+
+  @Override
+  public Event readEvent() throws IOException {
+    List<Event> events = readEvents(1);
+    if (events.isEmpty()) {
+      return null;
+    }
+    return events.get(0);
+  }
+
+  @Override
+  public List<Event> readEvents(int numEvents) throws IOException {
+    return readEvents(numEvents, false);
+  }
+
+  @VisibleForTesting
+  public List<Event> readEvents(TailFile tf, int numEvents) throws IOException {
+    setCurrentFile(tf);
+    return readEvents(numEvents, true);
+  }
+
+  public List<Event> readEvents(int numEvents, boolean backoffWithoutNL)
+      throws IOException {
+    if (!committed) {
+      if (currentFile == null) {
+        throw new IllegalStateException("current file does not exist. " + currentFile.getPath());
+      }
+      logger.info("Last read was never committed - resetting position");
+      long lastPos = currentFile.getPos();
+      currentFile.getRaf().seek(lastPos);
+    }
+    List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
+    if (events.isEmpty()) {
+      return events;
+    }
+
+    Map<String, String> headers = currentFile.getHeaders();
+    if (headers != null && !headers.isEmpty()) {
+      for (Event event : events) {
+        event.getHeaders().putAll(headers);
+      }
+    }
+    committed = false;
+    return events;
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (TailFile tf : tailFiles.values()) {
+      if (tf.getRaf() != null) tf.getRaf().close();
+    }
+  }
+
+  /** Commit the last lines which were read. */
+  @Override
+  public void commit() throws IOException {
+    if (!committed && currentFile != null) {
+      long pos = currentFile.getRaf().getFilePointer();
+      currentFile.setPos(pos);
+      currentFile.setLastUpdated(updateTime);
+      committed = true;
+    }
+  }
+
+  /**
+   * Update tailFiles mapping if a new file is created or appends are detected
+   * to the existing file.
+   */
+  public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
+    updateTime = System.currentTimeMillis();
+    List<Long> updatedInodes = Lists.newArrayList();
+
+    for (Cell<String, File, Pattern> cell : tailFileTable.cellSet()) {
+      Map<String, String> headers = headerTable.row(cell.getRowKey());
+      File parentDir = cell.getColumnKey();
+      Pattern fileNamePattern = cell.getValue();
+
+      for (File f : getMatchFiles(parentDir, fileNamePattern)) {
+        long inode = getInode(f);
+        TailFile tf = tailFiles.get(inode);
+        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
+          long startPos = skipToEnd ? f.length() : 0;
+          tf = openFile(f, headers, inode, startPos);
+        } else{
+          boolean updated = tf.getLastUpdated() < f.lastModified();
+          if (updated) {
+            if (tf.getRaf() == null) {
+              tf = openFile(f, headers, inode, tf.getPos());
+            }
+            if (f.length() < tf.getPos()) {
+              logger.info("Pos " + tf.getPos() + " is larger than file size! "
+                  + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
+              tf.updatePos(tf.getPath(), inode, 0);
+            }
+          }
+          tf.setNeedTail(updated);
+        }
+        tailFiles.put(inode, tf);
+        updatedInodes.add(inode);
+      }
+    }
+    return updatedInodes;
+  }
+
+  public List<Long> updateTailFiles() throws IOException {
+    return updateTailFiles(false);
+  }
+
+  private List<File> getMatchFiles(File parentDir, final Pattern fileNamePattern) {
+    FileFilter filter = new FileFilter() {
+      public boolean accept(File f) {
+        String fileName = f.getName();
+        if (f.isDirectory() || !fileNamePattern.matcher(fileName).matches()) {
+          return false;
+        }
+        return true;
+      }
+    };
+    File[] files = parentDir.listFiles(filter);
+    ArrayList<File> result = Lists.newArrayList(files);
+    Collections.sort(result, new TailFile.CompareByLastModifiedTime());
+    return result;
+  }
+
+  private long getInode(File file) throws IOException {
+    long inode = (long) Files.getAttribute(file.toPath(), "unix:ino");
+    return inode;
+  }
+
+  private TailFile openFile(File file, Map<String, String> headers, long inode, long pos) {
+    try {
+      logger.info("Opening file: " + file + ", inode: " + inode + ", pos: " + pos);
+      return new TailFile(file, headers, inode, pos);
+    } catch (IOException e) {
+      throw new FlumeException("Failed opening file: " + file, e);
+    }
+  }
+
+  /**
+   * Special builder class for ReliableTaildirEventReader
+   */
+  public static class Builder {
+    private Map<String, String> filePaths;
+    private Table<String, String, String> headerTable;
+    private String positionFilePath;
+    private boolean skipToEnd;
+    private boolean addByteOffset;
+
+    public Builder filePaths(Map<String, String> filePaths) {
+      this.filePaths = filePaths;
+      return this;
+    }
+
+    public Builder headerTable(Table<String, String, String> headerTable) {
+      this.headerTable = headerTable;
+      return this;
+    }
+
+    public Builder positionFilePath(String positionFilePath) {
+      this.positionFilePath = positionFilePath;
+      return this;
+    }
+
+    public Builder skipToEnd(boolean skipToEnd) {
+      this.skipToEnd = skipToEnd;
+      return this;
+    }
+
+    public Builder addByteOffset(boolean addByteOffset) {
+      this.addByteOffset = addByteOffset;
+      return this;
+    }
+
+    public ReliableTaildirEventReader build() throws IOException {
+      return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, addByteOffset);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
new file mode 100644
index 0000000..99683da
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.taildir;
+
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+public class TailFile {
+  private static final Logger logger = LoggerFactory.getLogger(TailFile.class);
+
+  private static final String LINE_SEP = "\n";
+  private static final String LINE_SEP_WIN = "\r\n";
+
+  private RandomAccessFile raf;
+  private final String path;
+  private final long inode;
+  private long pos;
+  private long lastUpdated;
+  private boolean needTail;
+  private final Map<String, String> headers;
+
+  public TailFile(File file, Map<String, String> headers, long inode, long pos)
+      throws IOException {
+    this.raf = new RandomAccessFile(file, "r");
+    if (pos > 0) raf.seek(pos);
+    this.path = file.getAbsolutePath();
+    this.inode = inode;
+    this.pos = pos;
+    this.lastUpdated = 0L;
+    this.needTail = true;
+    this.headers = headers;
+  }
+
+  public RandomAccessFile getRaf() { return raf; }
+  public String getPath() { return path; }
+  public long getInode() { return inode; }
+  public long getPos() { return pos; }
+  public long getLastUpdated() { return lastUpdated; }
+  public boolean needTail() { return needTail; }
+  public Map<String, String> getHeaders() { return headers; }
+
+  public void setPos(long pos) { this.pos = pos; }
+  public void setLastUpdated(long lastUpdated) { this.lastUpdated = lastUpdated; }
+  public void setNeedTail(boolean needTail) { this.needTail = needTail; }
+
+  public boolean updatePos(String path, long inode, long pos) throws IOException {
+    if (this.inode == inode && this.path.equals(path)) {
+      raf.seek(pos);
+      setPos(pos);
+      logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
+      return true;
+    }
+    return false;
+  }
+
+  public List<Event> readEvents(int numEvents, boolean backoffWithoutNL,
+      boolean addByteOffset) throws IOException {
+    List<Event> events = Lists.newLinkedList();
+    for (int i = 0; i < numEvents; i++) {
+      Event event = readEvent(backoffWithoutNL, addByteOffset);
+      if (event == null) {
+        break;
+      }
+      events.add(event);
+    }
+    return events;
+  }
+
+  private Event readEvent(boolean backoffWithoutNL, boolean addByteOffset) throws IOException {
+    Long posTmp = raf.getFilePointer();
+    String line = readLine();
+    if (line == null) {
+      return null;
+    }
+    if (backoffWithoutNL && !line.endsWith(LINE_SEP)) {
+      logger.info("Backing off in file without newline: "
+          + path + ", inode: " + inode + ", pos: " + raf.getFilePointer());
+      raf.seek(posTmp);
+      return null;
+    }
+
+    String lineSep = LINE_SEP;
+    if(line.endsWith(LINE_SEP_WIN)) {
+      lineSep = LINE_SEP_WIN;
+    }
+    Event event = EventBuilder.withBody(StringUtils.removeEnd(line, lineSep), Charsets.UTF_8);
+    if (addByteOffset == true) {
+      event.getHeaders().put(BYTE_OFFSET_HEADER_KEY, posTmp.toString());
+    }
+    return event;
+  }
+
+  private String readLine() throws IOException {
+    ByteArrayDataOutput out = ByteStreams.newDataOutput(300);
+    int i = 0;
+    int c;
+    while ((c = raf.read()) != -1) {
+      i++;
+      out.write((byte) c);
+      if (c == LINE_SEP.charAt(0)) {
+        break;
+      }
+    }
+    if (i == 0) {
+      return null;
+    }
+    return new String(out.toByteArray(), Charsets.UTF_8);
+  }
+
+  public void close() {
+    try {
+      raf.close();
+      raf = null;
+      long now = System.currentTimeMillis();
+      setLastUpdated(now);
+    } catch (IOException e) {
+      logger.error("Failed closing file: " + path + ", inode: " + inode, e);
+    }
+  }
+
+  public static class CompareByLastModifiedTime implements Comparator<File> {
+    @Override
+    public int compare(File f1, File f2) {
+      return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified());
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
new file mode 100644
index 0000000..97ca43b
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.flume.source.taildir;
+
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.PollableSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractSource;
+import org.apache.flume.source.PollableSourceConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.Gson;
+
+/**
+ * Pollable source that tails the files selected by the configured file groups, hands the
+ * events read by {@link ReliableTaildirEventReader} to the channel processor, and
+ * periodically records the read position of each tailed file in a JSON position file.
+ */
+public class TaildirSource extends AbstractSource implements
+    PollableSource, Configurable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class);
+
+  private Map<String, String> filePaths;
+  private Table<String, String, String> headerTable;
+  private int batchSize;
+  private String positionFilePath;
+  private boolean skipToEnd;
+  private boolean byteOffsetHeader;
+
+  private SourceCounter sourceCounter;
+  private ReliableTaildirEventReader reader;
+  private ScheduledExecutorService idleFileChecker;
+  private ScheduledExecutorService positionWriter;
+  private int retryInterval = 1000;
+  private int maxRetryInterval = 5000;
+  private int idleTimeout;
+  private int checkIdleInterval = 5000;
+  private int writePosInitDelay = 5000;
+  private int writePosInterval;
+
+  // Inodes returned by the latest reader.updateTailFiles() call. The positionWriter thread
+  // reads this concurrently, so process() publishes a fresh list with a single assignment
+  // instead of clear()+addAll(). That way the writer never observes a transiently empty
+  // list and truncates the position file.
+  private volatile List<Long> existingInodes = new CopyOnWriteArrayList<Long>();
+  // Inodes flagged by the idleFileChecker thread; drained and closed by process().
+  private List<Long> idleInodes = new CopyOnWriteArrayList<Long>();
+  private Long backoffSleepIncrement;
+  private Long maxBackOffSleepInterval;
+
+  /**
+   * Builds the event reader and schedules the idle-file checker and the position writer.
+   *
+   * @throws FlumeException if the reader cannot be created
+   */
+  @Override
+  public synchronized void start() {
+    logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
+    try {
+      reader = new ReliableTaildirEventReader.Builder()
+          .filePaths(filePaths)
+          .headerTable(headerTable)
+          .positionFilePath(positionFilePath)
+          .skipToEnd(skipToEnd)
+          .addByteOffset(byteOffsetHeader)
+          .build();
+    } catch (IOException e) {
+      throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
+    }
+    idleFileChecker = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
+    idleFileChecker.scheduleWithFixedDelay(new IdleFileCheckerRunnable(),
+        idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);
+
+    positionWriter = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
+    positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
+        writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);
+
+    super.start();
+    logger.debug("TaildirSource started");
+    sourceCounter.start();
+  }
+
+  /**
+   * Stops the background threads, writes the final read positions and closes the reader.
+   * The final position write and the reader close still run if this thread is interrupted
+   * while waiting for the executors; the interrupt status is restored in that case.
+   */
+  @Override
+  public synchronized void stop() {
+    super.stop();
+    shutdownExecutors(idleFileChecker, positionWriter);
+    if (reader != null) { // null if start() never created a reader
+      // write the last position
+      writePosition();
+      try {
+        reader.close();
+      } catch (IOException e) {
+        logger.info("Failed: " + e.getMessage(), e);
+      }
+    }
+    sourceCounter.stop();
+    logger.info("Taildir source {} stopped. Metrics: {}", getName(), sourceCounter);
+  }
+
+  /**
+   * Shuts down the given executors (null entries are ignored). Each one gets up to one second
+   * to finish before it is forced with shutdownNow(). On interrupt, all remaining executors
+   * are forced and the interrupt status is restored.
+   */
+  private void shutdownExecutors(ExecutorService... services) {
+    for (ExecutorService service : services) {
+      if (service != null) {
+        service.shutdown();
+      }
+    }
+    try {
+      for (ExecutorService service : services) {
+        if (service != null && !service.awaitTermination(1, TimeUnit.SECONDS)) {
+          service.shutdownNow();
+        }
+      }
+    } catch (InterruptedException e) {
+      logger.info("Interrupted while awaiting termination", e);
+      for (ExecutorService service : services) {
+        if (service != null) {
+          service.shutdownNow();
+        }
+      }
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Taildir source: { positionFile: %s, skipToEnd: %s, "
+        + "byteOffsetHeader: %s, idleTimeout: %s, writePosInterval: %s }",
+        positionFilePath, skipToEnd, byteOffsetHeader, idleTimeout, writePosInterval);
+  }
+
+  /**
+   * Reads the file groups, header mappings, position file, batch size, timing and backoff
+   * settings from the context.
+   *
+   * @throws IllegalStateException if the file groups are missing or none of them maps to a
+   *         path, or if a header mapping key is malformed
+   */
+  @Override
+  public synchronized void configure(Context context) {
+    String fileGroups = context.getString(FILE_GROUPS);
+    Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);
+
+    filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX), fileGroups.split("\\s+"));
+    Preconditions.checkState(!filePaths.isEmpty(),
+        "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");
+
+    String homePath = System.getProperty("user.home").replace('\\', '/');
+    positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);
+    headerTable = getTable(context, HEADERS_PREFIX);
+    batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);
+    byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);
+    idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
+    writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);
+
+    backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT
+            , PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
+    maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP
+            , PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
+  }
+
+  /** Returns the entries of {@code map} whose key is in {@code keys}; unmapped keys are skipped. */
+  private Map<String, String> selectByKeys(Map<String, String> map, String[] keys) {
+    Map<String, String> result = Maps.newHashMap();
+    for (String key : keys) {
+      if (map.containsKey(key)) {
+        result.put(key, map.get(key));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Builds a (fileGroup, headerName) -> value table from properties of the form
+   * {@code <prefix><fileGroup>.<headerName> = <value>}.
+   *
+   * @throws IllegalStateException if a key has no '.' separating group and header name
+   */
+  private Table<String, String, String> getTable(Context context, String prefix) {
+    Table<String, String, String> table = HashBasedTable.create();
+    for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
+      String[] parts = e.getKey().split("\\.", 2);
+      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+      Preconditions.checkState(parts.length == 2, "Invalid header mapping: '" + prefix
+          + e.getKey() + "', expected '" + prefix + "<fileGroup>.<headerName>'");
+      table.put(parts[0], parts[1], e.getValue());
+    }
+    return table;
+  }
+
+  @VisibleForTesting
+  protected SourceCounter getSourceCounter() {
+    return sourceCounter;
+  }
+
+  /**
+   * Refreshes the set of tailed files, delivers new data from files that need tailing,
+   * closes files flagged as idle, then sleeps for {@code retryInterval} ms.
+   *
+   * @return READY normally, BACKOFF if any Throwable escaped the work above
+   */
+  @Override
+  public Status process() {
+    Status status = Status.READY;
+    try {
+      // Build the new snapshot first, then publish it with one volatile write.
+      List<Long> inodes = new CopyOnWriteArrayList<Long>(reader.updateTailFiles());
+      existingInodes = inodes;
+      for (long inode : inodes) {
+        TailFile tf = reader.getTailFiles().get(inode);
+        if (tf.needTail()) {
+          tailFileProcess(tf, true);
+        }
+      }
+      closeTailFiles();
+      try {
+        TimeUnit.MILLISECONDS.sleep(retryInterval);
+      } catch (InterruptedException e) {
+        logger.info("Interrupted while sleeping");
+        // Restore the interrupt status for the caller of process().
+        Thread.currentThread().interrupt();
+      }
+    } catch (Throwable t) {
+      logger.error("Unable to tail files", t);
+      status = Status.BACKOFF;
+    }
+    return status;
+  }
+
+  @Override
+  public long getBackOffSleepIncrement() {
+    return backoffSleepIncrement;
+  }
+
+  @Override
+  public long getMaxBackOffSleepInterval() {
+    return maxBackOffSleepInterval;
+  }
+
+  /**
+   * Reads batches of up to {@code batchSize} events from {@code tf} and delivers them to the
+   * channel until an empty or short batch is read. On ChannelException the batch is not
+   * committed and is re-read after a delay that doubles on each failure, up to
+   * {@code maxRetryInterval}.
+   *
+   * @param backoffWithoutNL passed to readEvents; false lets a final line without a trailing
+   *        newline be emitted (used when closing idle files)
+   */
+  private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
+      throws IOException, InterruptedException {
+    while (true) {
+      reader.setCurrentFile(tf);
+      List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
+      if (events.isEmpty()) {
+        break;
+      }
+      sourceCounter.addToEventReceivedCount(events.size());
+      sourceCounter.incrementAppendBatchReceivedCount();
+      try {
+        getChannelProcessor().processEventBatch(events);
+        reader.commit();
+      } catch (ChannelException ex) {
+        logger.warn("The channel is full or unexpected failure. " +
+          "The source will try again after " + retryInterval + " ms");
+        TimeUnit.MILLISECONDS.sleep(retryInterval);
+        retryInterval = retryInterval << 1;
+        retryInterval = Math.min(retryInterval, maxRetryInterval);
+        continue;
+      }
+      retryInterval = 1000;
+      sourceCounter.addToEventAcceptedCount(events.size());
+      sourceCounter.incrementAppendBatchAcceptedCount();
+      if (events.size() < batchSize) {
+        break;
+      }
+    }
+  }
+
+  /** Drains and closes every still-open file flagged as idle, then clears the idle list. */
+  private void closeTailFiles() throws IOException, InterruptedException {
+    for (long inode : idleInodes) {
+      TailFile tf = reader.getTailFiles().get(inode);
+      if (tf.getRaf() != null) { // the file has not been closed yet
+        tailFileProcess(tf, false);
+        tf.close();
+        logger.info("Closed file: " + tf.getPath() + ", inode: " + inode + ", pos: " + tf.getPos());
+      }
+    }
+    idleInodes.clear();
+  }
+
+  /**
+   * Runnable that flags open files whose last update is older than {@code idleTimeout} ms,
+   * so that process() closes them.
+   */
+  private class IdleFileCheckerRunnable implements Runnable {
+    @Override
+    public void run() {
+      try {
+        long now = System.currentTimeMillis();
+        for (TailFile tf : reader.getTailFiles().values()) {
+          if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) {
+            idleInodes.add(tf.getInode());
+          }
+        }
+      } catch (Throwable t) {
+        logger.error("Uncaught exception in IdleFileChecker thread", t);
+      }
+    }
+  }
+
+  /**
+   * Runnable class that writes a position file which has the last read position
+   * of each file.
+   */
+  private class PositionWriterRunnable implements Runnable {
+    @Override
+    public void run() {
+      writePosition();
+    }
+  }
+
+  /**
+   * Overwrites the position file with the current position of every existing inode. If
+   * there are no inodes, the file is left empty. Errors are logged, not thrown.
+   */
+  private void writePosition() {
+    File file = new File(positionFilePath);
+    FileWriter writer = null;
+    try {
+      // Read the volatile snapshot once so the emptiness check and the JSON agree. Serialize
+      // before opening the file: new FileWriter(file) truncates it, so a failure while
+      // building the JSON must not wipe the previously saved positions.
+      List<Long> inodes = existingInodes;
+      String json = inodes.isEmpty() ? null : toPosInfoJson(inodes);
+      writer = new FileWriter(file);
+      if (json != null) {
+        writer.write(json);
+      }
+    } catch (Throwable t) {
+      logger.error("Failed writing positionFile", t);
+    } finally {
+      try {
+        if (writer != null) {
+          writer.close();
+        }
+      } catch (IOException e) {
+        logger.error("Error: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  /** Serializes {inode, pos, file} for each given inode as a JSON array. */
+  private String toPosInfoJson(List<Long> inodes) {
+    @SuppressWarnings("rawtypes")
+    List<Map> posInfos = Lists.newArrayList();
+    for (Long inode : inodes) {
+      TailFile tf = reader.getTailFiles().get(inode);
+      posInfos.add(ImmutableMap.of("inode", inode, "pos", tf.getPos(), "file", tf.getPath()));
+    }
+    return new Gson().toJson(posInfos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
new file mode 100644
index 0000000..6165276
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.flume.source.taildir;
+
+/** Configuration keys and default values for {@link TaildirSource}; not instantiable. */
+public final class TaildirSourceConfigurationConstants {
+  /** Whitespace-separated list of file group names to tail. */
+  public static final String FILE_GROUPS = "filegroups";
+  /** Prefix of the per-group keys ({@code filegroups.<group>}) holding each group's path. */
+  public static final String FILE_GROUPS_PREFIX = FILE_GROUPS + ".";
+
+  /**
+   * Prefix of header mappings, {@code headers.<group>.<headerName> = <value>}, added to the
+   * events of a file group.
+   */
+  public static final String HEADERS_PREFIX = "headers.";
+
+  /** Path of the position file. The default is appended to the user's home directory. */
+  public static final String POSITION_FILE = "positionFile";
+  public static final String DEFAULT_POSITION_FILE = "/.flume/taildir_position.json";
+
+  /** Maximum number of events to batch before sending them to the ChannelProcessor. */
+  public static final String BATCH_SIZE = "batchSize";
+  public static final int DEFAULT_BATCH_SIZE = 100;
+
+  /** Whether files with no entry in the position file start being read at EOF. */
+  public static final String SKIP_TO_END = "skipToEnd";
+  public static final boolean DEFAULT_SKIP_TO_END = false;
+
+  /** Time (ms) after which a file without updates is closed. */
+  public static final String IDLE_TIMEOUT = "idleTimeout";
+  public static final int DEFAULT_IDLE_TIMEOUT = 120000;
+
+  /** Interval (ms) between writes of each file's last read position to the position file. */
+  public static final String WRITE_POS_INTERVAL = "writePosInterval";
+  public static final int DEFAULT_WRITE_POS_INTERVAL = 3000;
+
+  /** Whether to add the byte offset of a tailed line to the event headers. */
+  public static final String BYTE_OFFSET_HEADER = "byteOffsetHeader";
+  /** Header key under which the byte offset is stored. */
+  public static final String BYTE_OFFSET_HEADER_KEY = "byteoffset";
+  public static final boolean DEFAULT_BYTE_OFFSET_HEADER = false;
+
+  private TaildirSourceConfigurationConstants() {
+    // Holder for constants only.
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
new file mode 100644
index 0000000..1896883
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.taildir;
+
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.Event;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.io.Files;
+
+/** Tests for {@link ReliableTaildirEventReader}. */
+public class TestTaildirEventReader {
+  private File tmpDir;
+  private String posFilePath;
+
+  /** Decodes an event body as UTF-8, the charset the test files are written with. */
+  public static String bodyAsString(Event event) {
+    return new String(event.getBody(), Charsets.UTF_8);
+  }
+
+  static List<String> bodiesAsStrings(List<Event> events) {
+    List<String> bodies = Lists.newArrayListWithCapacity(events.size());
+    for (Event event : events) {
+      bodies.add(bodyAsString(event));
+    }
+    return bodies;
+  }
+
+  static List<String> headersAsStrings(List<Event> events, String headerKey) {
+    List<String> headers = Lists.newArrayListWithCapacity(events.size());
+    for (Event event : events) {
+      headers.add(event.getHeaders().get(headerKey));
+    }
+    return headers;
+  }
+
+  private ReliableTaildirEventReader getReader(Map<String, String> filePaths,
+      Table<String, String, String> headerTable, boolean addByteOffset) {
+    ReliableTaildirEventReader reader;
+    try {
+      reader = new ReliableTaildirEventReader.Builder()
+          .filePaths(filePaths)
+          .headerTable(headerTable)
+          .positionFilePath(posFilePath)
+          .skipToEnd(false)
+          .addByteOffset(addByteOffset)
+          .build();
+      reader.updateTailFiles();
+    } catch (IOException ioe) {
+      throw Throwables.propagate(ioe);
+    }
+    return reader;
+  }
+
+  private ReliableTaildirEventReader getReader(boolean addByteOffset) {
+    Map<String, String> filePaths = ImmutableMap.of("testFiles", tmpDir.getAbsolutePath() + "/file.*");
+    Table<String, String, String> headerTable = HashBasedTable.create();
+    return getReader(filePaths, headerTable, addByteOffset);
+  }
+
+  private ReliableTaildirEventReader getReader() {
+    return getReader(false);
+  }
+
+  @Before
+  public void setUp() {
+    tmpDir = Files.createTempDir();
+    posFilePath = tmpDir.getAbsolutePath() + "/taildir_position_test.json";
+  }
+
+  @After
+  public void tearDown() {
+    // listFiles() returns null on I/O errors; don't let cleanup mask the real test result.
+    File[] files = tmpDir.listFiles();
+    if (files != null) {
+      for (File f : files) {
+        if (f.isDirectory()) {
+          File[] children = f.listFiles();
+          if (children != null) {
+            for (File sdf : children) {
+              sdf.delete();
+            }
+          }
+        }
+        f.delete();
+      }
+    }
+    tmpDir.delete();
+  }
+
+  @Test
+  // Create three multi-line files, then read them back out. Ensures that
+  // existing and appended lines are read correctly from the files.
+  public void testBasicReadFiles() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    File f2 = new File(tmpDir, "file2");
+    File f3 = new File(tmpDir, "file3");
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+    Files.write("file3line1\nfile3line2\n", f3, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader();
+    List<String> out = Lists.newArrayList();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      List<String> bodies = bodiesAsStrings(reader.readEvents(tf, 2));
+      out.addAll(bodies);
+      reader.commit();
+    }
+    assertEquals(6, out.size());
+    // Make sure we got every line
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+    assertTrue(out.contains("file2line1"));
+    assertTrue(out.contains("file2line2"));
+    assertTrue(out.contains("file3line1"));
+    assertTrue(out.contains("file3line2"));
+
+    Files.append("file3line3\nfile3line4\n", f3, Charsets.UTF_8);
+
+    reader.updateTailFiles();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      List<String> bodies = bodiesAsStrings(reader.readEvents(tf, 2));
+      out.addAll(bodies);
+      reader.commit();
+    }
+    assertEquals(8, out.size());
+    assertTrue(out.contains("file3line3"));
+    assertTrue(out.contains("file3line4"));
+  }
+
+  @Test
+  // Make sure this works when there are initially no files
+  // and we finish reading all files and fully commit.
+  public void testInitiallyEmptyDirAndBehaviorAfterReadingAll() throws IOException {
+    ReliableTaildirEventReader reader = getReader();
+
+    List<Long> fileInodes = reader.updateTailFiles();
+    assertEquals(0, fileInodes.size());
+
+    File f1 = new File(tmpDir, "file1");
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+
+    reader.updateTailFiles();
+    List<String> out = null;
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out = bodiesAsStrings(reader.readEvents(tf, 2));
+      reader.commit();
+    }
+    assertEquals(2, out.size());
+    // Make sure we got every line
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+
+    reader.updateTailFiles();
+    List<String> empty = null;
+    for (TailFile tf : reader.getTailFiles().values()) {
+      empty = bodiesAsStrings(reader.readEvents(tf, 15));
+      reader.commit();
+    }
+    assertEquals(0, empty.size());
+  }
+
+  @Test
+  // Test a basic case where a commit is missed.
+  public void testBasicCommitFailure() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n" +
+                "file1line9\nfile1line10\nfile1line11\nfile1line12\n",
+                f1, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader();
+    List<String> out1 = null;
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out1 = bodiesAsStrings(reader.readEvents(tf, 4));
+    }
+    assertTrue(out1.contains("file1line1"));
+    assertTrue(out1.contains("file1line2"));
+    assertTrue(out1.contains("file1line3"));
+    assertTrue(out1.contains("file1line4"));
+
+    List<String> out2 = bodiesAsStrings(reader.readEvents(4));
+    assertTrue(out2.contains("file1line1"));
+    assertTrue(out2.contains("file1line2"));
+    assertTrue(out2.contains("file1line3"));
+    assertTrue(out2.contains("file1line4"));
+
+    reader.commit();
+    List<String> out3 = bodiesAsStrings(reader.readEvents(4));
+    assertTrue(out3.contains("file1line5"));
+    assertTrue(out3.contains("file1line6"));
+    assertTrue(out3.contains("file1line7"));
+    assertTrue(out3.contains("file1line8"));
+
+    reader.commit();
+    List<String> out4 = bodiesAsStrings(reader.readEvents(4));
+    assertEquals(4, out4.size());
+    assertTrue(out4.contains("file1line9"));
+    assertTrue(out4.contains("file1line10"));
+    assertTrue(out4.contains("file1line11"));
+    assertTrue(out4.contains("file1line12"));
+  }
+
+  @Test
+  // Test a case where a commit is missed and the batch size changes.
+  public void testBasicCommitFailureAndBatchSizeChanges() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader();
+    List<String> out1 = null;
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out1 = bodiesAsStrings(reader.readEvents(tf, 5));
+    }
+    assertTrue(out1.contains("file1line1"));
+    assertTrue(out1.contains("file1line2"));
+    assertTrue(out1.contains("file1line3"));
+    assertTrue(out1.contains("file1line4"));
+    assertTrue(out1.contains("file1line5"));
+
+    List<String> out2 = bodiesAsStrings(reader.readEvents(2));
+    assertTrue(out2.contains("file1line1"));
+    assertTrue(out2.contains("file1line2"));
+
+    reader.commit();
+    List<String> out3 = bodiesAsStrings(reader.readEvents(2));
+    assertTrue(out3.contains("file1line3"));
+    assertTrue(out3.contains("file1line4"));
+
+    reader.commit();
+    List<String> out4 = bodiesAsStrings(reader.readEvents(15));
+    assertTrue(out4.contains("file1line5"));
+    assertTrue(out4.contains("file1line6"));
+    assertTrue(out4.contains("file1line7"));
+    assertTrue(out4.contains("file1line8"));
+  }
+
+  @Test
+  public void testIncludeEmptyFile() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    File f2 = new File(tmpDir, "file2");
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+    Files.touch(f2);
+
+    ReliableTaildirEventReader reader = getReader();
+    // Expect to read nothing from empty file
+    List<String> out = Lists.newArrayList();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out.addAll(bodiesAsStrings(reader.readEvents(tf, 5)));
+      reader.commit();
+    }
+    assertEquals(2, out.size());
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+    assertNull(reader.readEvent());
+  }
+
+  @Test
+  public void testBackoffWithoutNewLine() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    Files.write("file1line1\nfile1", f1, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader();
+    List<String> out = Lists.newArrayList();
+    // Expect to read only the line with newline
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out.addAll(bodiesAsStrings(reader.readEvents(tf, 5)));
+      reader.commit();
+    }
+    assertEquals(1, out.size());
+    assertTrue(out.contains("file1line1"));
+
+    Files.append("line2\nfile1line3\nfile1line4", f1, Charsets.UTF_8);
+
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out.addAll(bodiesAsStrings(reader.readEvents(tf, 5)));
+      reader.commit();
+    }
+    assertEquals(3, out.size());
+    assertTrue(out.contains("file1line2"));
+    assertTrue(out.contains("file1line3"));
+
+    // Should read the last line if it finally has no newline
+    out.addAll(bodiesAsStrings(reader.readEvents(5, false)));
+    reader.commit();
+    assertEquals(4, out.size());
+    assertTrue(out.contains("file1line4"));
+  }
+
+  @Test
+  public void testBatchedReadsAcrossFileBoundary() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader();
+    List<String> out1 = Lists.newArrayList();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out1.addAll(bodiesAsStrings(reader.readEvents(tf, 5)));
+      reader.commit();
+    }
+
+    File f2 = new File(tmpDir, "file2");
+    Files.write("file2line1\nfile2line2\nfile2line3\nfile2line4\n" +
+                "file2line5\nfile2line6\nfile2line7\nfile2line8\n",
+                f2, Charsets.UTF_8);
+
+    List<String> out2 = bodiesAsStrings(reader.readEvents(5));
+    reader.commit();
+
+    reader.updateTailFiles();
+    List<String> out3 = Lists.newArrayList();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out3.addAll(bodiesAsStrings(reader.readEvents(tf, 5)));
+      reader.commit();
+    }
+
+    // Should have first 5 lines of file1
+    assertEquals(5, out1.size());
+    assertTrue(out1.contains("file1line1"));
+    assertTrue(out1.contains("file1line2"));
+    assertTrue(out1.contains("file1line3"));
+    assertTrue(out1.contains("file1line4"));
+    assertTrue(out1.contains("file1line5"));
+
+    // Should have 3 remaining lines of file1
+    assertEquals(3, out2.size());
+    assertTrue(out2.contains("file1line6"));
+    assertTrue(out2.contains("file1line7"));
+    assertTrue(out2.contains("file1line8"));
+
+    // Should have first 5 lines of file2
+    assertEquals(5, out3.size());
+    assertTrue(out3.contains("file2line1"));
+    assertTrue(out3.contains("file2line2"));
+    assertTrue(out3.contains("file2line3"));
+    assertTrue(out3.contains("file2line4"));
+    assertTrue(out3.contains("file2line5"));
+  }
+
+  @Test
+  public void testLargeNumberOfFiles() throws IOException {
+    int fileNum = 1000;
+    Set<String> expected = Sets.newHashSet();
+
+    for (int i = 0; i < fileNum; i++) {
+      String data = "data" + i;
+      File f = new File(tmpDir, "file" + i);
+      Files.write(data + "\n", f, Charsets.UTF_8);
+      expected.add(data);
+    }
+
+    ReliableTaildirEventReader reader = getReader();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      List<Event> events = reader.readEvents(tf, 10);
+      for (Event e : events) {
+        expected.remove(bodyAsString(e));
+      }
+      reader.commit();
+    }
+    assertEquals(0, expected.size());
+  }
+
+  @Test
+  public void testLoadPositionFile() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    File f2 = new File(tmpDir, "file2");
+    File f3 = new File(tmpDir, "file3");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\n", f1, Charsets.UTF_8);
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+    Files.write("file3line1\n", f3, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader();
+    Map<Long, TailFile> tailFiles = reader.getTailFiles();
+
+    long pos = f2.length();
+    int i = 1;
+    File posFile = new File(posFilePath);
+    for (TailFile tf : tailFiles.values()) {
+      Files.append(i == 1 ? "[" : "", posFile, Charsets.UTF_8);
+      Files.append(String.format("{\"inode\":%s,\"pos\":%s,\"file\":\"%s\"}",
+          tf.getInode(), pos, tf.getPath()), posFile, Charsets.UTF_8);
+      Files.append(i == 3 ? "]" : ",", posFile, Charsets.UTF_8);
+      i++;
+    }
+    reader.loadPositionFile(posFilePath);
+
+    for (TailFile tf : tailFiles.values()) {
+      // NOTE(review): tmpDir + "file3" has no path separator, so it never equals the path
+      // <tmpDir>/file3; this branch is dead and file3 is checked by the else branch. Before
+      // fixing the comparison, confirm whether loadPositionFile is meant to reset positions
+      // beyond the file size to 0.
+      if (tf.getPath().equals(tmpDir + "file3")) {
+        // when given position is larger than file size
+        assertEquals(0, tf.getPos());
+      } else {
+        assertEquals(pos, tf.getPos());
+      }
+    }
+  }
+
+  @Test
+  public void testSkipToEndPosition() throws IOException {
+    ReliableTaildirEventReader reader = getReader();
+    File f1 = new File(tmpDir, "file1");
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+
+    reader.updateTailFiles();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      // Compare against the real path; tmpDir + "file1" lacks a separator and never matched.
+      if (tf.getPath().equals(f1.getAbsolutePath())) {
+        assertEquals(0, tf.getPos());
+      }
+    }
+
+    File f2 = new File(tmpDir, "file2");
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+    // Expect the read position to skip to EOF when the skipToEnd option is true
+    reader.updateTailFiles(true);
+    for (TailFile tf : reader.getTailFiles().values()) {
+      if (tf.getPath().equals(f2.getAbsolutePath())) {
+        assertEquals(f2.length(), tf.getPos());
+      }
+    }
+  }
+
+  @Test
+  public void testByteOffsetHeader() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    String line1 = "file1line1\n";
+    String line2 = "file1line2\n";
+    String line3 = "file1line3\n";
+    Files.write(line1 + line2 + line3, f1, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader(true);
+    List<String> headers = null;
+    for (TailFile tf : reader.getTailFiles().values()) {
+      headers = headersAsStrings(reader.readEvents(tf, 5), BYTE_OFFSET_HEADER_KEY);
+      reader.commit();
+    }
+    assertEquals(3, headers.size());
+    // Make sure we got byte offset position
+    assertTrue(headers.contains(String.valueOf(0)));
+    assertTrue(headers.contains(String.valueOf(line1.length())));
+    assertTrue(headers.contains(String.valueOf((line1 + line2).length())));
+  }
+
+  @Test
+  public void testNewLineBoundaries() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    Files.write("file1line1\nfile1line2\rfile1line2\nfile1line3\r\nfile1line4\n", f1, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader();
+    List<String> out = Lists.newArrayList();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      out.addAll(bodiesAsStrings(reader.readEvents(tf, 5)));
+      reader.commit();
+    }
+    assertEquals(4, out.size());
+    //Should treat \n as line boundary
+    assertTrue(out.contains("file1line1"));
+    //Should not treat \r as line boundary
+    assertTrue(out.contains("file1line2\rfile1line2"));
+    //Should treat \r\n as line boundary
+    assertTrue(out.contains("file1line3"));
+    assertTrue(out.contains("file1line4"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
new file mode 100644
index 0000000..f9e614c
--- /dev/null
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.flume.source.taildir;
+
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class TestTaildirSource {
+  static TaildirSource source;
+  static MemoryChannel channel;
+  private File tmpDir;
+  private String posFilePath;
+
+  @Before
+  public void setUp() {
+    source = new TaildirSource();
+    channel = new MemoryChannel();
+
+    Configurables.configure(channel, new Context());
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+    tmpDir = Files.createTempDir();
+    posFilePath = tmpDir.getAbsolutePath() + "/taildir_position_test.json";
+  }
+
+  @After
+  public void tearDown() {
+    for (File f : tmpDir.listFiles()) {
+      f.delete();
+    }
+    tmpDir.delete();
+  }
+
+  @Test
+  public void testRegexFileNameFiltering() throws IOException {
+    File f1 = new File(tmpDir, "a.log");
+    File f2 = new File(tmpDir, "a.log.1");
+    File f3 = new File(tmpDir, "b.log");
+    File f4 = new File(tmpDir, "c.log.yyyy-MM-01");
+    File f5 = new File(tmpDir, "c.log.yyyy-MM-02");
+    Files.write("a.log\n", f1, Charsets.UTF_8);
+    Files.write("a.log.1\n", f2, Charsets.UTF_8);
+    Files.write("b.log\n", f3, Charsets.UTF_8);
+    Files.write("c.log.yyyy-MM-01\n", f4, Charsets.UTF_8);
+    Files.write("c.log.yyyy-MM-02\n", f5, Charsets.UTF_8);
+
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "ab c");
+    // Tail a.log and b.log
+    context.put(FILE_GROUPS_PREFIX + "ab", tmpDir.getAbsolutePath() + "/[ab].log");
+    // Tail files that starts with c.log
+    context.put(FILE_GROUPS_PREFIX + "c", tmpDir.getAbsolutePath() + "/c.log.*");
+
+    Configurables.configure(source, context);
+    source.start();
+    source.process();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    List<String> out = Lists.newArrayList();
+    for (int i = 0; i < 5; i++) {
+      Event e = channel.take();
+      if (e != null) {
+        out.add(TestTaildirEventReader.bodyAsString(e));
+      }
+    }
+    txn.commit();
+    txn.close();
+
+    assertEquals(4, out.size());
+    // Make sure we got every file
+    assertTrue(out.contains("a.log"));
+    assertFalse(out.contains("a.log.1"));
+    assertTrue(out.contains("b.log"));
+    assertTrue(out.contains("c.log.yyyy-MM-01"));
+    assertTrue(out.contains("c.log.yyyy-MM-02"));
+  }
+
+  @Test
+  public void testHeaderMapping() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    File f2 = new File(tmpDir, "file2");
+    File f3 = new File(tmpDir, "file3");
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+    Files.write("file3line1\nfile3line2\n", f3, Charsets.UTF_8);
+
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "f1 f2 f3");
+    context.put(FILE_GROUPS_PREFIX + "f1", tmpDir.getAbsolutePath() + "/file1$");
+    context.put(FILE_GROUPS_PREFIX + "f2", tmpDir.getAbsolutePath() + "/file2$");
+    context.put(FILE_GROUPS_PREFIX + "f3", tmpDir.getAbsolutePath() + "/file3$");
+    context.put(HEADERS_PREFIX + "f1.headerKeyTest", "value1");
+    context.put(HEADERS_PREFIX + "f2.headerKeyTest", "value2");
+    context.put(HEADERS_PREFIX + "f2.headerKeyTest2", "value2-2");
+
+    Configurables.configure(source, context);
+    source.start();
+    source.process();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int i = 0; i < 6; i++) {
+      Event e = channel.take();
+      String body = new String(e.getBody(), Charsets.UTF_8);
+      String headerValue = e.getHeaders().get("headerKeyTest");
+      String headerValue2 = e.getHeaders().get("headerKeyTest2");
+      if (body.startsWith("file1")) {
+        assertEquals("value1", headerValue);
+        assertNull(headerValue2);
+      } else if (body.startsWith("file2")) {
+        assertEquals("value2", headerValue);
+        assertEquals("value2-2", headerValue2);
+      } else if (body.startsWith("file3")) {
+        // No header
+        assertNull(headerValue);
+        assertNull(headerValue2);
+      }
+    }
+    txn.commit();
+    txn.close();
+  }
+
+  @Test
+  public void testLifecycle() throws IOException, InterruptedException {
+    File f1 = new File(tmpDir, "file1");
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "f1");
+    context.put(FILE_GROUPS_PREFIX + "f1", tmpDir.getAbsolutePath() + "/file1$");
+    Configurables.configure(source, context);
+
+    for (int i = 0; i < 3; i++) {
+      source.start();
+      source.process();
+      assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+          source, LifecycleState.START_OR_ERROR));
+      assertEquals("Server is started", LifecycleState.START,
+          source.getLifecycleState());
+
+      source.stop();
+      assertTrue("Reached stop or error",
+          LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+      assertEquals("Server is stopped", LifecycleState.STOP,
+          source.getLifecycleState());
+    }
+  }
+
+  @Test
+  public void testFileConsumeOrder() throws IOException {
+    System.out.println(tmpDir.toString());
+    // 1) Create 1st file
+    File f1 = new File(tmpDir, "file1");
+    String line1 = "file1line1\n";
+    String line2 = "file1line2\n";
+    String line3 = "file1line3\n";
+    Files.write(line1 + line2 + line3, f1, Charsets.UTF_8);
+    try {
+      Thread.sleep(1000); // wait before creating a new file
+    } catch (InterruptedException e) {
+    }
+
+    // 1) Create 2nd file
+    String line1b = "file2line1\n";
+    String line2b = "file2line2\n";
+    String line3b = "file2line3\n";
+    File f2 = new File(tmpDir, "file2");
+    Files.write(line1b + line2b + line3b, f2, Charsets.UTF_8);
+    try {
+      Thread.sleep(1000); // wait before creating next file
+    } catch (InterruptedException e) {
+    }
+
+    // 3) Create 3rd file
+    String line1c = "file3line1\n";
+    String line2c = "file3line2\n";
+    String line3c = "file3line3\n";
+    File f3 = new File(tmpDir, "file3");
+    Files.write(line1c + line2c + line3c, f3, Charsets.UTF_8);
+
+    try {
+      Thread.sleep(1000); // wait before creating a new file
+    } catch (InterruptedException e) {
+    }
+
+
+    // 4) Create 4th file
+    String line1d = "file4line1\n";
+    String line2d = "file4line2\n";
+    String line3d = "file4line3\n";
+    File f4 = new File(tmpDir, "file4");
+    Files.write(line1d + line2d + line3d, f4, Charsets.UTF_8);
+
+    try {
+      Thread.sleep(1000); // wait before creating a new file
+    } catch (InterruptedException e) {
+    }
+
+
+    // 5) Now update the 3rd file so that its the latest file and gets consumed last
+    f3.setLastModified(System.currentTimeMillis());
+
+    // 4) Consume the files
+    ArrayList<String> consumedOrder = Lists.newArrayList();
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "g1");
+    context.put(FILE_GROUPS_PREFIX + "g1", tmpDir.getAbsolutePath() + "/.*");
+
+    Configurables.configure(source, context);
+    source.start();
+    source.process();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int i = 0; i < 12; i++) {
+      Event e = channel.take();
+      String body = new String(e.getBody(), Charsets.UTF_8);
+      consumedOrder.add(body);
+    }
+    txn.commit();
+    txn.close();
+
+    System.out.println(consumedOrder);
+
+    // 6) Ensure consumption order is in order of last update time
+    ArrayList<String> expected = Lists.newArrayList(line1, line2, line3,    // file1
+                                                    line1b, line2b, line3b, // file2
+                                                    line1d, line2d, line3d, // file4
+                                                    line1c, line2c, line3c  // file3
+                                                     );
+    for(int i =0; i!=expected.size(); ++i) {
+      expected.set(i, expected.get(i).trim() );
+    }
+    assertArrayEquals("Files not consumed in expected order", expected.toArray(), consumedOrder.toArray());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/flume-ng-sources/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/pom.xml b/flume-ng-sources/pom.xml
index 79de5fa..f526956 100644
--- a/flume-ng-sources/pom.xml
+++ b/flume-ng-sources/pom.xml
@@ -45,6 +45,7 @@ limitations under the License.
     <module>flume-jms-source</module>
     <module>flume-twitter-source</module>
     <module>flume-kafka-source</module>
+    <module>flume-taildir-source</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/757a560d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 64decfd..d3d64b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1232,6 +1232,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.flume.flume-ng-sources</groupId>
+        <artifactId>flume-taildir-source</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.flume</groupId>
         <artifactId>flume-ng-sdk</artifactId>
         <version>1.7.0-SNAPSHOT</version>


Mime
View raw message