flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2309. Spooling directory should not always consume the oldest file first.
Date Thu, 27 Mar 2014 19:31:39 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 09f0a5136 -> 61b9bcbb6


FLUME-2309. Spooling directory should not always consume the oldest file first.

(Muhammad Ehsan ul Haque via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/61b9bcbb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/61b9bcbb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/61b9bcbb

Branch: refs/heads/trunk
Commit: 61b9bcbb69ae3d19f72276b3aaa78ff3679cecfc
Parents: 09f0a51
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Mar 27 12:30:29 2014 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Mar 27 12:30:29 2014 -0700

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   | 176 +++++++++--------
 .../flume/source/SpoolDirectorySource.java      |  23 ++-
 ...olDirectorySourceConfigurationConstants.java |   7 +
 .../TestReliableSpoolingFileEventReader.java    | 187 ++++++++++++++++++-
 .../flume/source/TestSpoolDirectorySource.java  |  26 +++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   8 +
 6 files changed, 335 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/61b9bcbb/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index a88ed6e..1818250 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -19,38 +19,31 @@
 
 package org.apache.flume.client.avro;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.regex.Pattern;
-
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.serialization.DecodeErrorPolicy;
-import org.apache.flume.serialization.DurablePositionTracker;
-import org.apache.flume.serialization.EventDeserializer;
-import org.apache.flume.serialization.EventDeserializerFactory;
-import org.apache.flume.serialization.PositionTracker;
-import org.apache.flume.serialization.ResettableFileInputStream;
-import org.apache.flume.serialization.ResettableInputStream;
+import org.apache.flume.serialization.*;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
+import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
 import org.apache.flume.tools.PlatformDetect;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.regex.Pattern;
 
 /**
  * <p/>A {@link ReliableEventReader} which reads log data from files stored
@@ -98,7 +91,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
   private final String deletePolicy;
   private final Charset inputCharset;
   private final DecodeErrorPolicy decodeErrorPolicy;
-
+  private final ConsumeOrder consumeOrder;    
+  
   private Optional<FileInfo> currentFile = Optional.absent();
   /** Always contains the last file from which lines have been read. **/
   private Optional<FileInfo> lastFileRead = Optional.absent();
@@ -113,7 +107,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
       boolean annotateBaseName, String baseNameHeader,
       String deserializerType, Context deserializerContext,
       String deletePolicy, String inputCharset,
-      DecodeErrorPolicy decodeErrorPolicy) throws IOException {
+      DecodeErrorPolicy decodeErrorPolicy, 
+      ConsumeOrder consumeOrder) throws IOException {
 
     // Sanity checks
     Preconditions.checkNotNull(spoolDirectory);
@@ -173,6 +168,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     this.deletePolicy = deletePolicy;
     this.inputCharset = Charset.forName(inputCharset);
     this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);
+    this.consumeOrder = Preconditions.checkNotNull(consumeOrder);    
 
     File trackerDirectory = new File(trackerDirPath);
 
@@ -403,9 +399,16 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
   }
 
   /**
-   * Find and open the oldest file in the chosen directory. If two or more
-   * files are equally old, the file name with lower lexicographical value is
-   * returned. If the directory is empty, this will return an absent option.
+   * Returns the next file to be consumed from the chosen directory.
+   * If the directory is empty or the chosen file is not readable,
+   * this will return an absent option.
+   * If the {@link #consumeOrder} variable is {@link ConsumeOrder#OLDEST}
+   * then returns the oldest file. If the {@link #consumeOrder} variable
+   * is {@link ConsumeOrder#YOUNGEST} then returns the youngest file.
+   * If two or more files are equally old/young, then the file name with
+   * lower lexicographical value is returned.
+   * If the {@link #consumeOrder} variable is {@link ConsumeOrder#RANDOM}
+   * then returns any arbitrary file in the directory.
    */
   private Optional<FileInfo> getNextFile() {
     /* Filter to exclude finished or hidden files */
@@ -421,55 +424,72 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
         return true;
       }
     };
