flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-2182. Spooling Directory Source will not ingest data completely when a wide character appears at the edge of a buffer
Date Sat, 28 Sep 2013 00:16:59 GMT
Updated Branches:
  refs/heads/trunk 0f4a66fb0 -> ffa706429


FLUME-2182. Spooling Directory Source will not ingest data completely when a wide character
appears at the edge of a buffer

(Sven Meys via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ffa70642
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ffa70642
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ffa70642

Branch: refs/heads/trunk
Commit: ffa706429186df2cf8ad04fd9dcba37b6a35d7f1
Parents: 0f4a66f
Author: Mike Percy <mpercy@cloudera.com>
Authored: Fri Sep 27 17:01:41 2013 -0700
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Fri Sep 27 17:01:41 2013 -0700

----------------------------------------------------------------------
 .../ResettableFileInputStream.java              |  9 +++-
 .../TestResettableFileInputStream.java          | 46 ++++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ffa70642/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
index 09f490f..ecea5e2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
@@ -64,6 +64,7 @@ public class ResettableFileInputStream extends ResettableInputStream
   private final CharsetDecoder decoder;
   private long position;
   private long syncPosition;
+  private int maxCharWidth;
 
   /**
    *
@@ -112,6 +113,7 @@ public class ResettableFileInputStream extends ResettableInputStream
     this.decoder = charset.newDecoder();
     this.position = 0;
     this.syncPosition = 0;
+    this.maxCharWidth = (int)Math.ceil(charset.newEncoder().maxBytesPerChar());
 
     seek(tracker.getPosition());
   }
@@ -152,7 +154,12 @@ public class ResettableFileInputStream extends ResettableInputStream
 
   @Override
   public synchronized int readChar() throws IOException {
-    if (!buf.hasRemaining()) {
+    // The decoder can have issues with multi-byte characters.
+    // This check ensures that there are at least maxCharWidth bytes in the buffer
+    // before reaching EOF.
+    if (buf.remaining() < maxCharWidth) {
+      buf.clear();
+      buf.flip();
       refillBuf();
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/ffa70642/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
index 5ad6a0a..066765c 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
@@ -18,6 +18,7 @@
 package org.apache.flume.serialization;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import junit.framework.Assert;
@@ -87,6 +88,27 @@ public class TestResettableFileInputStream {
   }
 
   /**
+   * Ensure that we can process lines that contain multi byte characters in weird places
+   * such as at the end of a buffer.
+   * @throws IOException
+   */
+  @Test
+  public void testWideCharRead() throws IOException {
+    String output = wideCharFileInit(file, Charsets.UTF_8);
+
+    PositionTracker tracker = new DurablePositionTracker(meta, file.getPath());
+    ResettableInputStream in = new ResettableFileInputStream(file,  tracker);
+
+    String result = readLine(in, output.length());
+    assertEquals(output, result);
+
+    String afterEOF = readLine(in, output.length());
+    assertNull(afterEOF);
+
+    in.close();
+  }
+
+  /**
    * Ensure a reset() brings us back to the default mark (beginning of file)
    * @throws IOException
    */
@@ -230,6 +252,30 @@ public class TestResettableFileInputStream {
   }
 
   /**
+   * Helper method that generates a line to test if parts of multi-byte characters on the
+   * edge of a buffer are handled properly.
+   */
+  private static String generateWideCharLine(){
+    String s = "éllo Wörld!\n";
+    int size = (ResettableFileInputStream.DEFAULT_BUF_SIZE - 1) + s.length();
+    return Strings.padStart(s, size , 'H');
+  }
+
+  /**
+   * Creates a file that contains a line that contains wide characters
+   * @param file
+   * @param charset
+   * @return
+   * @throws IOException
+   */
+  private static String wideCharFileInit(File file, Charset charset)
+      throws IOException {
+    String output = generateWideCharLine();
+    Files.write(output.getBytes(charset), file);
+    return output;
+  }
+
+  /**
    * Helper function to read a line from a character stream.
    * @param in
    * @param maxLength


Mime
View raw message