From: "Guillaume E.B. (JIRA)"
To: issues@spark.apache.org
Date: Thu, 7 May 2015 14:14:00 +0000 (UTC)
Subject: [jira] [Comment Edited] (SPARK-4105) FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

    [ https://issues.apache.org/jira/browse/SPARK-4105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532692#comment-14532692 ]

Guillaume E.B. edited comment on SPARK-4105 at 5/7/15 2:13 PM:
---------------------------------------------------------------

I have been able to reproduce this bug several times on my cluster of 4 VMs (6 processors, 32 GB RAM) using the attached classes. These classes are based on the business use case in which I first ran into the issue, combined with the example given on this page; they essentially join two flows with massive spilling to disk.

I can reproduce the bug starting from 8000 mappers, each generating 100000 objects (the first two arguments). This corresponds to about 36 GB of spilled data. With 2000 mappers, the bug did not happen, so for me the issue clearly occurs when a massive volume of data is involved.

The job takes 1 hour to run on my cluster with these parameters. Other parameters: 18 executors, parallelism level = 36.


was (Author: geynard):
I have been able to reproduce this bug several times on my cluster of 4 VMs (6 processors, 32 GB RAM) using the attached classes. These classes are based on the business use case in which I first ran into the issue, combined with the example given on this page; they essentially join two flows with massive spilling to disk.

I can reproduce the bug starting from 8000 mappers, each generating 100000 objects (the first two arguments). This corresponds to about 36 GB of spilled data. With 2000 mappers, the bug did not happen, so for me the issue clearly occurs when a massive volume of data is involved.
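For illustration only, a rough spark-shell sketch of this kind of two-flow join with heavy shuffle spilling is shown below. This is not the attached JavaObjectToSerialize.java / SparkFailedToUncompressGenerator.scala code; the payload size, seeds, and all identifiers (makeFlow, payloadSize, etc.) are assumptions made here, with only the 8000 mappers, 100000 objects per mapper, and parallelism level of 36 taken from the comment above.

{code}
import java.util.Random

// Illustrative parameters; the attached generator reportedly takes the mapper
// count and the number of objects per mapper as its first two arguments.
val numMappers = 8000
val objectsPerMapper = 100000
val payloadSize = 100  // assumed payload size, not taken from the attachment

// Build one keyed flow of random records, one partition per "mapper".
def makeFlow(seed: Int) =
  sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
    val rand = new Random(seed.toLong * numMappers + p)
    (0 until objectsPerMapper).map { _ =>
      val payload = new Array[Byte](payloadSize)
      rand.nextBytes(payload)
      (rand.nextInt(Int.MaxValue), payload)
    }
  }

// Joining the two flows forces a large sort-based shuffle; with enough data it
// spills heavily to disk, which is the situation in which the
// FAILED_TO_UNCOMPRESS(5) errors were observed.
makeFlow(0).join(makeFlow(1), 36).count()
{code}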
> FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-4105
>                 URL: https://issues.apache.org/jira/browse/SPARK-4105
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.2.0, 1.2.1, 1.3.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Blocker
>         Attachments: JavaObjectToSerialize.java, SparkFailedToUncompressGenerator.scala
>
>
> We have seen non-deterministic {{FAILED_TO_UNCOMPRESS(5)}} errors during shuffle read. Here's a sample stacktrace from an executor:
> {code}
> 14/10/23 18:34:11 ERROR Executor: Exception in task 1747.3 in stage 11.0 (TID 33053)
> java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
> at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
> at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
> at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
> at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
> at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
> at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:116)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:115)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
> at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
> at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Here's another occurrence of a similar error:
> {code}
> java.io.IOException: failed to read chunk
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:348)
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2712)
> java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2742)
> java.io.ObjectInputStream.readArray(ObjectInputStream.java:1687)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> org.apache.spark.scheduler.Task.run(Task.scala:56)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> {code}
> The first stacktrace was reported by a Spark user. The second stacktrace occurred when running
> {code}
> import java.util.Random
>
> val numKeyValPairs = 1000
> val numberOfMappers = 200
> val keySize = 10000
>
> for (i <- 0 to 19) {
>   val pairs1 = sc.parallelize(0 to numberOfMappers, numberOfMappers).flatMap(p => {
>     val randGen = new Random
>     val arr1 = new Array[(Int, Array[Byte])](numKeyValPairs)
>     for (i <- 0 until numKeyValPairs) {
>       val byteArr = new Array[Byte](keySize)
>       randGen.nextBytes(byteArr)
>       arr1(i) = (randGen.nextInt(Int.MaxValue), byteArr)
>     }
>     arr1
>   })
>   pairs1.groupByKey(numberOfMappers).count
> }
> {code}
> This job frequently runs without any problems, but when it fails it seems that every post-shuffle task fails with either PARSING_ERROR(2), FAILED_TO_UNCOMPRESS(5), or some other decompression error. I've seen reports of similar problems when using LZF compression, so I think that this is caused by some sort of general stream corruption issue.
> This issue has been observed even when no spilling occurs, so I don't believe that this is due to a bug in the spilling code.
> I was unable to reproduce this when running this code in a fresh Spark EC2 cluster, and we've been having a hard time finding a deterministic reproduction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org