-    List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));
-    if (candidateFiles.isEmpty()) {
+    List<File> candidateFiles = Arrays.asList(
+      spoolDirectory.listFiles(filter));
+    if (candidateFiles.isEmpty()) { // No matching file in spooling directory.
       return Optional.absent();
-    } else {
-      Collections.sort(candidateFiles, new Comparator<File>() {
-        public int compare(File a, File b) {
-          int timeComparison = new Long(a.lastModified()).compareTo(
-              new Long(b.lastModified()));
-          if (timeComparison != 0) {
-            return timeComparison;
-          }
-          else {
-            return a.getName().compareTo(b.getName());
-          }
+    }
+    
+    File selectedFile = candidateFiles.get(0); // Select the first (arbitrary) file.
+    if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random.
+      return openFile(selectedFile);
+    } else if (consumeOrder == ConsumeOrder.YOUNGEST) {
+      for (File candidateFile: candidateFiles) {
+        if (candidateFile.lastModified() >
+          selectedFile.lastModified()) {
+          selectedFile = candidateFile;
         }
-      });
-      File nextFile = candidateFiles.get(0);
-      try {
-        // roll the meta file, if needed
-        String nextPath = nextFile.getPath();
-        PositionTracker tracker =
-            DurablePositionTracker.getInstance(metaFile, nextPath);
-        if (!tracker.getTarget().equals(nextPath)) {
-          tracker.close();
-          deleteMetaFile();
-          tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
+      }
+    } else { // default order is OLDEST
+      for (File candidateFile: candidateFiles) {
+        if (candidateFile.lastModified() <
+          selectedFile.lastModified()) {
+          selectedFile = candidateFile;
         }
+      }
+    }
 
-        // sanity check
-        Preconditions.checkState(tracker.getTarget().equals(nextPath),
-            "Tracker target %s does not equal expected filename %s",
-            tracker.getTarget(), nextPath);
-
-        ResettableInputStream in =
-            new ResettableFileInputStream(nextFile, tracker,
-                ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
-                decodeErrorPolicy);
-        EventDeserializer deserializer = EventDeserializerFactory.getInstance
-            (deserializerType, deserializerContext, in);
-
-        return Optional.of(new FileInfo(nextFile, deserializer));
-      } catch (FileNotFoundException e) {
-        // File could have been deleted in the interim
-        logger.warn("Could not find file: " + nextFile, e);
-        return Optional.absent();
-      } catch (IOException e) {
-        logger.error("Exception opening file: " + nextFile, e);
-        return Optional.absent();
+    return openFile(selectedFile);
+  }
+  
+  /**
+   * Opens a file for consuming.
+   * @param file the file to open
+   * @return {@link FileInfo} for the file to consume, or an absent option if the
+   * file does not exist or is not readable.
+   */
+  private Optional<FileInfo> openFile(File file) {    
+    try {
+      // roll the meta file, if needed
+      String nextPath = file.getPath();
+      PositionTracker tracker =
+          DurablePositionTracker.getInstance(metaFile, nextPath);
+      if (!tracker.getTarget().equals(nextPath)) {
+        tracker.close();
+        deleteMetaFile();
+        tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
       }
+
+      // sanity check
+      Preconditions.checkState(tracker.getTarget().equals(nextPath),
+          "Tracker target %s does not equal expected filename %s",
+          tracker.getTarget(), nextPath);
+
+      ResettableInputStream in =
+          new ResettableFileInputStream(file, tracker,
+              ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
+              decodeErrorPolicy);
+      EventDeserializer deserializer = EventDeserializerFactory.getInstance
+          (deserializerType, deserializerContext, in);
+
+      return Optional.of(new FileInfo(file, deserializer));
+    } catch (FileNotFoundException e) {
+      // File could have been deleted in the interim
+      logger.warn("Could not find file: " + file, e);
+      return Optional.absent();
+    } catch (IOException e) {
+      logger.error("Exception opening file: " + file, e);
+      return Optional.absent();
     }
   }
 
@@ -536,7 +556,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
     private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy.valueOf(
         SpoolDirectorySourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY
             .toUpperCase());
-
+    private ConsumeOrder consumeOrder = 
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER;    
+    
     public Builder spoolDirectory(File directory) {
       this.spoolDirectory = directory;
       return this;
@@ -601,12 +623,18 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader
{
       this.decodeErrorPolicy = decodeErrorPolicy;
       return this;
     }
-
+    
+    public Builder consumeOrder(ConsumeOrder consumeOrder) {
+      this.consumeOrder = consumeOrder;
+      return this;
+    }        
+    
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
           annotateBaseName, baseNameHeader, deserializerType,
-          deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy);
+          deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy,
+          consumeOrder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/61b9bcbb/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 49d8826..d0c2e99 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -17,14 +17,8 @@
 
 package org.apache.flume.source;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import org.apache.flume.*;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
@@ -35,10 +29,14 @@ import org.apache.flume.serialization.LineDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
-import static org.apache.flume.source
-    .SpoolDirectorySourceConfigurationConstants.*;
+import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*;
 
 public class SpoolDirectorySource extends AbstractSource implements
 Configurable, EventDrivenSource {
@@ -72,6 +70,7 @@ Configurable, EventDrivenSource {
   private boolean backoff = true;
   private boolean hitChannelException = false;
   private int maxBackoff;
+  private ConsumeOrder consumeOrder;
 
   @Override
   public synchronized void start() {
@@ -96,6 +95,7 @@ Configurable, EventDrivenSource {
           .deletePolicy(deletePolicy)
           .inputCharset(inputCharset)
           .decodeErrorPolicy(decodeErrorPolicy)
+          .consumeOrder(consumeOrder)
           .build();
     } catch (IOException ioe) {
       throw new FlumeException("Error instantiating spooling event parser",
@@ -163,6 +163,9 @@ Configurable, EventDrivenSource {
     deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
     deserializerContext = new Context(context.getSubProperties(DESERIALIZER +
         "."));
+    
+    consumeOrder = ConsumeOrder.valueOf(context.getString(CONSUME_ORDER, 
+        DEFAULT_CONSUME_ORDER.toString()).toUpperCase());
 
     // "Hack" to support backwards compatibility with previous generation of
     // spooling directory source, which did not support deserializers

http://git-wip-us.apache.org/repos/asf/flume/blob/61b9bcbb/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 83522c0..895433e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -86,4 +86,11 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String MAX_BACKOFF = "maxBackoff";
 
   public static final Integer DEFAULT_MAX_BACKOFF = 4000;
+  
+  /** Consume order. */
+  public enum ConsumeOrder {
+    OLDEST, YOUNGEST, RANDOM
+  }
+  public static final String CONSUME_ORDER = "consumeOrder";
+  public static final ConsumeOrder DEFAULT_CONSUME_ORDER = ConsumeOrder.OLDEST;    
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/61b9bcbb/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 9d708c1..0b07e7a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -19,14 +19,16 @@ package org.apache.flume.client.avro;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
 import org.apache.flume.Event;
-import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
+import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
+import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +36,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
-import java.util.List;
+import java.util.*;
+import java.util.Map.Entry;
 
 public class TestReliableSpoolingFileEventReader {
 
@@ -67,9 +70,12 @@ public class TestReliableSpoolingFileEventReader {
 
   @After
   public void tearDown() {
+    deleteDir(WORK_DIR);
+  }
 
+  private void deleteDir(File dir) {
     // delete all the files & dirs we created
-    File[] files = WORK_DIR.listFiles();
+    File[] files = dir.listFiles();
     for (File f : files) {
       if (f.isDirectory()) {
         File[] subDirFiles = f.listFiles();
@@ -87,10 +93,9 @@ public class TestReliableSpoolingFileEventReader {
         }
       }
     }
-    if (!WORK_DIR.delete()) {
-      logger.warn("Cannot delete work directory {}", WORK_DIR.getAbsolutePath());
+    if (!dir.delete()) {
+      logger.warn("Cannot delete work directory {}", dir.getAbsolutePath());
     }
-
   }
 
   @Test
@@ -188,6 +193,173 @@ public class TestReliableSpoolingFileEventReader {
         trackerFiles.size());
   }
 
+  @Test(expected = NullPointerException.class)
+  public void testNullConsumeOrder() throws IOException {
+    new ReliableSpoolingFileEventReader.Builder()
+    .spoolDirectory(WORK_DIR)
+    .consumeOrder(null)
+    .build();
+  }
+  
+  @Test
+  public void testConsumeFileRandomly() throws IOException {
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+    .spoolDirectory(WORK_DIR)
+    .consumeOrder(ConsumeOrder.RANDOM)
+    .build();
+    File fileName = new File(WORK_DIR, "new-file");
+    FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n");
+    Set<String> actual = Sets.newHashSet();     
+    readEventsForFilesInDir(WORK_DIR, reader, actual);      
+    Set<String> expected = Sets.newHashSet();
+    createExpectedFromFilesInSetup(expected);
+    expected.add("");
+    expected.add("New file created in the end. Shoud be read randomly.");
+    Assert.assertEquals(expected, actual);    
+  }
+
+
+  @Test
+  public void testConsumeFileOldest() throws IOException, InterruptedException {
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+    .spoolDirectory(WORK_DIR)
+    .consumeOrder(ConsumeOrder.OLDEST)
+    .build();
+    File file1 = new File(WORK_DIR, "new-file1");   
+    File file2 = new File(WORK_DIR, "new-file2");    
+    File file3 = new File(WORK_DIR, "new-file3");
+    FileUtils.write(file2, "New file2 created.\n"); // file2 becomes older than file1 &
file3.
+    Thread.sleep(1000L);
+    FileUtils.write(file1, "New file1 created.\n"); // file1 becomes older than file3.
+    FileUtils.write(file3, "New file3 created.\n");
+    
+    List<String> actual = Lists.newLinkedList();    
+    readEventsForFilesInDir(WORK_DIR, reader, actual);        
+    List<String> expected = Lists.newLinkedList();
+    createExpectedFromFilesInSetup(expected);
+    expected.add(""); // An empty file was added last in setup.
+    expected.add("New file2 created.");
+    expected.add("New file1 created.");
+    expected.add("New file3 created.");    
+    Assert.assertEquals(expected, actual);
+  }
+  
+  @Test
+  public void testConsumeFileYoungest() throws IOException, InterruptedException {
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+    .spoolDirectory(WORK_DIR)
+    .consumeOrder(ConsumeOrder.YOUNGEST)
+    .build();
+    Thread.sleep(1000L);
+    File file1 = new File(WORK_DIR, "new-file1");   
+    File file2 = new File(WORK_DIR, "new-file2");    
+    File file3 = new File(WORK_DIR, "new-file3");
+    FileUtils.write(file2, "New file2 created.\n"); // file2 is oldest among file1 &
file3.
+    Thread.sleep(1000L);      
+    FileUtils.write(file3, "New file3 created.\n"); // file3 becomes younger than file2
but older than file1.
+    FileUtils.write(file1, "New file1 created.\n"); // file1 becomes younger than file2 &
file3.
+    List<String> actual = Lists.newLinkedList();    
+    readEventsForFilesInDir(WORK_DIR, reader, actual);        
+    List<String> expected = Lists.newLinkedList();
+    createExpectedFromFilesInSetup(expected);
+    Collections.sort(expected);
+    expected.add(0, ""); // An empty file was added last in setup.
+    expected.add(0, "New file2 created.");    
+    expected.add(0, "New file3 created.");
+    expected.add(0, "New file1 created.");
+        
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test public void testLargeNumberOfFilesOLDEST() throws IOException {    
+    templateTestForLargeNumberOfFiles(ConsumeOrder.OLDEST, null, 1000);
+  }
+  @Test public void testLargeNumberOfFilesYOUNGEST() throws IOException {    
+    templateTestForLargeNumberOfFiles(ConsumeOrder.YOUNGEST, new Comparator<Long>()
{
+
+      @Override
+      public int compare(Long o1, Long o2) {
+        return o2.compareTo(o1);
+      }
+    }, 1000);
+  }
+  @Test public void testLargeNumberOfFilesRANDOM() throws IOException {    
+    templateTestForLargeNumberOfFiles(ConsumeOrder.RANDOM, null, 1000);
+  }
+  private void templateTestForLargeNumberOfFiles(ConsumeOrder order, 
+      Comparator<Long> comparator,
+      int N) throws IOException {
+    File dir = null;
+    try {
+      dir = new File("target/test/work/" + this.getClass().getSimpleName()+ "_large");
+      Files.createParentDirs(new File(dir, "dummy"));
+      ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+      .spoolDirectory(dir).consumeOrder(order).build();
+      Map<Long, List<String>> expected;
+      if (comparator == null) {
+        expected = new TreeMap<Long, List<String>>();
+      } else {
+        expected = new TreeMap<Long, List<String>>(comparator);
+      }
+      for (int i = 0; i < N; i++) {
+        File f = new File(dir, "file-" + i);
+        String data = "file-" + i;
+        Files.write(data, f, Charsets.UTF_8);
+        if (expected.containsKey(f.lastModified())) {
+          expected.get(f.lastModified()).add(data);
+        } else {
+          expected.put(f.lastModified(), Lists.newArrayList(data));
+        }
+      }
+      Collection<String> expectedList;
+      if (order == ConsumeOrder.RANDOM) {
+        expectedList = Sets.newHashSet();
+      } else {
+        expectedList = Lists.newArrayList();
+      }
+      for (Entry<Long, List<String>> entry : expected.entrySet()) {
+        Collections.sort(entry.getValue());
+        expectedList.addAll(entry.getValue());
+      }
+      for (int i = 0; i < N; i++) {
+        List<Event> events;
+        events = reader.readEvents(10);
+        for (Event e : events) {
+          if (order == ConsumeOrder.RANDOM) {            
+            Assert.assertTrue(expectedList.remove(new String(e.getBody())));
+          } else {
+            Assert.assertEquals(((ArrayList<String>)expectedList).get(0), new String(e.getBody()));
           
+            ((ArrayList<String>)expectedList).remove(0);
+          }
+        }
+        reader.commit();        
+      }
+    } finally {
+      deleteDir(dir);
+    }
+  }
+    
+  /* Read events, one for each file in the given directory. */
+  private void readEventsForFilesInDir(File dir, ReliableEventReader reader, 
+      Collection<String> actual) throws IOException {
+    List<Event> events;
+    for (int i=0; i < listFiles(dir).size(); i++) {
+      events = reader.readEvents(10);
+      for (Event e: events) {
+        actual.add(new String(e.getBody()));
+      }
+      reader.commit();
+    }
+  }    
+  /* Create expected results out of the files created in the setup method. */
+  private void createExpectedFromFilesInSetup(Collection<String> expected) {
+    for (int i = 0; i < 4; i++) {      
+      for (int j = 0; j < i; j++) {        
+        expected.add("file" + i + "line" + j);
+      }      
+    }
+  }
+  
   private static List<File> listFiles(File dir) {
     List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter
         () {
@@ -198,5 +370,4 @@ public class TestReliableSpoolingFileEventReader {
     }));
     return files;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/61b9bcbb/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 503ab4d..89e7c8c 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -75,6 +75,32 @@ public class TestSpoolDirectorySource {
     tmpDir.delete();
   }
 
+  @Test (expected = IllegalArgumentException.class)
+  public void testInvalidSortOrder() {
+    Context context = new Context();
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, 
+        tmpDir.getAbsolutePath());
+    context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, 
+        "undefined");
+    Configurables.configure(source, context);    
+  }
+  
+  @Test
+  public void testValidSortOrder() {
+    Context context = new Context();
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, 
+        tmpDir.getAbsolutePath());
+    context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, 
+        "oLdESt");
+    Configurables.configure(source, context);
+    context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, 
+        "yoUnGest");
+    Configurables.configure(source, context);
+    context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, 
+        "rAnDom");
+    Configurables.configure(source, context);    
+  }
+  
   @Test
   public void testPutFilenameHeader() throws IOException, InterruptedException {
     Context context = new Context();

http://git-wip-us.apache.org/repos/asf/flume/blob/61b9bcbb/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 4bcd8a2..7b918ed 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -953,6 +953,14 @@ basenameHeaderKey     basename        Header Key to use when appending
 basename
 ignorePattern         ^$              Regular expression specifying which files to ignore
(skip)
 trackerDir            .flumespool     Directory to store metadata related to processing of
files.
                                       If this path is not an absolute path, then it is interpreted
as relative to the spoolDir.
+consumeOrder          oldest          The order in which files in the spooling directory will
be consumed: ``oldest``,
+                                      ``youngest`` or ``random``. In the case of ``oldest``
and ``youngest``, the last modified
+                                      time of the files is used to compare them.
In case of a tie, the file
+                                      with the smallest lexicographical order will be consumed
first. In the case of ``random``, any
+                                      file may be picked. When using ``oldest``
or ``youngest``, the whole
+                                      directory is scanned to pick the oldest/youngest
file, which might be slow if there
+                                      are a large number of files, while using ``random``
may cause old files to be consumed
+                                      very late if new files keep arriving in the spooling
directory.
 maxBackoff            4000            The maximum time (in millis) to wait between consecutive
attempts to write to the channel(s) if the channel is full. The source will start at a low
backoff and increase it exponentially each time the channel throws a ChannelException, up to
the value specified by this parameter.
 batchSize             100             Granularity at which to batch transfer to the channel
 inputCharset          UTF-8           Character set used by deserializers that treat the
input file as text.


Mime
View raw message