Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B6161107EB for ; Sat, 28 Sep 2013 00:16:59 +0000 (UTC) Received: (qmail 65749 invoked by uid 500); 28 Sep 2013 00:16:59 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 65727 invoked by uid 500); 28 Sep 2013 00:16:59 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 65718 invoked by uid 99); 28 Sep 2013 00:16:59 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 28 Sep 2013 00:16:59 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1CA7F90CA92; Sat, 28 Sep 2013 00:16:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: mpercy@apache.org To: commits@flume.apache.org Message-Id: <1cc5a131503543bb960a85f5796e11e8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-2182. Spooling Directory Source will not ingest data completely when a wide character appears at the edge of a buffer Date: Sat, 28 Sep 2013 00:16:59 +0000 (UTC) Updated Branches: refs/heads/trunk 0f4a66fb0 -> ffa706429 FLUME-2182. Spooling Directory Source will not ingest data completely when a wide character appears at the edge of a buffer (Sven Meys via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ffa70642 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ffa70642 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ffa70642 Branch: refs/heads/trunk Commit: ffa706429186df2cf8ad04fd9dcba37b6a35d7f1 Parents: 0f4a66f Author: Mike Percy Authored: Fri Sep 27 17:01:41 2013 -0700 Committer: Mike Percy Committed: Fri Sep 27 17:01:41 2013 -0700 ---------------------------------------------------------------------- .../ResettableFileInputStream.java | 9 +++- .../TestResettableFileInputStream.java | 46 ++++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ffa70642/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java index 09f490f..ecea5e2 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java @@ -64,6 +64,7 @@ public class ResettableFileInputStream extends ResettableInputStream private final CharsetDecoder decoder; private long position; private long syncPosition; + private int maxCharWidth; /** * @@ -112,6 +113,7 @@ public class ResettableFileInputStream extends ResettableInputStream this.decoder = charset.newDecoder(); this.position = 0; this.syncPosition = 0; + this.maxCharWidth = (int)Math.ceil(charset.newEncoder().maxBytesPerChar()); seek(tracker.getPosition()); } @@ -152,7 +154,12 @@ public class ResettableFileInputStream extends ResettableInputStream @Override public synchronized int readChar() throws IOException { - if (!buf.hasRemaining()) { + // The decoder can have issues with multi-byte characters. + // This check ensures that there are at least maxCharWidth bytes in the buffer + // before reaching EOF. + if (buf.remaining() < maxCharWidth) { + buf.clear(); + buf.flip(); refillBuf(); } http://git-wip-us.apache.org/repos/asf/flume/blob/ffa70642/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java index 5ad6a0a..066765c 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java @@ -18,6 +18,7 @@ package org.apache.flume.serialization; import com.google.common.base.Charsets; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.io.Files; import junit.framework.Assert; @@ -87,6 +88,27 @@ public class TestResettableFileInputStream { } /** + * Ensure that we can process lines that contain multi byte characters in weird places + * such as at the end of a buffer. + * @throws IOException + */ + @Test + public void testWideCharRead() throws IOException { + String output = wideCharFileInit(file, Charsets.UTF_8); + + PositionTracker tracker = new DurablePositionTracker(meta, file.getPath()); + ResettableInputStream in = new ResettableFileInputStream(file, tracker); + + String result = readLine(in, output.length()); + assertEquals(output, result); + + String afterEOF = readLine(in, output.length()); + assertNull(afterEOF); + + in.close(); + } + + /** * Ensure a reset() brings us back to the default mark (beginning of file) * @throws IOException */ @@ -230,6 +252,30 @@ public class TestResettableFileInputStream { } /** + * Helper method that generates a line to test if parts of multi-byte characters on the + * edge of a buffer are handled properly. + */ + private static String generateWideCharLine(){ + String s = "éllo Wörld!\n"; + int size = (ResettableFileInputStream.DEFAULT_BUF_SIZE - 1) + s.length(); + return Strings.padStart(s, size , 'H'); + } + + /** + * Creates a file that contains a line that contains wide characters + * @param file + * @param charset + * @return + * @throws IOException + */ + private static String wideCharFileInit(File file, Charset charset) + throws IOException { + String output = generateWideCharLine(); + Files.write(output.getBytes(charset), file); + return output; + } + + /** * Helper function to read a line from a character stream. * @param in * @param maxLength