tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle (jlowe)
Date Fri, 08 Apr 2016 14:12:33 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 1ba277c1c -> ad3065134


TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle
(jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad306513
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad306513
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad306513

Branch: refs/heads/branch-0.7
Commit: ad3065134af65a84da408d332d6165357bd487c0
Parents: 1ba277c
Author: Jason Lowe <jlowe@apache.org>
Authored: Fri Apr 8 14:12:12 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Fri Apr 8 14:12:12 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../library/common/shuffle/ShuffleUtils.java    |  9 ++++++
 .../common/shuffle/TestShuffleUtils.java        | 31 ++++++++++++++++++++
 3 files changed, 41 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ad306513/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4b204d0..77d0b5d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle
  TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection.
   TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing
   TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster.

http://git-wip-us.apache.org/repos/asf/tez/blob/ad306513/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index a4e95f7..60e53f8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -124,6 +124,15 @@ public class ShuffleUtils {
       ioCleanup(input);
       // Re-throw
       throw ioe;
+    } catch (InternalError e) {
+      // Close the streams
+      LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength
+          + ", decomp=" + decompressedLength + ". ExceptionMessage=" + e.getMessage());
+      ioCleanup(input);
+      // The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
+      // on decompression failures. Catching and re-throwing as IOException
+      // to allow fetch failure logic to be processed.
+      throw new IOException(e);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/ad306513/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 9f9cd59..4ac1bca 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -2,11 +2,15 @@ package org.apache.tez.runtime.library.common.shuffle;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -26,13 +30,18 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Random;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -252,4 +261,26 @@ public class TestShuffleUtils {
         .cardinality(), emptyPartitionsBitSet.cardinality() == 10);
 
   }
+
+  @Test
+  public void testInternalErrorTranslation() throws Exception { // codec InternalError must surface as IOException
+    String codecErrorMsg = "codec failure";
+    CompressionInputStream mockCodecStream = mock(CompressionInputStream.class);
+    when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt())) // every bulk read fails,
+        .thenThrow(new InternalError(codecErrorMsg)); // mimicking how lzo/lz4/snappy/bz2 codecs fail
+    Decompressor mockDecoder = mock(Decompressor.class);
+    CompressionCodec mockCodec = mock(CompressionCodec.class); // hands out the failing stream above
+    when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
+    when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
+        .thenReturn(mockCodecStream);
+    byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1}; // presumably IFile magic+version so header checks pass; confirm
+    try {
+      ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header),
+          1024, 128, mockCodec, false, 0, mock(Logger.class), "identifier"); // NOTE(review): presumably decomp=1024, comp=128, no read-ahead; confirm signature
+      Assert.fail("shuffle was supposed to throw!");
+    } catch (IOException e) { // an InternalError escaping untranslated would bypass this catch and fail the test
+      Assert.assertTrue(e.getCause() instanceof InternalError); // original codec error kept as the cause
+      Assert.assertTrue(e.getMessage().contains(codecErrorMsg)); // IOException(Throwable) uses cause.toString() as its message
+    }
+  }
 }


Mime
View raw message