From: jlowe@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: MAPREDUCE-6481. LineRecordReader may give
 incomplete record and wrong position/key information for uncompressed
 input sometimes. Contributed by Zhihai Xu
Date: Thu, 17 Sep 2015 14:31:34 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 6c6e734f0 -> 58d1a02b8


MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong
position/key information for uncompressed input sometimes. Contributed by
Zhihai Xu
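For readers following along, here is a minimal usage sketch (not part of this commit; the input path and the assumed file contents are placeholders) of the old-API read loop this change affects. With the bug, a record spanning an internal buffer refill could come back incomplete, and getPos() could drift from the true stream offset:

    // Minimal sketch: reading an uncompressed file with a custom record
    // delimiter through the old mapred API. Assumes args[0] names a local
    // file containing "1234567890ab12ab345" (19 bytes).
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.LineRecordReader;

    public class ReadSplitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("io.file.buffer.size", 10); // small buffer forces refills
        Path file = new Path(args[0]);
        long length = 19;                       // placeholder file length
        FileSplit split = new FileSplit(file, 0, length, (String[]) null);
        byte[] delimiter = "ab".getBytes("UTF-8");
        LineRecordReader reader = new LineRecordReader(conf, split, delimiter);
        LongWritable key = new LongWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
          // key is the offset where the record starts; getPos() is the
          // offset just past the record and its delimiter.
          System.out.println(key.get() + "\t" + value
              + "\t(pos=" + reader.getPos() + ")");
        }
        reader.close();
      }
    }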
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58d1a02b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58d1a02b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58d1a02b

Branch: refs/heads/trunk
Commit: 58d1a02b8d66b1d2a6ac2158be32bd35ad2e69bd
Parents: 6c6e734
Author: Jason Lowe <jlowe@apache.org>
Authored: Thu Sep 17 14:30:18 2015 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Thu Sep 17 14:30:18 2015 +0000

----------------------------------------------------------------------
 .../java/org/apache/hadoop/util/LineReader.java |  17 +-
 hadoop-mapreduce-project/CHANGES.txt            |   4 +
 .../lib/input/UncompressedSplitLineReader.java  |  31 +---
 .../hadoop/mapred/TestLineRecordReader.java     | 138 ++++++++++++++++
 .../lib/input/TestLineRecordReader.java         | 161 +++++++++++++++++++
 5 files changed, 316 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 1d1b569..900215a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -303,7 +303,10 @@ public class LineReader implements Closeable {
         startPosn = bufferPosn = 0;
         bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
         if (bufferLength <= 0) {
-          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          if (ambiguousByteCount > 0) {
+            str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+            bytesConsumed += ambiguousByteCount;
+          }
           break; // EOF
         }
       }
@@ -325,13 +328,13 @@ public class LineReader implements Closeable {
       if (appendLength > maxLineLength - txtLength) {
         appendLength = maxLineLength - txtLength;
       }
+      bytesConsumed += ambiguousByteCount;
+      if (appendLength >= 0 && ambiguousByteCount > 0) {
+        //appending the ambiguous characters (refer case 2.2)
+        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+        ambiguousByteCount = 0;
+      }
       if (appendLength > 0) {
-        if (ambiguousByteCount > 0) {
-          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
-          //appending the ambiguous characters (refer case 2.2)
-          bytesConsumed += ambiguousByteCount;
-          ambiguousByteCount=0;
-        }
         str.append(buffer, startPosn, appendLength);
         txtLength += appendLength;
       }
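The LineReader change above is the core fix. When the record delimiter is longer than one byte, a delimiter prefix that ends exactly at a buffer refill is "ambiguous": it is held back until the next fill decides whether it really starts a delimiter. Previously those held-back bytes were not always added to bytesConsumed, so readLine() could under-report what it had read and callers computed wrong positions. A sketch (assumes Hadoop on the classpath; not part of this commit) that exercises the accounting:

    // With a multi-byte delimiter and a tiny buffer, delimiter-prefix bytes
    // straddle refills. With this fix, the per-record return values of
    // readLine() sum to the total input length.
    import java.io.ByteArrayInputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.LineReader;

    public class DelimiterAccountingSketch {
      public static void main(String[] args) throws Exception {
        byte[] data = "1234567890ab12ab345".getBytes("UTF-8");
        Configuration conf = new Configuration();
        conf.setInt("io.file.buffer.size", 10); // "ab" straddles refills
        LineReader reader = new LineReader(new ByteArrayInputStream(data),
            conf, "ab".getBytes("UTF-8"));
        Text line = new Text();
        long consumed = 0;
        int n;
        while ((n = reader.readLine(line)) > 0) {
          consumed += n;
          System.out.println("record=\"" + line + "\" bytes=" + n);
        }
        reader.close();
        // With the fix, consumed == data.length (19); before it, held-back
        // delimiter-prefix bytes could be dropped from the count.
        System.out.println("total consumed=" + consumed + " of " + data.length);
      }
    }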
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 669fee5..cde6d92 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -566,6 +566,10 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner.
     (Zhihai Xu)
 
+    MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong
+    position/key information for uncompressed input sometimes. (Zhihai Xu via
+    jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
index 52fb7b0..38491b0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -40,8 +40,6 @@ public class UncompressedSplitLineReader extends SplitLineReader {
   private long totalBytesRead = 0;
   private boolean finished = false;
   private boolean usingCRLF;
-  private int unusedBytes = 0;
-  private int lastBytesRead = 0;
 
   public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
       byte[] recordDelimiterBytes, long splitLength) throws IOException {
@@ -59,7 +57,6 @@ public class UncompressedSplitLineReader extends SplitLineReader {
           (int)(splitLength - totalBytesRead));
     }
     int bytesRead = in.read(buffer, 0, maxBytesToRead);
-    lastBytesRead = bytesRead;
 
     // If the split ended in the middle of a record delimiter then we need
     // to read one additional record, as the consumer of the next split will
@@ -83,39 +80,17 @@ public class UncompressedSplitLineReader extends SplitLineReader {
   @Override
   public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
       throws IOException {
-    long bytesRead = 0;
+    int bytesRead = 0;
     if (!finished) {
       // only allow at most one more record to be read after the stream
       // reports the split ended
       if (totalBytesRead > splitLength) {
         finished = true;
       }
-      bytesRead = totalBytesRead;
-      int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
-      bytesRead = totalBytesRead - bytesRead;
-
-      // No records left.
-      if (bytesConsumed == 0 && bytesRead == 0) {
-        return 0;
-      }
-
-      int bufferSize = getBufferSize();
-
-      // Add the remaining buffer size not used for the last call
-      // of fillBuffer method.
-      if (lastBytesRead <= 0) {
-        bytesRead += bufferSize;
-      } else if (bytesRead > 0) {
-        bytesRead += bufferSize - lastBytesRead;
-      }
-
-      // Adjust the size of the buffer not used for this record.
-      // The size is carried over for the next calculation.
-      bytesRead += unusedBytes;
-      unusedBytes = bufferSize - getBufferPosn();
-      bytesRead -= unusedBytes;
+
+      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
     }
-    return (int) bytesRead;
+    return bytesRead;
   }
 
   @Override
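With LineReader now accounting for consumed bytes itself, UncompressedSplitLineReader.readLine() simply delegates to the superclass. The removed code tried to reconstruct the byte count from buffer size and buffer position and could drift, which is where the wrong position/key values came from. The split contract that the tests below verify, as a sketch (placeholder path; not part of this commit): each record is reported exactly once even when a split boundary falls inside a record or delimiter, because a reader consumes one record past its split end and the next split's reader skips what was already reported.

    // Two splits over a file assumed to contain "1234567890ab12ab345".
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.LineRecordReader;

    public class SplitBoundarySketch {
      static void dump(Configuration conf, Path file, long start, long len,
          byte[] delim) throws Exception {
        LineRecordReader reader = new LineRecordReader(conf,
            new FileSplit(file, start, len, (String[]) null), delim);
        LongWritable key = new LongWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
          System.out.println("split@" + start + ": \"" + value + "\"");
        }
        reader.close();
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]); // placeholder input path
        byte[] delim = "ab".getBytes("UTF-8");
        dump(conf, file, 0, 15, delim); // reports all three records
        dump(conf, file, 15, 4, delim); // reports none: offset 15 is mid-record
      }
    }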
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index a5c9933..d33a614 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -357,4 +358,141 @@ public class TestLineRecordReader {
       }
     }
   }
+
+  @Test
+  public void testUncompressedInputCustomDelimiterPosValue()
+      throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "1234567890ab12ab345";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.setInt("io.file.buffer.size", 10);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    String delimiter = "ab";
+    byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        recordDelimiterBytes);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    reader.next(key, value);
+    // Get first record:"1234567890"
+    assertEquals(10, value.getLength());
+    // Position should be 12 right after "1234567890ab"
+    assertEquals(12, reader.getPos());
+    reader.next(key, value);
+    // Get second record:"12"
+    assertEquals(2, value.getLength());
+    // Position should be 16 right after "1234567890ab12ab"
+    assertEquals(16, reader.getPos());
+    reader.next(key, value);
+    // Get third record:"345"
+    assertEquals(3, value.getLength());
+    // Position should be 19 right after "1234567890ab12ab345"
+    assertEquals(19, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(19, reader.getPos());
+
+    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    // No record is in the second split because the second split dropped
+    // the first record, which was already reported by the first split.
+    // The position should be 19 right after "1234567890ab12ab345"
+    assertEquals(19, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(19, reader.getPos());
+
+    inputData = "123456789aab";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    reader.next(key, value);
+    // Get first record:"123456789a"
+    assertEquals(10, value.getLength());
+    // Position should be 12 right after "123456789aab"
+    assertEquals(12, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(12, reader.getPos());
+
+    inputData = "123456789a";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 10, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    reader.next(key, value);
+    // Get first record:"123456789a"
+    assertEquals(10, value.getLength());
+    // Position should be 10 right after "123456789a"
+    assertEquals(10, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(10, reader.getPos());
+
+    inputData = "123456789ab";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 11, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    reader.next(key, value);
+    // Get first record:"123456789"
+    assertEquals(9, value.getLength());
+    // Position should be 11 right after "123456789ab"
+    assertEquals(11, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(11, reader.getPos());
+  }
+
+  @Test
+  public void testUncompressedInputDefaultDelimiterPosValue()
+      throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "1234567890\r\n12\r\n345";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.setInt("io.file.buffer.size", 10);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        null);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    reader.next(key, value);
+    // Get first record:"1234567890"
+    assertEquals(10, value.getLength());
+    // Position should be 12 right after "1234567890\r\n"
+    assertEquals(12, reader.getPos());
+    reader.next(key, value);
+    // Get second record:"12"
+    assertEquals(2, value.getLength());
+    // Position should be 16 right after "1234567890\r\n12\r\n"
+    assertEquals(16, reader.getPos());
+    assertFalse(reader.next(key, value));
+
+    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    reader = new LineRecordReader(conf, split, null);
+    // The second split dropped the first record "\n"
+    // The position should be 16 right after "1234567890\r\n12\r\n"
+    assertEquals(16, reader.getPos());
+    reader.next(key, value);
+    // Get third record:"345"
+    assertEquals(3, value.getLength());
+    // Position should be 19 right after "1234567890\r\n12\r\n345"
+    assertEquals(19, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(19, reader.getPos());
+
+    inputData = "123456789\r\r\n";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    reader = new LineRecordReader(conf, split, null);
+    reader.next(key, value);
+    // Get first record:"123456789"
+    assertEquals(9, value.getLength());
+    // Position should be 10 right after "123456789\r"
+    assertEquals(10, reader.getPos());
+    reader.next(key, value);
+    // Get second record:""
+    assertEquals(0, value.getLength());
+    // Position should be 12 right after "123456789\r\r\n"
+    assertEquals(12, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(12, reader.getPos());
+  }
 }
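One edge case worth calling out from the default-delimiter test above: a bare '\r' that is not followed by '\n' terminates a record by itself, so "123456789\r\r\n" contains two records, "123456789" and an empty record ended by "\r\n". A sketch (not part of this commit) that shows the byte accounting:

    import java.io.ByteArrayInputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.LineReader;

    public class CrlfSketch {
      public static void main(String[] args) throws Exception {
        byte[] data = "123456789\r\r\n".getBytes("UTF-8");
        LineReader reader = new LineReader(new ByteArrayInputStream(data),
            new Configuration());
        Text line = new Text();
        // 10 bytes consumed: "123456789" plus the lone '\r'
        System.out.println(reader.readLine(line) + " bytes: \"" + line + "\"");
        // 2 bytes consumed: the empty record ended by "\r\n"
        System.out.println(reader.readLine(line) + " bytes: \"" + line + "\"");
        reader.close();
      }
    }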
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index 3c1f28f..dfe8b5d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.lib.input;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -37,6 +38,8 @@ import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.Decompressor;
@@ -341,4 +344,162 @@ public class TestLineRecordReader {
       }
     }
   }
+
+  @Test
+  public void testUncompressedInputCustomDelimiterPosValue()
+      throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "1234567890ab12ab345";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.setInt("io.file.buffer.size", 10);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    String delimiter = "ab";
+    byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf,
+        new TaskAttemptID());
+    LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    LongWritable key;
+    Text value;
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"1234567890"
+    assertEquals(10, value.getLength());
+    assertEquals(0, key.get());
+    reader.nextKeyValue();
+    // Get second record:"12"
+    assertEquals(2, value.getLength());
+    // Key should be 12 right after "1234567890ab"
+    assertEquals(12, key.get());
+    reader.nextKeyValue();
+    // Get third record:"345"
+    assertEquals(3, value.getLength());
+    // Key should be 16 right after "1234567890ab12ab"
+    assertEquals(16, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 19 right after "1234567890ab12ab345"
+    assertEquals(19, key.get());
+
+    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    // No record is in the second split because the second split dropped
+    // the first record, which was already reported by the first split.
+    assertFalse(reader.nextKeyValue());
+
+    inputData = "123456789aab";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"123456789a"
+    assertEquals(10, value.getLength());
+    assertEquals(0, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 12 right after "123456789aab"
+    assertEquals(12, key.get());
+
+    inputData = "123456789a";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 10, (String[])null);
+    reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"123456789a"
+    assertEquals(10, value.getLength());
+    assertEquals(0, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 10 right after "123456789a"
+    assertEquals(10, key.get());
+
+    inputData = "123456789ab";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 11, (String[])null);
+    reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"123456789"
+    assertEquals(9, value.getLength());
+    assertEquals(0, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 11 right after "123456789ab"
+    assertEquals(11, key.get());
+  }
+
+  @Test
+  public void testUncompressedInputDefaultDelimiterPosValue()
+      throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "1234567890\r\n12\r\n345";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.setInt("io.file.buffer.size", 10);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf,
+        new TaskAttemptID());
+    LineRecordReader reader = new LineRecordReader(null);
+    reader.initialize(split, context);
+    LongWritable key;
+    Text value;
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"1234567890"
+    assertEquals(10, value.getLength());
+    assertEquals(0, key.get());
+    reader.nextKeyValue();
+    // Get second record:"12"
+    assertEquals(2, value.getLength());
+    // Key should be 12 right after "1234567890\r\n"
+    assertEquals(12, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 16 right after "1234567890\r\n12\r\n"
+    assertEquals(16, key.get());
+
+    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    reader = new LineRecordReader(null);
+    reader.initialize(split, context);
+    // The second split dropped the first record "\n"
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get third record:"345"
+    assertEquals(3, value.getLength());
+    // Key should be 16 right after "1234567890\r\n12\r\n"
+    assertEquals(16, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 19 right after "1234567890\r\n12\r\n345"
+    assertEquals(19, key.get());
+
+    inputData = "123456789\r\r\n";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    reader = new LineRecordReader(null);
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"123456789"
+    assertEquals(9, value.getLength());
+    assertEquals(0, key.get());
+    reader.nextKeyValue();
+    // Get second record:""
+    assertEquals(0, value.getLength());
+    // Key should be 10 right after "123456789\r"
+    assertEquals(10, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 12 right after "123456789\r\r\n"
+    assertEquals(12, key.get());
+  }
 }
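For completeness, the equivalent read loop through the new mapreduce API exercised by the tests above, as a sketch (placeholder path, length, and delimiter; not part of this commit). After this fix, the key of each record is the byte offset at which the record starts in the file:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class NewApiReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]);  // placeholder input path
        long length = 19;               // placeholder split length
        FileSplit split = new FileSplit(file, 0, length, (String[]) null);
        LineRecordReader reader = new LineRecordReader("ab".getBytes("UTF-8"));
        reader.initialize(split,
            new TaskAttemptContextImpl(conf, new TaskAttemptID()));
        while (reader.nextKeyValue()) {
          // key: offset of the record's first byte; value: the record itself
          System.out.println(reader.getCurrentKey().get() + "\t"
              + reader.getCurrentValue());
        }
        reader.close();
      }
    }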