From: kihwal@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <19dfc2e238ed401cb05eee19c7b29836@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: hadoop git commit: MAPREDUCE-5958. Wrong reduce task progress if map output is compressed. Contributed by Emilio Coppa and Jason Lowe. (cherry picked from commit 8f701ae07a0b1dc70b8e1eb8d4a5c35c0a1e76da)
Date: Thu, 6 Nov 2014 21:56:21 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d0101f1ce -> a19123df9


MAPREDUCE-5958. Wrong reduce task progress if map output is compressed.
Contributed by Emilio Coppa and Jason Lowe.

(cherry picked from commit 8f701ae07a0b1dc70b8e1eb8d4a5c35c0a1e76da)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a19123df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a19123df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a19123df

Branch: refs/heads/branch-2
Commit: a19123df9abea6095bdf11b333a94a52c2500ba5
Parents: d0101f1
Author: Kihwal Lee
Authored: Thu Nov 6 15:55:32 2014 -0600
Committer: Kihwal Lee
Committed: Thu Nov 6 15:56:10 2014 -0600

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../java/org/apache/hadoop/mapred/Merger.java   | 12 +--
 .../mapreduce/task/reduce/TestMerger.java       | 80 ++++++++++++++------
 3 files changed, 64 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a19123df/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c9f9ed2..51909bd 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -243,6 +243,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is
     incorrect with no authority in job jar path. (Gera Shegalov via jlowe)
 
+    MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
+    (Emilio Coppa and jlowe via kihwal)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a19123df/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
index 9285516..b44e742 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
@@ -515,9 +515,9 @@ public class Merger {
     }
 
     private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
-      long startPos = reader.getPosition();
+      long startPos = reader.getReader().bytesRead;
       boolean hasNext = reader.nextRawKey();
-      long endPos = reader.getPosition();
+      long endPos = reader.getReader().bytesRead;
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
       if (hasNext) {
@@ -543,7 +543,7 @@ public class Merger {
         }
       }
       minSegment = top();
-      long startPos = minSegment.getPosition();
+      long startPos = minSegment.getReader().bytesRead;
       key = minSegment.getKey();
       if (!minSegment.inMemory()) {
         //When we load the value from an inmemory segment, we reset
@@ -560,7 +560,7 @@ public class Merger {
       } else {
         minSegment.getValue(value);
       }
-      long endPos = minSegment.getPosition();
+      long endPos = minSegment.getReader().bytesRead;
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
       return true;
@@ -638,9 +638,9 @@ public class Merger {
       // Initialize the segment at the last possible moment;
       // this helps in ensuring we don't use buffers until we need them
       segment.init(readsCounter);
-      long startPos = segment.getPosition();
+      long startPos = segment.getReader().bytesRead;
       boolean hasNext = segment.nextRawKey();
-      long endPos = segment.getPosition();
+      long endPos = segment.getReader().bytesRead;
 
       if (hasNext) {
         startBytes += endPos - startPos;
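
For context on the Merger.java hunks above: the merge's progress denominator
(the totalBytes behind progPerByte) is computed from each segment's raw,
i.e. decompressed, data length, while getPosition() tracks the underlying
on-disk stream. For compressed segments the per-key deltas were therefore
measured in different units than the total, and reported progress came out
far too low; Reader.bytesRead counts decompressed bytes consumed, matching
the denominator. A minimal sketch of the unit mismatch, reusing the
30-byte/3000-byte sizes from the TestMerger mocks below; the standalone
class is illustrative only, not Hadoop's API:

// Illustrative sketch only (not Hadoop code): why getPosition()-based
// deltas understate merge progress for compressed segments.
public class MergeProgressSketch {
  public static void main(String[] args) {
    long rawDataLength = 3000L;   // decompressed bytes; feeds the denominator
    long compressedLength = 30L;  // on-disk bytes; what getPosition() spans
    float progPerByte = 1.0f / rawDataLength;

    // Before the fix: deltas summed from the compressed stream position, so
    // a fully merged segment only ever reports 30/3000 = 1% progress.
    float buggy = compressedLength * progPerByte;

    // After the fix: deltas summed from Reader.bytesRead (decompressed bytes
    // consumed), so a fully merged segment reports 3000/3000 = 100%.
    float fixed = rawDataLength * progPerByte;

    System.out.printf("buggy=%.2f%% fixed=%.2f%%%n", buggy * 100, fixed * 100);
  }
}
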
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a19123df/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
index c5ab420..651dd38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
@@ -18,13 +18,12 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doAnswer;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -32,9 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -43,14 +41,15 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.Merger;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -58,21 +57,17 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.CryptoUtils;
-import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
-import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.Lists;
-
 public class TestMerger {
 
   private Configuration conf;
@@ -254,7 +249,7 @@ public class TestMerger {
     testMergeShouldReturnProperProgress(getUncompressedSegments());
   }
 
-  @SuppressWarnings( { "deprecation", "unchecked" })
+  @SuppressWarnings( { "unchecked" })
   public void testMergeShouldReturnProperProgress(
       List<Segment<Text, Text>> segments) throws IOException {
     Path tmpDir = new Path("localpath");
@@ -267,7 +262,38 @@ public class TestMerger {
     RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
         valueClass, segments, 2, tmpDir, comparator, getReporter(),
         readsCounter, writesCounter, mergePhase);
-    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), 0.0f);
+    final float epsilon = 0.00001f;
+
+    // Reading 6 keys total, 3 each in 2 segments, so each key read moves the
+    // progress forward 1/6th of the way. Initially the first keys from each
+    // segment have been read as part of the merge setup, so progress = 2/6.
+    Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // The first next() returns one of the keys already read during merge setup
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Subsequent next() calls should read one key and move progress
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // At this point we've exhausted all of the keys in one segment
+    // so getting the next key will return the already cached key from the
+    // other segment
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Subsequent next() calls should read one key and move progress
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Now there should be no more input
+    Assert.assertFalse(mergeQueue.next());
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
   }
 
   private Progressable getReporter() {
@@ -281,7 +307,7 @@ public class TestMerger {
 
   private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
+    for (int i = 0; i < 2; i++) {
       segments.add(getUncompressedSegment(i));
     }
     return segments;
@@ -289,44 +315,51 @@ public class TestMerger {
 
   private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
+    for (int i = 0; i < 2; i++) {
       segments.add(getCompressedSegment(i));
     }
     return segments;
   }
 
   private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false);
+    return new Segment<Text, Text>(getReader(i, false), false);
   }
 
   private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false, 3000l);
+    return new Segment<Text, Text>(getReader(i, true), false, 3000l);
  }
 
   @SuppressWarnings("unchecked")
-  private Reader<Text, Text> getReader(int i) throws IOException {
+  private Reader<Text, Text> getReader(int i, boolean isCompressedInput)
+      throws IOException {
     Reader<Text, Text> readerMock = mock(Reader.class);
+    when(readerMock.getLength()).thenReturn(30l);
     when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
         20l);
     when(
         readerMock.nextRawKey(any(DataInputBuffer.class)))
-        .thenAnswer(getKeyAnswer("Segment" + i));
+        .thenAnswer(getKeyAnswer("Segment" + i, isCompressedInput));
     doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue(
         any(DataInputBuffer.class));
 
     return readerMock;
   }
 
-  private Answer<?> getKeyAnswer(final String segmentName) {
+  private Answer<?> getKeyAnswer(final String segmentName,
+      final boolean isCompressedInput) {
     return new Answer<Object>() {
       int i = 0;
 
+      @SuppressWarnings("unchecked")
       public Boolean answer(InvocationOnMock invocation) {
-        Object[] args = invocation.getArguments();
-        DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
+        if (i++ == 3) {
           return false;
         }
+        Reader<Text,Text> mock = (Reader<Text,Text>) invocation.getMock();
+        int multiplier = isCompressedInput ? 100 : 1;
+        mock.bytesRead += 10 * multiplier;
+        Object[] args = invocation.getArguments();
+        DataInputBuffer key = (DataInputBuffer) args[0];
         key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
         return true;
       }
@@ -340,9 +373,6 @@ public class TestMerger {
       public Void answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
         DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
-          return null;
-        }
         key.reset(("Segment Value " + segmentName + i).getBytes(), 20);
         return null;
       }
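
The end-to-end effect of the patch is easiest to see as a single loop:
snapshot bytesRead around each key read and charge only the delta against
the raw data length. A minimal runnable sketch of that accounting follows;
FakeReader and its sizes are hypothetical stand-ins for IFile.Reader and
the mocks above, not the real classes:

// Sketch of the accounting the patch applies in Merger.adjustPriorityQueue();
// FakeReader is a hypothetical stand-in for IFile.Reader and its bytesRead
// field, with sizes mirroring the compressed mock above.
public class BytesReadAccountingSketch {
  static class FakeReader {
    long bytesRead;          // decompressed bytes consumed so far
    private int keysLeft = 3;

    boolean nextRawKey() {   // pretend each key costs 1000 raw bytes,
      if (keysLeft-- <= 0) { // as in the compressed mock (10 * 100)
        return false;
      }
      bytesRead += 1000;
      return true;
    }
  }

  public static void main(String[] args) {
    FakeReader reader = new FakeReader();
    long totalBytes = 3000;  // the segment's raw (decompressed) data length
    float progPerByte = 1.0f / totalBytes;
    long totalBytesProcessed = 0;

    // Same shape as the patched code: snapshot bytesRead, advance the
    // reader, and charge only the delta to the merge progress.
    boolean hasNext = true;
    while (hasNext) {
      long startPos = reader.bytesRead;
      hasNext = reader.nextRawKey();
      long endPos = reader.bytesRead;
      totalBytesProcessed += endPos - startPos;
    }
    System.out.println("progress = " + totalBytesProcessed * progPerByte);
  }
}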