flume-commits mailing list archives

From: mpe...@apache.org
Subject: git commit: FLUME-1425. Create a SpoolDirectory Source and Client.
Date: Tue, 06 Nov 2012 02:16:05 GMT
Updated Branches:
  refs/heads/flume-1.4 f5eb9bbb0 -> b4402d9b7


FLUME-1425. Create a SpoolDirectory Source and Client.

(Patrick Wendell via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b4402d9b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b4402d9b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b4402d9b

Branch: refs/heads/flume-1.4
Commit: b4402d9b7a1be17f02dd9e7aeff13e29de7b2502
Parents: f5eb9bb
Author: Mike Percy <mpercy@apache.org>
Authored: Mon Nov 5 18:13:40 2012 -0800
Committer: Mike Percy <mpercy@apache.org>
Committed: Mon Nov 5 18:15:23 2012 -0800

----------------------------------------------------------------------
 .../flume/conf/source/SourceConfiguration.java     |    8 +-
 .../org/apache/flume/conf/source/SourceType.java   |    9 +-
 .../apache/flume/client/avro/AvroCLIClient.java    |   77 +-
 .../flume/client/avro/BufferedLineReader.java      |   67 ++
 .../org/apache/flume/client/avro/LineReader.java   |   51 ++
 .../flume/client/avro/SpoolingFileLineReader.java  |  354 ++++++++
 .../apache/flume/source/SpoolDirectorySource.java  |  155 ++++
 ...SpoolDirectorySourceConfigurationConstants.java |   47 ++
 .../flume/client/avro/TestBufferedLineReader.java  |  117 +++
 .../client/avro/TestSpoolingFileLineReader.java    |  640 +++++++++++++++
 .../flume/source/TestSpoolDirectorySource.java     |  134 +++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   55 ++-
 12 files changed, 1675 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
index da804d7..3027ac0 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
@@ -170,8 +170,14 @@ public class SourceConfiguration extends ComponentConfiguration {
      *
      * @see org.apache.flume.source.SyslogUDPSource
      */
+    SYSLOGUDP("org.apache.flume.conf.source.SyslogUDPSourceConfiguration"),
 
-    SYSLOGUDP("org.apache.flume.conf.source.SyslogUDPSourceConfiguration");
+    /**
+     * Spool directory source
+     *
+     * @see org.apache.flume.source.SpoolDirectorySource
+     */
+    SPOOLDIR("org.apache.flume.conf.source.SpoolDirectorySourceConfiguration");
 
     private String srcConfigurationName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
index abbbf1c..426d112 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
@@ -70,7 +70,14 @@ public enum SourceType {
    * @see org.apache.flume.source.SyslogUDPSource
    */
 
-  SYSLOGUDP("org.apache.flume.source.SyslogUDPSource");
+  SYSLOGUDP("org.apache.flume.source.SyslogUDPSource"),
+
+  /**
+   * Spool directory source
+   *
+   * @see org.apache.flume.source.SpoolDirectorySource
+   */
+  SPOOLDIR("org.apache.flume.conf.source.SpoolDirectorySource");
 
   private final String sourceClassName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
index 4a5ecae..fd0d44f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
@@ -19,7 +19,6 @@
 
 package org.apache.flume.client.avro;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
@@ -31,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -47,16 +45,21 @@ import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
 public class AvroCLIClient {
 
   private static final Logger logger = LoggerFactory
       .getLogger(AvroCLIClient.class);
 
   private static final int BATCH_SIZE = 5;
+  private static final int MAX_LINE_LENGTH = 2000;
 
   private String hostname;
   private int port;
   private String fileName;
+  private String dirName;
   private Map<String, String> headers = new HashMap<String, String>();
   private int sent;
 
@@ -121,6 +124,7 @@ public class AvroCLIClient {
     options.addOption("p", "port", true, "port of the avro source")
         .addOption("H", "host", true, "hostname of the avro source")
         .addOption("F", "filename", true, "file to stream to avro source")
+        .addOption("D", "dirname", true, "directory to stream to avro source")
         .addOption("R", "headerFile", true, ("file containing headers as " +
             "key/value pairs on each new line"))
         .addOption("h", "help", false, "display help text");
@@ -129,11 +133,18 @@ public class AvroCLIClient {
     CommandLine commandLine = parser.parse(options, args);
 
     if (commandLine.hasOption('h')) {
-      new HelpFormatter().printHelp("flume-ng avro-client", options, true);
+      new HelpFormatter().printHelp("flume-ng avro-client", "", options,
+          "The --dirname option assumes that a spooling directory exists " +
+          "where immutable log files are dropped.", true);
 
       return false;
     }
 
+    if (commandLine.hasOption("filename") && commandLine.hasOption("dirname")) {
+      throw new ParseException(
+          "--filename and --dirname options cannot be used simultaneously");
+    }
+
     if (!commandLine.hasOption("port")) {
       throw new ParseException(
           "You must specify a port to connect to with --port");
@@ -148,6 +159,7 @@ public class AvroCLIClient {
 
     hostname = commandLine.getOptionValue("host");
     fileName = commandLine.getOptionValue("filename");
+    dirName = commandLine.getOptionValue("dirname");
 
     if (commandLine.hasOption("headerFile")){
       parseHeaders(commandLine);
@@ -163,51 +175,44 @@ public class AvroCLIClient {
   private void run() throws IOException, FlumeException,
       EventDeliveryException {
 
-    BufferedReader reader = null;
+    LineReader reader = null;
 
-    RpcClient rpcClient = RpcClientFactory.getDefaultInstance(hostname, port, BATCH_SIZE);
+    RpcClient rpcClient = RpcClientFactory.getDefaultInstance(hostname, port,
+        BATCH_SIZE);
     try {
-      List<Event> eventBuffer = Lists.newArrayList();
-
       if (fileName != null) {
-        reader = new BufferedReader(new FileReader(new File(fileName)));
-      } else {
-        reader = new BufferedReader(new InputStreamReader(System.in));
+        reader = new BufferedLineReader(new FileReader(new File(fileName)));
+      } else if (dirName != null) {
+        reader = new SpoolingFileLineReader(new File(dirName), ".COMPLETED",
+            BATCH_SIZE, MAX_LINE_LENGTH);
       }
+      else {
+        reader = new BufferedLineReader(new InputStreamReader(System.in));
+      }
+
 
-      String line;
       long lastCheck = System.currentTimeMillis();
       long sentBytes = 0;
 
       int batchSize = rpcClient.getBatchSize();
-      while ((line = reader.readLine()) != null) {
-        // logger.debug("read:{}", line);
-
-        int size = eventBuffer.size();
-        if (size == batchSize) {
-          rpcClient.appendBatch(eventBuffer);
-          eventBuffer.clear();
+      List<String> lines = Lists.newLinkedList();
+      while (!(lines = reader.readLines(batchSize)).isEmpty()) {
+        List<Event> eventBuffer = Lists.newArrayList();
+        for (String line : lines) {
+          Event event = EventBuilder.withBody(line, Charsets.UTF_8);
+          setHeaders(event);
+          eventBuffer.add(event);
+          sentBytes += event.getBody().length;
+          sent++;
+
+          long now = System.currentTimeMillis();
+          if (now >= lastCheck + 5000) {
+            logger.debug("Packed {} bytes, {} events", sentBytes, sent);
+            lastCheck = now;
+          }
         }
-
-        Event event = EventBuilder.withBody(line, Charset.forName("UTF8"));
-        setHeaders(event);
-        eventBuffer.add(event);
-
-        sentBytes += event.getBody().length;
-        sent++;
-
-        long now = System.currentTimeMillis();
-
-        if (now >= lastCheck + 5000) {
-          logger.debug("Packed {} bytes, {} events", sentBytes, sent);
-          lastCheck = now;
-        }
-      }
-
-      if (!eventBuffer.isEmpty()) {
         rpcClient.appendBatch(eventBuffer);
       }
-
       logger.debug("Finished");
     } finally {
       if (reader != null) {
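
With the new -D/--dirname option above, the avro-client can drain a spooling directory instead of reading a single file or stdin. A minimal sketch of an invocation (host, port, and directory are placeholders; it assumes an Avro source is already listening at that address):

    flume-ng avro-client --host localhost --port 41414 \
        --dirname /var/log/flume-spool

The --filename and --dirname options are mutually exclusive, and each file in the directory is renamed with the .COMPLETED suffix once it has been fully streamed.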

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java
new file mode 100644
index 0000000..718e1b2
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.client.avro;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A {@link LineReader} implementation which delegates to a
+ * {@link BufferedReader}.
+ */
+public class BufferedLineReader implements LineReader {
+  private BufferedReader reader;
+
+  public BufferedLineReader(Reader in) {
+    reader = new BufferedReader(in);
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    return reader.readLine();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public List<String> readLines(int n) throws IOException {
+    List<String> out = Lists.newLinkedList();
+    String line;
+    while (out.size() < n && (line = readLine()) != null) {
+      out.add(line);
+    }
+    return out;
+  }
+
+  public void mark(int readAheadLimit) throws IOException {
+    reader.mark(readAheadLimit);
+  }
+
+  public void reset() throws IOException {
+    reader.reset();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java
new file mode 100644
index 0000000..904f22c
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.client.avro;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A line reader produces a stream of lines for the {@link AvroCLIClient} to
+ * ingest into Flume. The stream may be finite or infinite.
+ */
+public interface LineReader {
+
+  /**
+   * Get the next line associated with the input stream. If this returns
+   * {@code null}, the underlying input source is considered finished.
+   * Note that this is allowed to block for indefinite amounts of time waiting
+   * to generate a new line.
+   */
+  public String readLine() throws IOException;
+
+  /**
+   * Get up to {@code n} lines associated with the input stream. If this returns
+   * fewer than n lines, the underlying input source is considered
+   * finished. Note that this is allowed to block for indefinite amounts of
+   * time waiting to generate a new line.
+   */
+  public List<String> readLines(int n) throws IOException;
+
+  /**
+   * Clean-up any state associated with this reader.
+   */
+  public void close() throws IOException ;
+}
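
A short usage sketch (not part of this commit) of the LineReader abstraction, driving a BufferedLineReader in batches the same way AvroCLIClient does above; the file path comes from the command line and the batch size of 5 is arbitrary:

    import java.io.FileReader;
    import java.io.IOException;
    import java.util.List;

    import org.apache.flume.client.avro.BufferedLineReader;
    import org.apache.flume.client.avro.LineReader;

    public class LineReaderExample {
      public static void main(String[] args) throws IOException {
        LineReader reader = new BufferedLineReader(new FileReader(args[0]));
        try {
          List<String> batch;
          // An empty batch signals that the underlying source is finished.
          while (!(batch = reader.readLines(5)).isEmpty()) {
            for (String line : batch) {
              System.out.println(line);
            }
          }
        } finally {
          reader.close();
        }
      }
    }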

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java
new file mode 100644
index 0000000..abd2f61
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.client.avro;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * A {@link LineReader} which reads log data from files stored in a
+ * spooling directory and renames each file once all of its data has been
+ * read (through {@link #readLine()} calls). The user must {@link #commit()}
+ * each read, to indicate that the lines have been fully processed.
+ * Read calls will return no data if there are no files left to read. This
+ * class, in general, is not thread safe.
+ *
+ * This reader assumes that files with unique file names are left in the
+ * spooling directory and not modified once they are placed there. Any user
+ * behavior which violates these assumptions, when detected, will result in a
+ * FlumeException being thrown.
+ *
+ * This class makes the following guarantees, if the above assumptions are met:
+ *   1) Once a log file has been renamed with the {@link #completedSuffix},
+ *      all of its records have been read through the {@link #readLine()}
+ *      function and {@link #commit()}'d exactly once.
+ *   2) All files in the spooling directory will eventually be opened
+ *      and delivered to a {@link #readLine()} caller.
+ *
+ * NOTE: This class is for internal Flume components; it is not an extension
+ * point for developers customizing Flume.
+ */
+public class SpoolingFileLineReader implements LineReader {
+  private static final Logger logger = LoggerFactory
+      .getLogger(SpoolingFileLineReader.class);
+  /** When a line is too long, how many characters to print to the error log. **/
+  private static final int OVERFLOW_LINE_PRINT_CHARS = 30;
+
+  private File directory;
+  public String completedSuffix;
+  private Optional<FileInfo> currentFile = Optional.absent();
+  /** Always contains the last file from which lines have been read. **/
+  private Optional<FileInfo> lastFileRead = Optional.absent();
+  private boolean committed = true;
+  private int bufferMaxLines;
+  private int bufferMaxLineLength;
+  /** A flag to signal that an unrecoverable error has occurred. */
+  private boolean disabled = false;
+
+  /**
+   * Create a SpoolingFileLineReader to watch the given directory.
+   *
+   * Lines are buffered between when they are read and when they are committed.
+   * The buffer has a fixed size, determined by (bufferMaxLines *
+   * bufferMaxLineLength).
+   *
+   * @param directory The directory to watch
+   * @param completedSuffix The suffix to append to completed files
+   * @param bufferMaxLines The maximum number of lines to keep in a pre-commit
+   *                         buffer
+   * @param bufferMaxLineLength The maximum line length for lines in the pre-commit
+   *                           buffer, in characters
+   */
+  public SpoolingFileLineReader(File directory, String completedSuffix,
+      int bufferMaxLines, int bufferMaxLineLength) {
+    // Verify directory exists and is readable/writable
+    Preconditions.checkNotNull(directory);
+    Preconditions.checkState(directory.exists(),
+        "Directory does not exist: " + directory.getAbsolutePath());
+    Preconditions.checkState(directory.isDirectory(),
+        "Path is not a directory: " + directory.getAbsolutePath());
+    Preconditions.checkState(bufferMaxLines > 0);
+    Preconditions.checkState(bufferMaxLineLength > 0);
+
+    // Do a canary test to make sure we have access to spooling directory
+    try {
+      File f1 = File.createTempFile("flume", "test", directory);
+      Files.write("testing flume file permissions\n", f1, Charsets.UTF_8);
+      Files.readLines(f1, Charsets.UTF_8);
+      f1.delete();
+    } catch (IOException e) {
+      throw new FlumeException("Unable to read and modify files" +
+          " in the spooling directory: " + directory, e);
+    }
+    this.directory = directory;
+    this.completedSuffix = completedSuffix;
+    this.bufferMaxLines = bufferMaxLines;
+    this.bufferMaxLineLength = bufferMaxLineLength;
+  }
+
+  /** Return the filename which generated the data from the last successful
+   * {@link #readLines()} call. Returns null if called before any file
+   * contents are read. */
+  public String getLastFileRead() {
+    if (!lastFileRead.isPresent()) {
+      return null;
+    }
+    return lastFileRead.get().getFile().getAbsolutePath();
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    if (disabled) {
+      throw new IllegalStateException("Reader has been disabled.");
+    }
+    List<String> read = readLines(1);
+    if (read.size() == 0) {
+      return null;
+    }
+    return read.get(0);
+  }
+
+  /** Commit the last lines which were read. */
+  public void commit() throws IOException {
+    if (disabled) {
+      throw new IllegalStateException("Reader has been disabled.");
+    }
+    currentFile.get().reader.mark(bufferMaxLines * bufferMaxLineLength);
+    committed = true;
+  }
+
+  @Override
+  /** Reads up to n lines from the current file. Returns an empty list if no
+   *  files are left to read in the directory. */
+  public List<String> readLines(int n) throws IOException {
+    if (disabled) {
+      throw new IllegalStateException("Reader has been disabled.");
+    }
+    if (!committed) {
+      if (!currentFile.isPresent()) {
+        throw new IllegalStateException("File should not roll when " +
+          " commit is outstanding.");
+      }
+      logger.info("Last read was never comitted - resetting mark position.");
+      currentFile.get().getReader().reset();
+    } else {
+      // Check if new files have arrived since last call
+      if (!currentFile.isPresent()) {
+        currentFile = getNextFile();
+      }
+      // Return empty list if no new files
+      if (!currentFile.isPresent()) {
+        return Collections.emptyList();
+      }
+    }
+
+    String outLine = currentFile.get().getReader().readLine();
+
+    /* It's possible that the last read took us just up to a file boundary.
+     * If so, try to roll to the next file, if there is one. */
+    if (outLine == null) {
+      retireCurrentFile();
+      currentFile = getNextFile();
+      if (!currentFile.isPresent()) {
+        return Collections.emptyList();
+      }
+      outLine = currentFile.get().getReader().readLine();
+    }
+
+    List<String> out = Lists.newArrayList();
+    while (outLine != null) {
+      if (outLine.length() > bufferMaxLineLength) {
+        logger.error("Found line longer than " + bufferMaxLineLength +
+            " characters, cannot make progress.");
+        int lastCharToPrint = Math.min(OVERFLOW_LINE_PRINT_CHARS,
+            outLine.length());
+        logger.error("Invalid line starts with: " +
+          outLine.substring(0, lastCharToPrint));
+        disabled = true;
+        throw new FlumeException("Encoutered line that was too long.");
+      }
+      out.add(outLine);
+      if (out.size() == n) { break; }
+      outLine = currentFile.get().getReader().readLine();
+    }
+
+    committed = false;
+    lastFileRead = currentFile;
+    return out;
+  }
+
+  /**
+   * Closes currentFile and attempts to rename it.
+   *
+   * If these operations fail in a way that may cause duplicate log entries,
+   * an error is logged but no exceptions are thrown. If these operations fail
+   * in a way that indicates potential misuse of the spooling directory, a
+   * FlumeException will be thrown.
+   * @throws FlumeException if files do not conform to spooling assumptions
+   */
+  private void retireCurrentFile() throws IOException {
+    Preconditions.checkState(currentFile.isPresent());
+
+    String currPath = currentFile.get().getFile().getAbsolutePath();
+    String newPath = currPath + completedSuffix;
+    logger.info("Preparing to move file " + currPath + " to " + newPath);
+
+    File newFile = new File(currPath);
+
+    // Verify that spooling assumptions hold
+    if (newFile.lastModified() != currentFile.get().getLastModified()) {
+      String message = "File has been modified since being read: " + currPath;
+      throw new IllegalStateException(message);
+    }
+    if (newFile.length() != currentFile.get().getLength()) {
+      String message = "File has changed size since being read: " + currPath;
+      throw new IllegalStateException(message);
+    }
+
+    // Before renaming, check whether destination file name exists
+    File existing = new File(newPath);
+    if (existing.exists() &&
+        System.getProperty("os.name").toLowerCase().indexOf("win") >= 0) {
+      /*
+       * If we are here, it means the completed file already exists. In almost
+       * every case this means the user is violating an assumption of Flume
+       * (that log files are placed in the spooling directory with unique
+       * names). However, there is a corner case on Windows systems where the
+       * file was already rolled but the rename was not atomic. If that seems
+       * likely, we let it pass with only a warning.
+       */
+      if (Files.equal(currentFile.get().getFile(), existing)) {
+        logger.warn("Completed file " + newPath +
+            " already exists, but files match, so continuing.");
+        boolean deleted = newFile.delete();
+        if (!deleted) {
+          logger.error("Unable to delete file " + newFile.getAbsolutePath() +
+              ". It will likely be ingested another time.");
+        }
+      } else {
+        String message = "File name has been re-used with different" +
+            " files. Spooling assumption violated for " + newPath;
+        throw new IllegalStateException(message);
+      }
+    } else if (existing.exists()) { // Dest file exists and not on windows
+      String message = "File name has been re-used with different" +
+          " files. Spooling assumption violated for " + newPath;
+      throw new IllegalStateException(message);
+    } else  { // Dest file does not already exist
+      boolean renamed = newFile.renameTo(new File(newPath));
+      if (!renamed) {
+        /* If we are here then the file cannot be renamed for a reason other
+         * than that the destination file exists (actually, that remains
+         * possible with small probability due to TOCTOU race conditions). */
+        String message = "Unable to move " + currPath + " to " + newPath +
+            ". This will likely cause duplicate events. Please verify that " +
+            "flume has sufficient permissions to perform these operations.";
+        throw new FlumeException(message);
+      }
+    }
+    currentFile.get().reader.close();
+  }
+
+  /**
+   * Find and open the oldest file in the chosen directory. If the directory is
+   * empty, this will return an absent option.
+   */
+  private Optional<FileInfo> getNextFile() {
+    /* Filter to exclude finished or hidden files */
+    FileFilter filter = new FileFilter() {
+      public boolean accept(File pathName) {
+        if ((pathName.getName().endsWith(completedSuffix)) ||
+            (pathName.getName().startsWith("."))) {
+          return false;
+        }
+        return true;
+      }
+    };
+    List<File> candidateFiles = Arrays.asList(directory.listFiles(filter));
+    if (candidateFiles.isEmpty()) {
+      return Optional.absent();
+    } else {
+      Collections.sort(candidateFiles, new Comparator<File>() {
+        public int compare(File a, File b) {
+          return new Long(a.lastModified()).compareTo(
+              new Long(b.lastModified()));
+        }
+      });
+      File nextFile = candidateFiles.get(0);
+      try {
+        int bufferSize = bufferMaxLines * bufferMaxLineLength;
+        BufferedReader reader = new BufferedReader(new FileReader(nextFile),
+            bufferSize);
+        reader.mark(bufferSize);
+        return Optional.of(new FileInfo(nextFile, reader));
+      } catch (FileNotFoundException e) {
+        // File could have been deleted in the interim
+        logger.warn("Could not find file: " + nextFile, e);
+        return Optional.absent();
+      } catch (IOException e) {
+        logger.error("Exception opening file: " + nextFile, e);
+        return Optional.absent();
+      }
+    }
+  }
+
+  /** An immutable class with information about a file being processed. */
+  private class FileInfo {
+    private File file;
+    private long length;
+    private long lastModified;
+    private BufferedReader reader;
+
+    public FileInfo(File file, BufferedReader reader) {
+      this.file = file;
+      this.length = file.length();
+      this.lastModified = file.lastModified();
+      this.reader = reader;
+    }
+
+    public long getLength() { return this.length; }
+    public long getLastModified() { return this.lastModified; }
+    public BufferedReader getReader() { return this.reader; }
+    public File getFile() { return this.file; }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // No-op
+  }
+}
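
For context, a minimal sketch (not part of the commit) of the read/commit protocol this class expects from its caller; the directory path and buffer sizes are placeholders, and the directory must only receive immutable, uniquely named files per the contract above:

    import java.io.File;
    import java.io.IOException;
    import java.util.List;

    import org.apache.flume.client.avro.SpoolingFileLineReader;

    public class SpoolReaderExample {
      public static void main(String[] args) throws IOException {
        SpoolingFileLineReader reader = new SpoolingFileLineReader(
            new File("/var/log/flume-spool"), ".COMPLETED",
            100 /* bufferMaxLines */, 5000 /* bufferMaxLineLength */);
        List<String> lines;
        // An empty list means no files are left to read right now.
        while (!(lines = reader.readLines(10)).isEmpty()) {
          System.out.println("Read " + lines.size() + " lines from "
              + reader.getLastFileRead());
          // Acknowledge the batch; an uncommitted batch is re-delivered on
          // the next readLines() call via the reader's mark/reset.
          reader.commit();
        }
      }
    }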

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
new file mode 100644
index 0000000..61824d8
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.flume.source;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.client.avro.SpoolingFileLineReader;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class SpoolDirectorySource extends AbstractSource implements
+Configurable, EventDrivenSource {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(SpoolDirectorySource.class);
+
+  // Delay used when polling for new files
+  private static int POLL_DELAY_MS = 500;
+
+  /* Config options */
+  private String completedSuffix;
+  private String spoolDirectory;
+  private boolean fileHeader;
+  private String fileHeaderKey;
+  private int batchSize;
+  private int bufferMaxLines;
+  private int bufferMaxLineLength;
+
+  private ScheduledExecutorService executor;
+  private CounterGroup counterGroup;
+  private Runnable runner;
+  SpoolingFileLineReader reader;
+
+  @Override
+  public void start() {
+    logger.info("SpoolDirectorySource source starting with directory:{}",
+        spoolDirectory);
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+    counterGroup = new CounterGroup();
+
+    File directory = new File(spoolDirectory);
+    reader = new SpoolingFileLineReader(directory, completedSuffix,
+        bufferMaxLines, bufferMaxLineLength);
+    runner = new SpoolDirectoryRunnable(reader, counterGroup);
+
+    executor.scheduleWithFixedDelay(
+        runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
+
+    super.start();
+    logger.debug("SpoolDirectorySource source started");
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+  }
+
+  @Override
+  public void configure(Context context) {
+    spoolDirectory = context.getString(
+        SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY);
+    Preconditions.checkState(spoolDirectory != null,
+        "Configuration must specify a spooling directory");
+
+    completedSuffix = context.getString(
+        SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX,
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX);
+    fileHeader = context.getBoolean(
+        SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER,
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER);
+    fileHeaderKey = context.getString(
+        SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER_KEY,
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY);
+    batchSize = context.getInteger(
+        SpoolDirectorySourceConfigurationConstants.BATCH_SIZE,
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_BATCH_SIZE);
+    bufferMaxLines = context.getInteger(
+        SpoolDirectorySourceConfigurationConstants.BUFFER_MAX_LINES,
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_BUFFER_MAX_LINES);
+    bufferMaxLineLength = context.getInteger(
+        SpoolDirectorySourceConfigurationConstants.BUFFER_MAX_LINE_LENGTH,
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_BUFFER_MAX_LINE_LENGTH);
+  }
+
+  private Event createEvent(String lineEntry, String filename) {
+    Event out = EventBuilder.withBody(lineEntry.getBytes());
+    if (fileHeader) {
+      out.getHeaders().put(fileHeaderKey, filename);
+    }
+    return out;
+  }
+
+  private class SpoolDirectoryRunnable implements Runnable {
+    private SpoolingFileLineReader reader;
+    private CounterGroup counterGroup;
+
+    public SpoolDirectoryRunnable(SpoolingFileLineReader reader,
+        CounterGroup counterGroup) {
+      this.reader = reader;
+      this.counterGroup = counterGroup;
+    }
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          List<String> strings = reader.readLines(batchSize);
+          if (strings.size() == 0) { break; }
+          String file = reader.getLastFileRead();
+          List<Event> events = Lists.newArrayList();
+          for (String s: strings) {
+            counterGroup.incrementAndGet("spooler.lines.read");
+            events.add(createEvent(s, file));
+          }
+          getChannelProcessor().processEventBatch(events);
+          reader.commit();
+        }
+      }
+      catch (Throwable t) {
+        logger.error("Uncaught exception in Runnable", t);
+        if (t instanceof Error) {
+          throw (Error) t;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
new file mode 100644
index 0000000..806a661
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.flume.source;
+
+public class SpoolDirectorySourceConfigurationConstants {
+  /** Directory where files are deposited. */
+  public static final String SPOOL_DIRECTORY = "spoolDir";
+
+  /** Suffix appended to files when they are finished being sent. */
+  public static final String SPOOLED_FILE_SUFFIX = "fileSuffix";
+  public static final String DEFAULT_SPOOLED_FILE_SUFFIX = ".COMPLETED";
+
+  /** Header in which to put filename. */
+  public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
+  public static final String DEFAULT_FILENAME_HEADER_KEY = "file";
+
+  /** Whether to include filename in a header. */
+  public static final String FILENAME_HEADER = "fileHeader";
+  public static final boolean DEFAULT_FILE_HEADER = false;
+
+  /** What size to batch with before sending to ChannelProcessor. */
+  public static final String BATCH_SIZE = "batchSize";
+  public static final int DEFAULT_BATCH_SIZE = 10;
+
+  /** Maximum number of lines to buffer between commits. */
+  public static final String BUFFER_MAX_LINES = "bufferMaxLines";
+  public static final int DEFAULT_BUFFER_MAX_LINES = 100;
+
+  /** Maximum length of line (in characters) in buffer between commits. */
+  public static final String BUFFER_MAX_LINE_LENGTH = "bufferMaxLineLength";
+  public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;
+}
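
Together with the new SPOOLDIR source type, these constants become agent configuration keys. A hypothetical agent snippet (agent, source, and channel names are placeholders; the type can also be given as the fully qualified class name, and keys left unset fall back to the defaults listed above):

    agent.sources = spool
    agent.channels = mem

    agent.sources.spool.type = spooldir
    agent.sources.spool.channels = mem
    agent.sources.spool.spoolDir = /var/log/flume-spool
    agent.sources.spool.fileHeader = true
    agent.sources.spool.fileHeaderKey = file
    agent.sources.spool.batchSize = 10

    agent.channels.mem.type = memory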

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java
new file mode 100644
index 0000000..169abe5
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.client.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestBufferedLineReader {
+  private File tmpDir;
+
+  @Before
+  public void before() {
+    tmpDir = Files.createTempDir();
+  }
+
+  @After
+  public void after() {
+    for (File f : tmpDir.listFiles()) {
+      f.delete();
+    }
+    tmpDir.delete();
+  }
+
+  @Test
+  public void testSimpleRead() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    BufferedLineReader reader = new BufferedLineReader(new FileReader(f1));
+
+    assertEquals("file1line1", reader.readLine());
+    assertEquals("file1line2", reader.readLine());
+    assertEquals("file1line3", reader.readLine());
+    assertEquals("file1line4", reader.readLine());
+    assertEquals("file1line5", reader.readLine());
+    assertEquals("file1line6", reader.readLine());
+    assertEquals("file1line7", reader.readLine());
+    assertEquals("file1line8", reader.readLine());
+    assertEquals(null, reader.readLine());
+  }
+
+  @Test
+  public void testBatchedReadsWithinAFile() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    BufferedLineReader reader = new BufferedLineReader(new FileReader(f1));
+
+    List<String> out = reader.readLines(5);
+
+    // Make sure we got every line
+    assertEquals(5, out.size());
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+    assertTrue(out.contains("file1line3"));
+    assertTrue(out.contains("file1line4"));
+    assertTrue(out.contains("file1line5"));
+  }
+
+  @Test
+  public void testBatchedReadsAtFileBoundary() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    BufferedLineReader reader = new BufferedLineReader(new FileReader(f1));
+
+    List<String> out = reader.readLines(10);
+
+    // Make sure we got exactly 8 lines
+    assertEquals(8, out.size());
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+    assertTrue(out.contains("file1line3"));
+    assertTrue(out.contains("file1line4"));
+    assertTrue(out.contains("file1line5"));
+    assertTrue(out.contains("file1line6"));
+    assertTrue(out.contains("file1line7"));
+    assertTrue(out.contains("file1line8"));
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
new file mode 100644
index 0000000..e1c306e
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.client.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.flume.FlumeException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class TestSpoolingFileLineReader {
+  private static String completedSuffix = ".COMPLETE";
+  private static int bufferMaxLineLength = 500;
+  private static int bufferMaxLines = 30;
+
+  private File tmpDir;
+
+  @Before
+  public void setUp() {
+    tmpDir = Files.createTempDir();
+  }
+
+  @After
+  public void tearDown() {
+    for (File f : tmpDir.listFiles()) {
+      f.delete();
+    }
+    tmpDir.delete();
+  }
+
+  @Test
+  /** Create three multi-line files then read them back out. Ensures that
+   * files are accessed in correct order and that lines are read correctly
+   * from files. */
+  public void testBasicSpooling() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    File f3 = new File(tmpDir.getAbsolutePath() + "/file3");
+
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+    Files.write("file3line1\nfile3line2\n", f3, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(
+        tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> out = Lists.newArrayList();
+    for (int i = 0; i < 6; i++) {
+      out.add(reader.readLine());
+      reader.commit();
+    }
+    // Make sure we got every line
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+    assertTrue(out.contains("file2line1"));
+    assertTrue(out.contains("file2line2"));
+    assertTrue(out.contains("file3line1"));
+    assertTrue(out.contains("file3line2"));
+
+    List<File> outFiles = Lists.newArrayList(tmpDir.listFiles());
+
+    assertEquals(3, outFiles.size());
+
+    // Make sure files 1 and 2 have been processed and file 3 is still open
+    assertTrue(outFiles.contains(new File(tmpDir + "/file1" + completedSuffix)));
+    assertTrue(outFiles.contains(new File(tmpDir + "/file2" + completedSuffix)));
+    assertTrue(outFiles.contains(new File(tmpDir + "/file3")));
+  }
+
+  @Test
+  /** Make sure this works when there are initially no files */
+  public void testInitiallyEmptyDirectory() throws IOException {
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(
+        tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    assertNull(reader.readLine());
+    assertEquals(0, reader.readLines(10).size());
+
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+
+    List<String> out = reader.readLines(2);
+    reader.commit();
+
+    // Make sure we got all of the first file
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+
+    reader.readLine(); // force roll
+    reader.commit();
+
+    List<File> outFiles = Lists.newArrayList(tmpDir.listFiles());
+
+    assertEquals(2, outFiles.size());
+    assertTrue(
+        outFiles.contains(new File(tmpDir + "/file1" + completedSuffix)));
+    assertTrue(outFiles.contains(new File(tmpDir + "/file2")));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  /** Ensures that file immutability is enforced. */
+  public void testFileChangesDuringRead() throws IOException {
+    File tmpDir1 = Files.createTempDir();
+    File f1 = new File(tmpDir1.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader1 =
+        new SpoolingFileLineReader(tmpDir1, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> out = Lists.newArrayList();
+    out.addAll(reader1.readLines(2));
+    reader1.commit();
+
+    assertEquals(2, out.size());
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+
+    Files.append("file1line3\n", f1, Charsets.UTF_8);
+
+    out.add(reader1.readLine());
+    reader1.commit();
+    out.add(reader1.readLine());
+    reader1.commit();
+  }
+
+
+  /** Test when a competing destination file is found, but it matches,
+   *  and we are on a Windows machine. */
+  @Test
+  public void testDestinationExistsAndSameFileWindows() throws IOException {
+    System.setProperty("os.name", "Some version of Windows");
+
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    File f1Completed = new File(tmpDir.getAbsolutePath() + "/file1" +
+        completedSuffix);
+
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+    Files.write("file1line1\nfile1line2\n", f1Completed, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader =
+        new SpoolingFileLineReader(tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> out = Lists.newArrayList();
+
+    for (int i = 0; i < 2; i++) {
+      out.add(reader.readLine());
+      reader.commit();
+    }
+
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+
+    for (int i = 0; i < 2; i++) {
+      out.add(reader.readLine());
+      reader.commit();
+    }
+
+    // Make sure we got every line
+    assertEquals(4, out.size());
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+    assertTrue(out.contains("file2line1"));
+    assertTrue(out.contains("file2line2"));
+
+    // Make sure original is deleted
+    List<File> outFiles = Lists.newArrayList(tmpDir.listFiles());
+    assertEquals(2, outFiles.size());
+    assertTrue(outFiles.contains(new File(tmpDir + "/file2")));
+    assertTrue(outFiles.contains(
+        new File(tmpDir + "/file1" + completedSuffix)));
+  }
+
+  /** Test when a competing destination file is found, but it matches,
+   *  and we are not on a Windows machine. */
+  @Test(expected = IllegalStateException.class)
+  public void testDestinationExistsAndSameFileNotOnWindows() throws IOException {
+    System.setProperty("os.name", "Some version of Linux");
+
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    File f1Completed = new File(tmpDir.getAbsolutePath() + "/file1" +
+        completedSuffix);
+
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+    Files.write("file1line1\nfile1line2\n", f1Completed, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader =
+        new SpoolingFileLineReader(tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> out = Lists.newArrayList();
+
+    for (int i = 0; i < 2; i++) {
+      out.add(reader.readLine());
+      reader.commit();
+    }
+
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+
+    for (int i = 0; i < 2; i++) {
+      out.add(reader.readLine());
+      reader.commit();
+    }
+
+    // Not reached
+  }
+
+  @Test
+  /** Test a basic case where a commit is missed. */
+  public void testBasicCommitFailure() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n" +
+                "file1line9\nfile1line10\nfile1line11\nfile1line12\n",
+                f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(
+        tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> out1 = reader.readLines(4);
+    assertTrue(out1.contains("file1line1"));
+    assertTrue(out1.contains("file1line2"));
+    assertTrue(out1.contains("file1line3"));
+    assertTrue(out1.contains("file1line4"));
+
+    List<String> out2 = reader.readLines(4);
+    assertTrue(out2.contains("file1line1"));
+    assertTrue(out2.contains("file1line2"));
+    assertTrue(out2.contains("file1line3"));
+    assertTrue(out2.contains("file1line4"));
+
+    reader.commit();
+
+    List<String> out3 = reader.readLines(4);
+    assertTrue(out3.contains("file1line5"));
+    assertTrue(out3.contains("file1line6"));
+    assertTrue(out3.contains("file1line7"));
+    assertTrue(out3.contains("file1line8"));
+
+    reader.commit();
+    List<String> out4 = reader.readLines(10);
+    assertEquals(4, out4.size());
+    assertTrue(out4.contains("file1line9"));
+    assertTrue(out4.contains("file1line10"));
+    assertTrue(out4.contains("file1line11"));
+    assertTrue(out4.contains("file1line12"));
+  }
+
+  @Test
+  /** Test a case where a commit is missed and the buffer size shrinks. */
+  public void testBasicCommitFailureAndBufferSizeChanges() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n" +
+                "file1line9\nfile1line10\nfile1line11\nfile1line12\n",
+                f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(
+        tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> out1 = reader.readLines(5);
+    assertTrue(out1.contains("file1line1"));
+    assertTrue(out1.contains("file1line2"));
+    assertTrue(out1.contains("file1line3"));
+    assertTrue(out1.contains("file1line4"));
+    assertTrue(out1.contains("file1line5"));
+
+    List<String> out2 = reader.readLines(2);
+    assertTrue(out2.contains("file1line1"));
+    assertTrue(out2.contains("file1line2"));
+
+    reader.commit();
+    List<String> out3 = reader.readLines(2);
+    assertTrue(out3.contains("file1line3"));
+    assertTrue(out3.contains("file1line4"));
+
+    reader.commit();
+    List<String> out4 = reader.readLines(2);
+    assertTrue(out4.contains("file1line5"));
+    assertTrue(out4.contains("file1line6"));
+
+    reader.commit();
+    List<String> out5 = reader.readLines(2);
+    assertTrue(out5.contains("file1line7"));
+    assertTrue(out5.contains("file1line8"));
+
+    reader.commit();
+
+    List<String> out6 = reader.readLines(15);
+    assertTrue(out6.contains("file1line9"));
+    assertTrue(out6.contains("file1line10"));
+    assertTrue(out6.contains("file1line11"));
+    assertTrue(out6.contains("file1line12"));
+  }
+
+  /** Test when a competing destination file is found and it does not match. */
+  @Test(expected = IllegalStateException.class)
+  public void testDestinationExistsAndDifferentFile() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    File f1Completed =
+        new File(tmpDir.getAbsolutePath() + "/file1" + completedSuffix);
+
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+    Files.write("file1line1\nfile1XXXe2\n", f1Completed, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader =
+        new SpoolingFileLineReader(tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> out = Lists.newArrayList();
+
+    for (int i = 0; i < 2; i++) {
+      out.add(reader.readLine());
+      reader.commit();
+    }
+
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8);
+
+    for (int i = 0; i < 2; i++) {
+      out.add(reader.readLine());
+      reader.commit();
+    }
+    // Not reached
+  }
+
+
+  @Test
+  /** Empty files should be skipped over when reading directory. */
+  public void testBehaviorWithEmptyFile() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.touch(f1);
+
+    SpoolingFileLineReader reader =
+        new SpoolingFileLineReader(tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f2, Charsets.UTF_8);
+
+    List<String> out = reader.readLines(8); // Expect to skip over first file
+    reader.commit();
+    assertEquals(8, out.size());
+
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+    assertTrue(out.contains("file1line3"));
+    assertTrue(out.contains("file1line4"));
+    assertTrue(out.contains("file1line5"));
+    assertTrue(out.contains("file1line6"));
+    assertTrue(out.contains("file1line7"));
+    assertTrue(out.contains("file1line8"));
+
+    assertNull(reader.readLine());
+
+    // file2 should have been rolled (renamed); the empty file1 remains in place
+    List<File> outFiles = Lists.newArrayList(tmpDir.listFiles());
+    assertEquals(2, outFiles.size());
+    assertTrue(outFiles.contains(
+        new File(tmpDir + "/file1")));
+    assertTrue(outFiles.contains(
+        new File(tmpDir + "/file2" + completedSuffix)));
+  }
+
+  @Test
+  public void testBatchedReadsWithinAFile() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(tmpDir,
+        completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> out = reader.readLines(5);
+    reader.commit();
+
+    // Make sure we got every line
+    assertEquals(5, out.size());
+    assertTrue(out.contains("file1line1"));
+    assertTrue(out.contains("file1line2"));
+    assertTrue(out.contains("file1line3"));
+    assertTrue(out.contains("file1line4"));
+    assertTrue(out.contains("file1line5"));
+  }
+
+  @Test
+  public void testBatchedReadsAcrossFileBoundary() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(tmpDir,
+        completedSuffix, bufferMaxLines, bufferMaxLineLength);
+    List<String> out1 = reader.readLines(5);
+    reader.commit();
+
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    Files.write("file2line1\nfile2line2\nfile2line3\nfile2line4\n" +
+        "file2line5\nfile2line6\nfile2line7\nfile2line8\n",
+        f2, Charsets.UTF_8);
+
+    List<String> out2 = reader.readLines(5);
+    reader.commit();
+    List<String> out3 = reader.readLines(5);
+    reader.commit();
+
+    // Should have first 5 lines of file1
+    assertEquals(5, out1.size());
+    assertTrue(out1.contains("file1line1"));
+    assertTrue(out1.contains("file1line2"));
+    assertTrue(out1.contains("file1line3"));
+    assertTrue(out1.contains("file1line4"));
+    assertTrue(out1.contains("file1line5"));
+
+    // Should have 3 remaining lines of file1
+    assertEquals(3, out2.size());
+    assertTrue(out2.contains("file1line6"));
+    assertTrue(out2.contains("file1line7"));
+    assertTrue(out2.contains("file1line8"));
+
+    // Should have first 5 lines of file2
+    assertEquals(5, out3.size());
+    assertTrue(out3.contains("file2line1"));
+    assertTrue(out3.contains("file2line2"));
+    assertTrue(out3.contains("file2line3"));
+    assertTrue(out3.contains("file2line4"));
+    assertTrue(out3.contains("file2line5"));
+
+    // file1 should be moved now
+    List<File> outFiles = Lists.newArrayList(tmpDir.listFiles());
+    assertEquals(2, outFiles.size());
+    assertTrue(outFiles.contains(
+        new File(tmpDir + "/file1" + completedSuffix)));
+    assertTrue(outFiles.contains(new File(tmpDir + "/file2")));
+  }
+
+  /** Test the case where we finish reading and fully commit a file, and the
+   *  directory is then empty. */
+  @Test
+  public void testEmptyDirectoryAfterCommittingFile() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(
+        tmpDir, completedSuffix, bufferMaxLines, bufferMaxLineLength);
+
+    List<String> allLines = reader.readLines(2);
+    assertEquals(2, allLines.size());
+    reader.commit();
+
+    List<String> empty = reader.readLines(10);
+    assertEquals(0, empty.size());
+  }
+
+  /** When a line violates the character limit, we should throw an exception,
+   *  even if the buffer limit is not actually exceeded. */
+  @Test(expected = FlumeException.class)
+  public void testLineExceedsMaxLineLengthButNotBufferSize() throws IOException {
+    final int maxLineLength = 15;
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n" +
+                "reallyreallyreallyreallyreallyLongfile1line9\n" +
+                "file1line10\nfile1line11\nfile1line12\nfile1line13\n",
+                f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(
+        tmpDir, completedSuffix, bufferMaxLines, maxLineLength);
+
+    List<String> out1 = reader.readLines(5);
+    assertTrue(out1.contains("file1line1"));
+    assertTrue(out1.contains("file1line2"));
+    assertTrue(out1.contains("file1line3"));
+    assertTrue(out1.contains("file1line4"));
+    assertTrue(out1.contains("file1line5"));
+    reader.commit();
+
+    reader.readLines(5);
+  }
+
+  /** When a line is larger than the entire buffer, we should definitely throw
+   *  an exception. */
+  @Test(expected = FlumeException.class)
+  public void testLineExceedsBufferSize() throws IOException {
+    final int maxLineLength = 15;
+    final int bufferMaxLines = 10;
+
+    // String slightly longer than buffer
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < (bufferMaxLines + 1) * maxLineLength; i++) {
+      sb.append('x');
+    }
+
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write(sb.toString() + "\n",
+                f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(
+        tmpDir, completedSuffix, bufferMaxLines, maxLineLength);
+
+    reader.readLines(5);
+  }
+
+  /** Once the reader is disabled by a FlumeException, subsequent readLine,
+   *  readLines, and commit calls should fail with IllegalStateException. */
+  @Test
+  public void testCallsFailWhenReaderDisabled() throws IOException {
+    final int maxLineLength = 15;
+    final int bufferMaxLines = 10;
+
+    // String slightly longer than buffer
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < (bufferMaxLines + 1) * maxLineLength; i++) {
+      sb.append('x');
+    }
+
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write(sb.toString() + "\n",
+                f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(
+        tmpDir, completedSuffix, bufferMaxLines, maxLineLength);
+
+    boolean exceptionOnRead = false;
+    boolean illegalStateOnReadLine = false;
+    boolean illegalStateOnReadLines = false;
+    boolean illegalStateOnCommit = false;
+    try {
+      reader.readLines(5);
+    } catch (FlumeException e) {
+      exceptionOnRead = true;
+    }
+    try {
+      reader.readLine();
+    } catch (IllegalStateException e) {
+      illegalStateOnReadLine = true;
+    }
+    try {
+      reader.readLines(5);
+    } catch (IllegalStateException e) {
+      illegalStateOnReadLines = true;
+    }
+    try {
+      reader.commit();
+    } catch (IllegalStateException e) {
+      illegalStateOnCommit = true;
+    }
+
+    Assert.assertTrue("Got FlumeException when reading long line.",
+        exceptionOnRead);
+    Assert.assertTrue("Got IllegalStateException when reading line.",
+        illegalStateOnReadLine);
+    Assert.assertTrue("Got IllegalStateException when reading lines.",
+        illegalStateOnReadLines);
+    Assert.assertTrue("Got IllegalStateException when commiting",
+        illegalStateOnCommit);
+
+  }
+
+  @Test
+  public void testNameCorrespondsToLatestRead() throws IOException {
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    SpoolingFileLineReader reader = new SpoolingFileLineReader(tmpDir,
+        completedSuffix, bufferMaxLines, bufferMaxLineLength);
+    reader.readLines(5);
+    reader.commit();
+
+    assertNotNull(reader.getLastFileRead());
+    assertTrue(reader.getLastFileRead().endsWith("file1"));
+
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    Files.write("file2line1\nfile2line2\nfile2line3\nfile2line4\n" +
+        "file2line5\nfile2line6\nfile2line7\nfile2line8\n",
+        f2, Charsets.UTF_8);
+
+    reader.readLines(5);
+    reader.commit();
+    assertNotNull(reader.getLastFileRead());
+    assertTrue(reader.getLastFileRead().endsWith("file1"));
+
+    reader.readLines(5);
+    reader.commit();
+    assertNotNull(reader.getLastFileRead());
+    assertTrue(reader.getLastFileRead().endsWith("file2"));
+
+    reader.readLines(5);
+    assertTrue(reader.getLastFileRead().endsWith("file2"));
+
+    reader.readLines(5);
+    assertTrue(reader.getLastFileRead().endsWith("file2"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
new file mode 100644
index 0000000..6e87b21
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.flume.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestSpoolDirectorySource {
+  static SpoolDirectorySource source;
+  static MemoryChannel channel;
+  private File tmpDir;
+
+  @Before
+  public void setUp() {
+    source = new SpoolDirectorySource();
+    channel = new MemoryChannel();
+
+    Configurables.configure(channel, new Context());
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+    tmpDir = Files.createTempDir();
+  }
+
+  @After
+  public void tearDown() {
+    for (File f : tmpDir.listFiles()) {
+      f.delete();
+    }
+    tmpDir.delete();
+  }
+
+  @Test
+  public void testPutFilenameHeader() throws IOException, InterruptedException {
+    Context context = new Context();
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+        tmpDir.getAbsolutePath());
+    context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER,
+        "true");
+    context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER_KEY,
+        "fileHeaderKeyTest");
+
+    Configurables.configure(source, context);
+    source.start();
+    Thread.sleep(500);
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event e = channel.take();
+    Assert.assertNotNull("Event must not be null", e);
+    Assert.assertNotNull(e.getHeaders());
+    Assert.assertNotNull(e.getHeaders().get("fileHeaderKeyTest"));
+    Assert.assertEquals(f1.getAbsolutePath(),
+        e.getHeaders().get("fileHeaderKeyTest"));
+    txn.commit();
+    txn.close();
+  }
+
+  @Test
+  public void testLifecycle() throws IOException, InterruptedException {
+    Context context = new Context();
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
+
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+        tmpDir.getAbsolutePath());
+
+    Configurables.configure(source, context);
+
+    for (int i = 0; i < 10; i++) {
+      source.start();
+
+      Assert.assertTrue("Reached start or error",
+          LifecycleController.waitForOneOf(source, LifecycleState.START_OR_ERROR));
+      Assert.assertEquals("Server is started", LifecycleState.START,
+          source.getLifecycleState());
+
+      source.stop();
+      Assert.assertTrue("Reached stop or error",
+          LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+      Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+          source.getLifecycleState());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b4402d9b/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 3af400e..e37d0d1 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -682,7 +682,9 @@ interceptors.*
              never guarantee data has been received when using a unidirectional
              asynchronous interface such as ExecSource! As an extension of this
              warning - and to be completely clear - there is absolutely zero guarantee
-             of event delivery when using this source. You have been warned.
+             of event delivery when using this source. For stronger reliability
+             guarantees, consider the Spooling Directory Source or direct integration
+             with Flume via the SDK.
 
 .. note:: You can use ExecSource to emulate TailSource from Flume 0.9x (flume og).
           Just use unix command ``tail -F /full/path/to/your/file``. Parameter
@@ -698,6 +700,57 @@ Example for agent named **agent_foo**:
   agent_foo.sources.tailsource-1.command = tail -F /var/log/secure
   agent_foo.sources.tailsource-1.channels = memoryChannel-1
 
+Spooling Directory Source
+~~~~~~~~~~~~~~~~~~~~~~~~~
+This source lets you ingest data by dropping files into a spooling directory on
+disk. **Unlike other asynchronous sources, this source
+avoids data loss even if Flume is restarted or fails.**
+Flume will watch the directory for new files and ingest them
+as they appear. After a given file has been fully read into the channel,
+it is renamed to indicate completion, which allows a separate cleanup process to
+remove completed files periodically. Note, however,
+that events may be duplicated if failures occur, consistent with the semantics
+offered by other Flume components. The source can optionally insert the full path of
+the origin file into a header field of each event. This source buffers file data
+in memory during reads; be sure to set the ``bufferMaxLineLength`` option to a number
+greater than the longest line you expect to see in your input data.
+
+.. warning:: This source expects that only immutable, uniquely named files
+             are dropped in the spooling directory. If duplicate names are
+             used, or files are modified while being read, the source will
+             fail with an error message. For some use cases this may require
+             adding unique identifiers (such as a timestamp) to log file names
+             when they are copied into the spooling directory. One possible
+             staging pattern is sketched at the end of this section.
+
+===================  ============  ==========================================================
+Property Name        Default       Description
+===================  ============  ==========================================================
+**channels**         --
+**type**             --            The component type name, needs to be ``spooldir``
+**spoolDir**         --            The directory where log files will be spooled
+fileSuffix           .COMPLETED    Suffix to append to completely ingested files
+fileHeader           false         Whether to add a header storing the filename
+fileHeaderKey        file          Header key to use when appending filename to header
+batchSize            10            Granularity at which to batch transfer to the channel
+bufferMaxLines       100           Maximum number of lines the commit buffer can hold
+bufferMaxLineLength  5000          Maximum length of a line in the commit buffer
+selector.type        replicating   replicating or multiplexing
+selector.*                         Depends on the selector.type value
+interceptors         --            Space separated list of interceptors
+interceptors.*
+===================  ============  ==========================================================
+
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+  agent_foo.sources = spooldir-1
+  agent_foo.channels = memoryChannel-1
+  agent_foo.sources.spooldir-1.type = spooldir
+  agent_foo.sources.spooldir-1.spoolDir = /var/log/apache/flumeSpool
+  agent_foo.sources.spooldir-1.fileHeader = true
+  agent_foo.sources.spooldir-1.channels = memoryChannel-1
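+
+The exact mechanism for staging files is up to the producing application. As a
+hypothetical illustration (not part of Flume), the following Java sketch copies a
+finished log file into a staging directory on the same filesystem and then moves
+it atomically into the spool directory under a timestamped name, so the source
+never sees a partially written or mutable file. It assumes the producer can use
+Java 7's ``java.nio.file`` API; the class and method names are made up for this
+example.
+
+.. code-block:: java
+
+  import java.io.IOException;
+  import java.nio.file.Files;
+  import java.nio.file.Path;
+  import java.nio.file.StandardCopyOption;
+
+  public class SpoolFileStager {
+    /**
+     * Copies a finished log file into a staging directory under a unique,
+     * timestamped name, then moves it atomically into the spooling directory.
+     * The staging directory must live on the same filesystem as the spool
+     * directory for the move to be atomic.
+     */
+    public static void stage(Path finishedLog, Path stagingDir, Path spoolDir)
+        throws IOException {
+      String uniqueName =
+          finishedLog.getFileName() + "." + System.currentTimeMillis();
+      Path staged = stagingDir.resolve(uniqueName);
+      Files.copy(finishedLog, staged, StandardCopyOption.COPY_ATTRIBUTES);
+      Files.move(staged, spoolDir.resolve(uniqueName),
+          StandardCopyOption.ATOMIC_MOVE);
+    }
+  }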
+
 NetCat Source
 ~~~~~~~~~~~~~
 

