Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7DA2117AD0 for ; Tue, 28 Apr 2015 20:20:51 +0000 (UTC) Received: (qmail 30295 invoked by uid 500); 28 Apr 2015 20:20:51 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 30225 invoked by uid 500); 28 Apr 2015 20:20:51 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 30215 invoked by uid 99); 28 Apr 2015 20:20:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Apr 2015 20:20:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 27563E0613; Tue, 28 Apr 2015 20:20:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jlowe@apache.org To: common-commits@hadoop.apache.org Message-Id: <24e3a5fc147c469fadf6194ca506e823@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne (cherry picked from commit bc1bd7e5c4047b374420683d36a8c30eda6d75b6) Date: Tue, 28 Apr 2015 20:20:51 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2.7 6b20b325f -> 30d0f1045 MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. 
Contributed by Eric Payne (cherry picked from commit bc1bd7e5c4047b374420683d36a8c30eda6d75b6) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/30d0f104 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/30d0f104 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/30d0f104 Branch: refs/heads/branch-2.7 Commit: 30d0f10458fbe0e3a85ea2e22a1bb3d4454fd896 Parents: 6b20b32 Author: Jason Lowe Authored: Tue Apr 28 20:17:52 2015 +0000 Committer: Jason Lowe Committed: Tue Apr 28 20:20:32 2015 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/task/reduce/Fetcher.java | 6 ++-- .../mapreduce/task/reduce/TestFetcher.java | 34 ++++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/30d0f104/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 85c6fea..10491f9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -23,6 +23,9 @@ Release 2.7.1 - UNRELEASED MAPREDUCE-6252. JobHistoryServer should not fail when encountering a missing directory. (Craig Welch via devaraj) + MAPREDUCE-6334. 
Fetcher#copyMapOutput is leaking usedMemory upon
+    IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30d0f104/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index d867e4b..4b80dc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -553,7 +553,10 @@ class Fetcher extends Thread {
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
-
+      if (mapOutput != null) {
+        mapOutput.abort();
+      }
+
       if (canRetry) {
         checkTimeoutOrRetry(host, ioe);
       }
@@ -574,7 +577,6 @@ class Fetcher extends Thread {
                " from " + host.getHostName(), ioe);
 
       // Inform the shuffle-scheduler
-      mapOutput.abort();
       metrics.failedFetch();
       return new TaskAttemptID[] {mapId};
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30d0f104/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 723df17..a9cd33e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -628,6 +628,40 @@ public class TestFetcher {
     verify(odmo).abort();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testCopyFromHostWithRetryUnreserve() throws Exception {
+    InMemoryMapOutput immo = mock(InMemoryMapOutput.class);
+    Fetcher underTest = new FakeFetcher(jobWithRetry,
+        id, ss, mm, r, metrics, except, key, connection);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+
+    // Verify that unreserve occurs if an exception happens after shuffle
+    // buffer is reserved.
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+        .thenReturn(immo);
+    doThrow(new IOException("forced error")).when(immo).shuffle(
+        any(MapHost.class), any(InputStream.class), anyLong(),
+        anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+    verify(immo).abort();
+  }
+
   public static class FakeFetcher extends Fetcher {
 
     // If connection need to be reopen.