flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject flume git commit: FLUME-2318: Make SpoolingDirectorySource able to handle empty files
Date Wed, 17 Aug 2016 17:26:15 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk c1fae53bf -> a9a775a9c


FLUME-2318: Make SpoolingDirectorySource able to handle empty files

(Bessenyei Balázs Donát via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a9a775a9
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a9a775a9
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a9a775a9

Branch: refs/heads/trunk
Commit: a9a775a9c8324e0724ed4720ae4f383896ea8c96
Parents: c1fae53
Author: Bessenyei Balázs Donát <bessbd@cloudera.com>
Authored: Wed Aug 17 09:52:21 2016 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Wed Aug 17 10:14:14 2016 -0700

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   | 73 +++++++++++------
 .../flume/source/TestSpoolDirectorySource.java  | 83 +++++++++++++++++---
 2 files changed, 120 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a9a775a9/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 01381a5..a0f929c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -28,6 +28,7 @@ import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.event.EventBuilder;
 import org.apache.flume.serialization.DecodeErrorPolicy;
 import org.apache.flume.serialization.DurablePositionTracker;
 import org.apache.flume.serialization.EventDeserializer;
@@ -58,20 +59,18 @@ import java.util.Locale;
 import java.util.regex.Pattern;
 
 /**
- * <p/>A {@link ReliableEventReader} which reads log data from files stored
+ * <p>A {@link ReliableEventReader} which reads log data from files stored
  * in a spooling directory and renames each file once all of its data has been
  * read (through {@link EventDeserializer#readEvent()} calls). The user must
  * {@link #commit()} each read, to indicate that the lines have been fully
  * processed.
- * <p/>Read calls will return no data if there are no files left to read. This
+ * <p>Read calls will return no data if there are no files left to read. This
  * class, in general, is not thread safe.
- *
- * <p/>This reader assumes that files with unique file names are left in the
+ * <p>This reader assumes that files with unique file names are left in the
  * spooling directory and not modified once they are placed there. Any user
  * behavior which violates these assumptions, when detected, will result in a
  * FlumeException being thrown.
- *
- * <p/>This class makes the following guarantees, if above assumptions are met:
+ * <p>This class makes the following guarantees, if above assumptions are met:
  * <ul>
  * <li> Once a log file has been renamed with the {@link #completedSuffix},
  *      all of its records have been read through the
@@ -106,11 +105,12 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
   private final boolean recursiveDirectorySearch;
 
   private Optional<FileInfo> currentFile = Optional.absent();
-  /** Always contains the last file from which lines have been read. **/
+  /** Always contains the last file from which lines have been read. */
   private Optional<FileInfo> lastFileRead = Optional.absent();
   private boolean committed = true;
+  private boolean firstTimeRead = true;
 
-  /** Instance var to Cache directory listing **/
+  /** Instance var to Cache directory listing */
   private Iterator<File> candidateFileIter = null;
   private int listFilesCount = 0;
 
@@ -220,6 +220,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
 
   /**
    * Recursively gather candidate files
+   *
    * @param directory the directory to gather files from
    * @return list of files within the passed in directory
    */
@@ -269,9 +270,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     return listFilesCount;
   }
 
-  /** Return the filename which generated the data from the last successful
+  /**
+   * Return the filename which generated the data from the last successful
    * {@link #readEvents(int)} call. Returns null if called before any file
-   * contents are read. */
+   * contents are read.
+   */
   public String getLastFileRead() {
     if (!lastFileRead.isPresent()) {
       return null;
@@ -308,8 +311,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
       }
     }
 
