flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bes...@apache.org
Subject flume git commit: FLUME-3083. Check byte position of file in update condition of Taildir Source
Date Thu, 27 Apr 2017 19:06:37 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk c718dae09 -> dfa062757


FLUME-3083. Check byte position of file in update condition of Taildir Source

This patch addresses an edge case of the Taildir Source wherein it can miss
reading events written in the same second as the file closing.

This closes #128

Reviewers: Satoshi Iijima, Bessenyei Balázs Donát

(eskrm via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dfa06275
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dfa06275
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dfa06275

Branch: refs/heads/trunk
Commit: dfa0627573b9a75a25dc7149a7d63c9bac953ff4
Parents: c718dae
Author: eskrm <eskrm@yahoo.com>
Authored: Sun Apr 9 13:02:54 2017 -0700
Committer: Bessenyei Balázs Donát <bessbd@apache.org>
Committed: Thu Apr 27 19:03:46 2017 +0000

----------------------------------------------------------------------
 .../taildir/ReliableTaildirEventReader.java     |  2 +-
 .../source/taildir/TestTaildirEventReader.java  | 25 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/dfa06275/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
index 8838320..633d3c1 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -248,7 +248,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
           long startPos = skipToEnd ? f.length() : 0;
           tf = openFile(f, headers, inode, startPos);
         } else {
-          boolean updated = tf.getLastUpdated() < f.lastModified();
+          boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
           if (updated) {
             if (tf.getRaf() == null) {
               tf = openFile(f, headers, inode, tf.getPos());

http://git-wip-us.apache.org/repos/asf/flume/blob/dfa06275/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
index bcfe4bb..e75543c 100644
--- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
@@ -493,4 +493,29 @@ public class TestTaildirEventReader {
     assertTrue(out.contains("file1line3"));
     assertTrue(out.contains("file1line4"));
   }
+
+  @Test
+  // Ensure tail file is set to be read when its last updated time
+  // equals the underlying file's modification time and there are
+  // pending bytes to be read.
+  public void testUpdateWhenLastUpdatedSameAsModificationTime() throws IOException {
+    File file = new File(tmpDir, "file");
+    Files.write("line1\n", file, Charsets.UTF_8);
+
+    ReliableTaildirEventReader reader = getReader();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      reader.readEvents(tf, 1);
+      reader.commit();
+    }
+
+    Files.append("line2\n", file, Charsets.UTF_8);
+    for (TailFile tf : reader.getTailFiles().values()) {
+      tf.setLastUpdated(file.lastModified());
+    }
+
+    reader.updateTailFiles();
+    for (TailFile tf : reader.getTailFiles().values()) {
+      assertEquals(true, tf.needTail());
+    }
+  }
 }


Mime
View raw message