Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5B6AFEB05 for ; Tue, 19 Mar 2013 15:48:58 +0000 (UTC) Received: (qmail 66679 invoked by uid 500); 19 Mar 2013 15:48:58 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 66641 invoked by uid 500); 19 Mar 2013 15:48:58 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 66633 invoked by uid 99); 19 Mar 2013 15:48:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Mar 2013 15:48:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Mar 2013 15:48:56 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A543123888EA; Tue, 19 Mar 2013 15:48:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1458351 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src... Date: Tue, 19 Mar 2013 15:48:36 -0000 To: mapreduce-commits@hadoop.apache.org From: jeagles@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130319154836.A543123888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jeagles Date: Tue Mar 19 15:48:36 2013 New Revision: 1458351 URL: http://svn.apache.org/r1458351 Log: MAPREDUCE-5053. java.lang.InternalError from decompression codec cause reducer to fail (Robert Parker via jeagles) Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1458351&r1=1458350&r2=1458351&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Mar 19 15:48:36 2013 @@ -650,6 +650,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered (Jason Lowe via bobby) + MAPREDUCE-5053. java.lang.InternalError from decompression codec cause + reducer to fail (Robert Parker via jeagles) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1458351&r1=1458350&r2=1458351&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Mar 19 15:48:36 2013 @@ -357,13 +357,20 @@ class Fetcher extends Thread { return EMPTY_ATTEMPT_ID_ARRAY; } - // Go! - LOG.info("fetcher#" + id + " about to shuffle output of map " + - mapOutput.getMapId() + " decomp: " + - decompressedLength + " len: " + compressedLength + " to " + - mapOutput.getDescription()); - mapOutput.shuffle(host, input, compressedLength, decompressedLength, - metrics, reporter); + // The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError + // on decompression failures. Catching and re-throwing as IOException + // to allow fetch failure logic to be processed + try { + // Go! + LOG.info("fetcher#" + id + " about to shuffle output of map " + + mapOutput.getMapId() + " decomp: " + decompressedLength + + " len: " + compressedLength + " to " + mapOutput.getDescription()); + mapOutput.shuffle(host, input, compressedLength, decompressedLength, + metrics, reporter); + } catch (java.lang.InternalError e) { + LOG.warn("Failed to shuffle for fetcher#"+id, e); + throw new IOException(e); + } // Inform the shuffle scheduler long endTime = System.currentTimeMillis(); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1458351&r1=1458350&r2=1458351&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Tue Mar 19 15:48:36 2013 @@ -25,6 +25,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.net.HttpURLConnection; import java.net.SocketTimeoutException; import java.net.URL; @@ -233,4 +234,62 @@ public class TestFetcher { verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); } -} + @SuppressWarnings("unchecked") + @Test(timeout=10000) + public void testCopyFromHostCompressFailure() throws Exception { + LOG.info("testCopyFromHostCompressFailure"); + JobConf job = new JobConf(); + TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); + ShuffleScheduler ss = mock(ShuffleScheduler.class); + MergeManagerImpl mm = mock(MergeManagerImpl.class); + InMemoryMapOutput immo = mock(InMemoryMapOutput.class); + Reporter r = mock(Reporter.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + ExceptionReporter except = mock(ExceptionReporter.class); + SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0}); + HttpURLConnection connection = mock(HttpURLConnection.class); + + Counters.Counter allErrs = mock(Counters.Counter.class); + when(r.getCounter(anyString(), anyString())) + .thenReturn(allErrs); + + Fetcher underTest = new FakeFetcher(job, id, ss, mm, + r, metrics, except, key, connection); + + + MapHost host = new MapHost("localhost", "http://localhost:8080/"); + + ArrayList maps = new ArrayList(1); + TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1"); + maps.add(map1ID); + TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); + maps.add(map2ID); + when(ss.getMapsForHost(host)).thenReturn(maps); + + String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg="; + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) + .thenReturn(replyHash); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + header.write(new DataOutputStream(bout)); + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + when(connection.getInputStream()).thenReturn(in); + when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) + .thenReturn(immo); + + doThrow(new java.lang.InternalError()) + .when(immo) + .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), + anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); + + underTest.copyFromHost(host); + + verify(connection) + .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, + encHash); + verify(ss, times(1)).copyFailed(map1ID, host, true, false); + } +} \ No newline at end of file