-    EventDeserializer des = currentFile.get().getDeserializer();
-    List<Event> events = des.readEvents(numEvents);
+    List<Event> events = readDeserializerEvents(numEvents);
 
     /* It's possible that the last read took us just up to a file boundary.
      * If so, try to roll to the next file, if there is one.
@@ -322,9 +324,27 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
       if (!currentFile.isPresent()) {
         return Collections.emptyList();
       }
-      events = currentFile.get().getDeserializer().readEvents(numEvents);
+      events = readDeserializerEvents(numEvents);
+    }
+
+    fillHeader(events);
+
+    committed = false;
+    lastFileRead = currentFile;
+    return events;
+  }
+
+  private List<Event> readDeserializerEvents(int numEvents) throws IOException {
+    EventDeserializer des = currentFile.get().getDeserializer();
+    List<Event> events = des.readEvents(numEvents);
+    if (events.isEmpty() && firstTimeRead) {
+      events.add(EventBuilder.withBody(new byte[0]));
     }
+    firstTimeRead = false;
+    return events;
+  }
 
+  private void fillHeader(List<Event> events) {
     if (annotateFileName) {
       String filename = currentFile.get().getFile().getAbsolutePath();
       for (Event event : events) {
@@ -338,10 +358,6 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
         event.getHeaders().put(baseNameHeader, basename);
       }
     }
-
-    committed = false;
-    lastFileRead = currentFile;
-    return events;
   }
 
   @Override
@@ -352,7 +368,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     }
   }
 
-  /** Commit the last lines which were read. */
+  /**
+   * Commit the last lines which were read.
+   */
   @Override
   public void commit() throws IOException {
     if (!committed && currentFile.isPresent()) {
@@ -363,11 +381,12 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
 
   /**
    * Closes currentFile and attempt to rename it.
-   *
+   * <p>
    * If these operations fail in a way that may cause duplicate log entries,
    * an error is logged but no exceptions are thrown. If these operations fail
    * in a way that indicates potential misuse of the spooling directory, a
    * FlumeException will be thrown.
+   *
    * @throws FlumeException if files do not conform to spooling assumptions
    */
   private void retireCurrentFile() throws IOException {
@@ -400,6 +419,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
 
   /**
    * Rename the given spooled file
+   *
    * @param fileToRoll
    * @throws IOException
    */
@@ -432,13 +452,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
         throw new IllegalStateException(message);
       }
 
-    // Dest file exists and not on windows
+      // Dest file exists and not on windows
     } else if (dest.exists()) {
       String message = "File name has been re-used with different" +
           " files. Spooling assumptions violated for " + dest;
       throw new IllegalStateException(message);
 
-    // Destination file does not already exist. We are good to go!
+      // Destination file does not already exist. We are good to go!
     } else {
       boolean renamed = fileToRoll.renameTo(dest);
       if (renamed) {
@@ -460,6 +480,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
 
   /**
    * Delete the given spooled file
+   *
    * @param fileToDelete
    * @throws IOException
    */
@@ -508,7 +529,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random.
       return openFile(selectedFile);
     } else if (consumeOrder == ConsumeOrder.YOUNGEST) {
-      for (File candidateFile: candidateFiles) {
+      for (File candidateFile : candidateFiles) {
         long compare = selectedFile.lastModified() -
             candidateFile.lastModified();
         if (compare == 0) { // ts is same pick smallest lexicographically.
@@ -518,7 +539,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
         }
       }
     } else { // default order is OLDEST
-      for (File candidateFile: candidateFiles) {
+      for (File candidateFile : candidateFiles) {
         long compare = selectedFile.lastModified() -
             candidateFile.lastModified();
         if (compare == 0) { // ts is same pick smallest lexicographically.
@@ -529,6 +550,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
       }
     }
 
+    firstTimeRead = true;
+
     return openFile(selectedFile);
   }
 
@@ -538,13 +561,15 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     }
     return f2;
   }
+
   /**
    * Opens a file for consuming
+   *
    * @param file
    * @return {@link FileInfo} for the file to consume or absent option if the
    * file does not exists or readable.
    */
-  private Optional<FileInfo> openFile(File file) {    
+  private Optional<FileInfo> openFile(File file) {
     try {
       // roll the meta file, if needed
       String nextPath = file.getPath();

http://git-wip-us.apache.org/repos/asf/flume/blob/a9a775a9/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 0182d21..92a698d 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -120,8 +120,8 @@ public class TestSpoolDirectorySource {
     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
 
     Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
-            "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
-        f1, Charsets.UTF_8);
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
 
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
         tmpDir.getAbsolutePath());
@@ -156,8 +156,8 @@ public class TestSpoolDirectorySource {
     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
 
     Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
-            "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
-        f1, Charsets.UTF_8);
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                f1, Charsets.UTF_8);
 
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
         tmpDir.getAbsolutePath());
@@ -256,7 +256,6 @@ public class TestSpoolDirectorySource {
     boolean directoriesCreated = subDir.mkdirs();
     Assert.assertTrue("source directories must be created", directoriesCreated);
 
-
     File f1 = new File(subDir.getAbsolutePath() + "/file1.txt");
 
     Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
@@ -364,9 +363,8 @@ public class TestSpoolDirectorySource {
     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
 
     Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
-            "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
-        f1, Charsets.UTF_8);
-
+                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+                 f1, Charsets.UTF_8);
 
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
         tmpDir.getAbsolutePath());
@@ -384,8 +382,7 @@ public class TestSpoolDirectorySource {
     }
 
     Assert.assertTrue("Expected to hit ChannelFullException, but did not!",
-        source.didHitChannelFullException());
-
+                      source.didHitChannelFullException());
 
     List<String> dataOut = Lists.newArrayList();
 
@@ -434,8 +431,70 @@ public class TestSpoolDirectorySource {
     Thread.sleep(5000);
 
     Assert.assertFalse("Server did not error", source.hasFatalError());
-    Assert.assertEquals("One message was read",
-        1, source.getSourceCounter().getEventAcceptedCount());
+    Assert.assertEquals("Four messages were read",
+        4, source.getSourceCounter().getEventAcceptedCount());
+    source.stop();
+  }
+
+  @Test
+  public void testWithAllEmptyFiles()
+      throws InterruptedException, IOException {
+    Context context = new Context();
+    File[] f = new File[10];
+    for (int i = 0; i < 10; i++) {
+      f[i] = new File(tmpDir.getAbsolutePath() + "/file" + i);
+      Files.write(new byte[0], f[i]);
+    }
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+        tmpDir.getAbsolutePath());
+    context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER,
+        "true");
+    context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER_KEY,
+        "fileHeaderKeyTest");
+    Configurables.configure(source, context);
+    source.start();
+    Thread.sleep(10);
+    for (int i = 0; i < 10; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      Event e = channel.take();
+      Assert.assertNotNull("Event must not be null", e);
+      Assert.assertNotNull("Event headers must not be null", e.getHeaders());
+      Assert.assertNotNull(e.getHeaders().get("fileHeaderKeyTest"));
+      Assert.assertEquals(f[i].getAbsolutePath(),
+          e.getHeaders().get("fileHeaderKeyTest"));
+      Assert.assertArrayEquals(new byte[0], e.getBody());
+      txn.commit();
+      txn.close();
+    }
+    source.stop();
+  }
+
+  @Test
+  public void testWithEmptyAndDataFiles()
+      throws InterruptedException, IOException {
+    Context context = new Context();
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+    Files.write("some data".getBytes(), f1);
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    Files.write(new byte[0], f2);
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+        tmpDir.getAbsolutePath());
+    Configurables.configure(source, context);
+    source.start();
+    Thread.sleep(10);
+    for (int i = 0; i < 2; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      Event e = channel.take();
+      txn.commit();
+      txn.close();
+    }
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Assert.assertNull(channel.take());
+    txn.commit();
+    txn.close();
     source.stop();
   }
 }


Mime
View raw message