Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1650F18954 for ; Thu, 26 Nov 2015 01:05:51 +0000 (UTC) Received: (qmail 95764 invoked by uid 500); 26 Nov 2015 01:05:50 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 95703 invoked by uid 500); 26 Nov 2015 01:05:50 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 95694 invoked by uid 99); 26 Nov 2015 01:05:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Nov 2015 01:05:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A6705E18F9; Thu, 26 Nov 2015 01:05:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rkanter@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause duplicate records (wilfreds via rkanter) Date: Thu, 26 Nov 2015 01:05:50 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk e556c35b0 -> 7fd00b3db MAPREDUCE-6549. 
multibyte delimiters with LineRecordReader cause duplicate records (wilfreds via rkanter) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7fd00b3d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7fd00b3d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7fd00b3d Branch: refs/heads/trunk Commit: 7fd00b3db4b7d73afd41276ba9a06ec06a0e1762 Parents: e556c35 Author: Robert Kanter Authored: Wed Nov 25 17:03:38 2015 -0800 Committer: Robert Kanter Committed: Wed Nov 25 17:03:38 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/hadoop/util/LineReader.java | 9 + hadoop-mapreduce-project/CHANGES.txt | 3 + .../lib/input/UncompressedSplitLineReader.java | 5 + .../hadoop/mapred/TestLineRecordReader.java | 230 +++++++++++++----- .../lib/input/TestLineRecordReader.java | 237 ++++++++++++++----- 5 files changed, 361 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java index 900215a..153953d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java @@ -333,6 +333,10 @@ public class LineReader implements Closeable { //appending the ambiguous characters (refer case 2.2) str.append(recordDelimiterBytes, 0, ambiguousByteCount); ambiguousByteCount = 0; + // since it is now certain that the split did not split a delimiter we + // should not read the next record: 
clear the flag otherwise duplicate + // records could be generated + unsetNeedAdditionalRecordAfterSplit(); } if (appendLength > 0) { str.append(buffer, startPosn, appendLength); @@ -380,4 +384,9 @@ public class LineReader implements Closeable { protected int getBufferSize() { return bufferSize; } + + protected void unsetNeedAdditionalRecordAfterSplit() { + // needed for custom multi byte line delimiters only + // see MAPREDUCE-6549 for details + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c6e80e7..503e687 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -650,6 +650,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6557. Tests in mapreduce-client-app are writing outside of target. (Akira AJISAKA via junping_du) + MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause + duplicate records (wilfreds via rkanter) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java index 38491b0..6d495ef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java @@ -97,4 +97,9 @@ public class UncompressedSplitLineReader extends SplitLineReader { public boolean needAdditionalRecordAfterSplit() { return !finished && needAdditionalRecord; } + + @Override + protected void unsetNeedAdditionalRecordAfterSplit() { + needAdditionalRecord = false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java index d33a614..f0cf9f5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java @@ -334,12 +334,72 @@ public class TestLineRecordReader { @Test public void testUncompressedInput() throws Exception { Configuration conf = new Configuration(); - String inputData = "abc+++def+++ghi+++" - + "jkl+++mno+++pqr+++stu+++vw +++xyz"; + // single char delimiter, best case + String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz"; Path inputFile = createInputFile(conf, inputData); - conf.set("textinputformat.record.delimiter", "+++"); - for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { - for(int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.set("textinputformat.record.delimiter", 
"+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter, best case + inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "|+|"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // single char delimiter with empty records + inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with empty records + inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "|+|"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with starting part of the delimiter in the data + inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz"; + inputFile = createInputFile(conf, inputData); + 
conf.set("textinputformat.record.delimiter", "+-"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with newline as start of the delimiter + inputData = "abc\n+def\n+ghi\n+jkl\n+mno"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "\n+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with newline in delimiter and in data + inputData = "abc\ndef+\nghi+\njkl\nmno"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+\n"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { conf.setInt("io.file.buffer.size", bufferSize); testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); } @@ -363,80 +423,126 @@ public class TestLineRecordReader { public void testUncompressedInputCustomDelimiterPosValue() throws Exception { Configuration conf = new Configuration(); - String inputData = "1234567890ab12ab345"; - Path inputFile = createInputFile(conf, inputData); conf.setInt("io.file.buffer.size", 10); conf.setInt(org.apache.hadoop.mapreduce.lib.input. 
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); - String delimiter = "ab"; + String inputData = "abcdefghij++kl++mno"; + Path inputFile = createInputFile(conf, inputData); + String delimiter = "++"; byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); - FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + // the first split must contain two records to make sure that it also pulls + // in the record from the 2nd split + int splitLength = 15; + FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[]) null); LineRecordReader reader = new LineRecordReader(conf, split, recordDelimiterBytes); LongWritable key = new LongWritable(); Text value = new Text(); - reader.next(key, value); - // Get first record:"1234567890" - assertEquals(10, value.getLength()); - // Position should be 12 right after "1234567890ab" - assertEquals(12, reader.getPos()); - reader.next(key, value); - // Get second record:"12" - assertEquals(2, value.getLength()); - // Position should be 16 right after "1234567890ab12ab" - assertEquals(16, reader.getPos()); - reader.next(key, value); - // Get third record:"345" - assertEquals(3, value.getLength()); - // Position should be 19 right after "1234567890ab12ab345" - assertEquals(19, reader.getPos()); + // Get first record: "abcdefghij" + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong length for record value", 10, value.getLength()); + // Position should be 12 right after "abcdefghij++" + assertEquals("Wrong position after record read", 12, reader.getPos()); + // Get second record: "kl" + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong length for record value", 2, value.getLength()); + // Position should be 16 right after "abcdefghij++kl++" + assertEquals("Wrong position after record read", 16, reader.getPos()); + // Get third record: "mno" + assertTrue("Expected record got nothing", reader.next(key, value)); + 
assertEquals("Wrong length for record value", 3, value.getLength()); + // Position should be 19 right after "abcdefghij++kl++mno" + assertEquals("Wrong position after record read", 19, reader.getPos()); assertFalse(reader.next(key, value)); - assertEquals(19, reader.getPos()); - - split = new FileSplit(inputFile, 15, 4, (String[])null); - reader = new LineRecordReader(conf, split, recordDelimiterBytes); - // No record is in the second split because the second split dropped + assertEquals("Wrong position after record read", 19, reader.getPos()); + reader.close(); + // No record is in the second split because the second split will drop // the first record, which was already reported by the first split. - // The position should be 19 right after "1234567890ab12ab345" - assertEquals(19, reader.getPos()); - assertFalse(reader.next(key, value)); - assertEquals(19, reader.getPos()); - - inputData = "123456789aab"; - inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 12, (String[])null); + split = new FileSplit(inputFile, splitLength, + inputData.length() - splitLength, (String[]) null); reader = new LineRecordReader(conf, split, recordDelimiterBytes); - reader.next(key, value); - // Get first record:"123456789a" - assertEquals(10, value.getLength()); - // Position should be 12 right after "123456789aab" - assertEquals(12, reader.getPos()); - assertFalse(reader.next(key, value)); - assertEquals(12, reader.getPos()); + // The position should be 19 right after "abcdefghij++kl++mno" and should + // not change + assertEquals("Wrong position after record read", 19, reader.getPos()); + assertFalse("Unexpected record returned", reader.next(key, value)); + assertEquals("Wrong position after record read", 19, reader.getPos()); + reader.close(); - inputData = "123456789a"; + // multi char delimiter with starting part of the delimiter in the data + inputData = "abcd+efgh++ijk++mno"; inputFile = createInputFile(conf, inputData); - split = new 
FileSplit(inputFile, 0, 10, (String[])null); + splitLength = 5; + split = new FileSplit(inputFile, 0, splitLength, (String[]) null); reader = new LineRecordReader(conf, split, recordDelimiterBytes); - reader.next(key, value); - // Get first record:"123456789a" - assertEquals(10, value.getLength()); - // Position should be 10 right after "123456789a" - assertEquals(10, reader.getPos()); + // Get first record: "abcd+efgh" + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong position after record read", 11, reader.getPos()); + assertEquals("Wrong length for record value", 9, value.getLength()); + // should have jumped over the delimiter, no record + assertFalse("Unexpected record returned", reader.next(key, value)); + assertEquals("Wrong position after record read", 11, reader.getPos()); + reader.close(); + // next split: check for duplicate or dropped records + split = new FileSplit(inputFile, splitLength, + inputData.length() - splitLength, (String[]) null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + // Get second record: "ijk" first in this split + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong position after record read", 16, reader.getPos()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // Get third record: "mno" second in this split + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong position after record read", 19, reader.getPos()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // should be at the end of the input assertFalse(reader.next(key, value)); - assertEquals(10, reader.getPos()); + assertEquals("Wrong position after record read", 19, reader.getPos()); + reader.close(); - inputData = "123456789ab"; + inputData = "abcd|efgh|+|ij|kl|+|mno|pqr"; inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 11, (String[])null); - reader 
= new LineRecordReader(conf, split, recordDelimiterBytes); - reader.next(key, value); - // Get first record:"123456789" - assertEquals(9, value.getLength()); - // Position should be 11 right after "123456789ab" - assertEquals(11, reader.getPos()); - assertFalse(reader.next(key, value)); - assertEquals(11, reader.getPos()); + delimiter = "|+|"; + recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + // walking over the buffer and split sizes checks for proper processing + // of the ambiguous bytes of the delimiter + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + split = new FileSplit(inputFile, 0, bufferSize, (String[]) null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + // Get first record: "abcd|efgh" always possible + assertTrue("Expected record got nothing", reader.next(key, value)); + assertTrue("abcd|efgh".equals(value.toString())); + assertEquals("Wrong position after record read", 9, value.getLength()); + // Position should be 12 right after "|+|" + int recordPos = 12; + assertEquals("Wrong position after record read", recordPos, + reader.getPos()); + // get the next record: "ij|kl" if the split/buffer allows it + if (reader.next(key, value)) { + // check the record info: "ij|kl" + assertTrue("ij|kl".equals(value.toString())); + // Position should be 20 right after "|+|" + recordPos = 20; + assertEquals("Wrong position after record read", recordPos, + reader.getPos()); + } + // get the third record: "mno|pqr" if the split/buffer allows it + if (reader.next(key, value)) { + // check the record info: "mno|pqr" + assertTrue("mno|pqr".equals(value.toString())); + // Position should be 27 at the end of the string now + recordPos = inputData.length(); + assertEquals("Wrong position after record read", recordPos, + reader.getPos()); + } + // no more records can be read; we should
still be at the last position + assertFalse("Unexpected record returned", reader.next(key, value)); + assertEquals("Wrong position after record read", recordPos, + reader.getPos()); + reader.close(); + } + } } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java index dfe8b5d..6819af7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.lib.input; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; @@ -320,16 +321,76 @@ public class TestLineRecordReader { @Test public void testUncompressedInput() throws Exception { Configuration conf = new Configuration(); - String inputData = "abc+++def+++ghi+++" - + "jkl+++mno+++pqr+++stu+++vw +++xyz"; + // single char delimiter, best case + String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz"; Path inputFile = createInputFile(conf, inputData); - conf.set("textinputformat.record.delimiter", "+++"); + conf.set("textinputformat.record.delimiter", "+"); 
for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { for(int splitSize = 1; splitSize < inputData.length(); splitSize++) { conf.setInt("io.file.buffer.size", bufferSize); testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); } } + // multi char delimiter, best case + inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "|+|"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // single char delimiter with empty records + inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with empty records + inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "|+|"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with starting part of the delimiter in the data + inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+-"); + for 
(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with newline as start of the delimiter + inputData = "abc\n+def\n+ghi\n+jkl\n+mno"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "\n+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with newline in delimiter and in data + inputData = "abc\ndef+\nghi+\njkl\nmno"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+\n"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } } @Test @@ -349,91 +410,145 @@ public class TestLineRecordReader { public void testUncompressedInputCustomDelimiterPosValue() throws Exception { Configuration conf = new Configuration(); - String inputData = "1234567890ab12ab345"; - Path inputFile = createInputFile(conf, inputData); conf.setInt("io.file.buffer.size", 10); conf.setInt(org.apache.hadoop.mapreduce.lib.input. 
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); - String delimiter = "ab"; + String inputData = "abcdefghij++kl++mno"; + Path inputFile = createInputFile(conf, inputData); + String delimiter = "++"; byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); - FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + int splitLength = 15; + FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[])null); TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); LineRecordReader reader = new LineRecordReader(recordDelimiterBytes); reader.initialize(split, context); - LongWritable key; - Text value; - reader.nextKeyValue(); - key = reader.getCurrentKey(); - value = reader.getCurrentValue(); - // Get first record:"1234567890" - assertEquals(10, value.getLength()); - assertEquals(0, key.get()); - reader.nextKeyValue(); - // Get second record:"12" - assertEquals(2, value.getLength()); - // Key should be 12 right after "1234567890ab" - assertEquals(12, key.get()); - reader.nextKeyValue(); - // Get third record:"345" - assertEquals(3, value.getLength()); - // Key should be 16 right after "1234567890ab12ab" - assertEquals(16, key.get()); + // Get first record: "abcdefghij" + assertTrue("Expected record got nothing", reader.nextKeyValue()); + LongWritable key = reader.getCurrentKey(); + Text value = reader.getCurrentValue(); + assertEquals("Wrong length for record value", 10, value.getLength()); + assertEquals("Wrong position after record read", 0, key.get()); + // Get second record: "kl" + assertTrue("Expected record got nothing", reader.nextKeyValue()); + assertEquals("Wrong length for record value", 2, value.getLength()); + // Key should be 12 right after "abcdefghij++" + assertEquals("Wrong position after record read", 12, key.get()); + // Get third record: "mno" + assertTrue("Expected record got nothing", reader.nextKeyValue()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // Key 
should be 16 right after "abcdefghij++kl++" + assertEquals("Wrong position after record read", 16, key.get()); assertFalse(reader.nextKeyValue()); - // Key should be 19 right after "1234567890ab12ab345" - assertEquals(19, key.get()); - - split = new FileSplit(inputFile, 15, 4, (String[])null); + // Key should be 19 right after "abcdefghij++kl++mno" + assertEquals("Wrong position after record read", 19, key.get()); + // after refresh should be empty + key = reader.getCurrentKey(); + assertNull("Unexpected key returned", key); + reader.close(); + split = new FileSplit(inputFile, splitLength, + inputData.length() - splitLength, (String[])null); reader = new LineRecordReader(recordDelimiterBytes); reader.initialize(split, context); // No record is in the second split because the second split dropped // the first record, which was already reported by the first split. - assertFalse(reader.nextKeyValue()); + assertFalse("Unexpected record returned", reader.nextKeyValue()); + key = reader.getCurrentKey(); + assertNull("Unexpected key returned", key); + reader.close(); - inputData = "123456789aab"; + // multi char delimiter with starting part of the delimiter in the data + inputData = "abcd+efgh++ijk++mno"; inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 12, (String[])null); + splitLength = 5; + split = new FileSplit(inputFile, 0, splitLength, (String[])null); reader = new LineRecordReader(recordDelimiterBytes); reader.initialize(split, context); - reader.nextKeyValue(); + // Get first record: "abcd+efgh" + assertTrue("Expected record got nothing", reader.nextKeyValue()); key = reader.getCurrentKey(); value = reader.getCurrentValue(); - // Get first record:"123456789a" - assertEquals(10, value.getLength()); - assertEquals(0, key.get()); + assertEquals("Wrong position after record read", 0, key.get()); + assertEquals("Wrong length for record value", 9, value.getLength()); + // should have jumped over the delimiter, no record 
assertFalse(reader.nextKeyValue()); - // Key should be 12 right after "123456789aab" - assertEquals(12, key.get()); - - inputData = "123456789a"; - inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 10, (String[])null); + assertEquals("Wrong position after record read", 11, key.get()); + // after refresh should be empty + key = reader.getCurrentKey(); + assertNull("Unexpected key returned", key); + reader.close(); + // next split: check for duplicate or dropped records + split = new FileSplit(inputFile, splitLength, + inputData.length () - splitLength, (String[])null); reader = new LineRecordReader(recordDelimiterBytes); reader.initialize(split, context); - reader.nextKeyValue(); + assertTrue("Expected record got nothing", reader.nextKeyValue()); key = reader.getCurrentKey(); value = reader.getCurrentValue(); - // Get first record:"123456789a" - assertEquals(10, value.getLength()); - assertEquals(0, key.get()); + // Get second record: "ijk" first in this split + assertEquals("Wrong position after record read", 11, key.get()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // Get third record: "mno" second in this split + assertTrue("Expected record got nothing", reader.nextKeyValue()); + assertEquals("Wrong position after record read", 16, key.get()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // should be at the end of the input assertFalse(reader.nextKeyValue()); - // Key should be 10 right after "123456789a" - assertEquals(10, key.get()); + assertEquals("Wrong position after record read", 19, key.get()); + reader.close(); - inputData = "123456789ab"; + inputData = "abcd|efgh|+|ij|kl|+|mno|pqr"; inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 11, (String[])null); - reader = new LineRecordReader(recordDelimiterBytes); - reader.initialize(split, context); - reader.nextKeyValue(); - key = reader.getCurrentKey(); - value = 
reader.getCurrentValue(); - // Get first record:"123456789" - assertEquals(9, value.getLength()); - assertEquals(0, key.get()); - assertFalse(reader.nextKeyValue()); - // Key should be 11 right after "123456789ab" - assertEquals(11, key.get()); + delimiter = "|+|"; + recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + // walking over the buffer and split sizes checks for proper processing + // of the ambiguous bytes of the delimiter + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + // track where we are in the input data + int keyPosition = 0; + conf.setInt("io.file.buffer.size", bufferSize); + split = new FileSplit(inputFile, 0, bufferSize, (String[]) null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + // Get the first record: "abcd|efgh" always possible + assertTrue("Expected record got nothing", reader.nextKeyValue()); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + assertTrue("abcd|efgh".equals(value.toString())); + // Position should be 0 right at the start + assertEquals("Wrong position after record read", keyPosition, + key.get()); + // Position should be 12 right after the first "|+|" + keyPosition = 12; + // get the next record: "ij|kl" if the split/buffer allows it + if (reader.nextKeyValue()) { + // check the record info: "ij|kl" + assertTrue("ij|kl".equals(value.toString())); + assertEquals("Wrong position after record read", keyPosition, + key.get()); + // Position should be 20 after the second "|+|" + keyPosition = 20; + } + // get the third record: "mno|pqr" if the split/buffer allows it + if (reader.nextKeyValue()) { + // check the record info: "mno|pqr" + assertTrue("mno|pqr".equals(value.toString())); + assertEquals("Wrong position after record read", keyPosition, + key.get()); + // Position should be the end of the input + keyPosition = inputData.length(); + } +
assertFalse("Unexpected record returned", reader.nextKeyValue()); + // no more records can be read; we should be at the last position + assertEquals("Wrong position after record read", keyPosition, + key.get()); + // after refresh should be empty + key = reader.getCurrentKey(); + assertNull("Unexpected key returned", key); + reader.close(); + } + } } @Test