spark-issues mailing list archives

From "Steve Loughran (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24492) Endless attempted task when TaskCommitDenied exception writing to S3A
Date Fri, 08 Jun 2018 13:17:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506009#comment-16506009
] 

Steve Loughran commented on SPARK-24492:
----------------------------------------

The retry problem looks like something in the commit protocol's error handling; it probably
merits a closer look.
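
For context on why the denied attempts can loop forever: in Spark's scheduler, a TaskCommitDenied
failure deliberately does not count towards the task-failure limit, so spark.task.maxFailures never
trips. Here is a minimal Scala sketch of that failure reason, paraphrased from Spark 2.2's
org.apache.spark.TaskEndReason (trimmed for illustration, not the verbatim source):

{noformat}
// Sketch paraphrased from Spark 2.2 (org.apache.spark.TaskEndReason);
// trimmed to the parts relevant here.
sealed trait TaskFailedReason {
  def toErrorString: String
  // Failures counting here increment the per-task failure counter, which
  // aborts the stage once spark.task.maxFailures is reached.
  def countTowardsTaskFailures: Boolean = true
}

case class TaskCommitDenied(
    jobID: Int,
    partitionID: Int,
    attemptNumber: Int) extends TaskFailedReason {
  override def toErrorString: String =
    s"TaskCommitDenied (Driver denied task commit) for job: $jobID, " +
    s"partition: $partitionID, attemptNumber: $attemptNumber"
  // A denied commit is normally a benign race with another attempt, so it
  // is excluded from the failure limit -- which is why a stage where every
  // attempt is denied can retry endlessly instead of aborting.
  override def countTowardsTaskFailures: Boolean = false
}
{noformat}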

But why is the exception being raised in the first place? That's SPARK-18883: S3 list
inconsistency means your task attempt directory does not appear in S3 LIST calls, so the
mimicked rename (list everything, COPY *, DELETE *) fails fast.
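
To make that failure mode concrete, here is an illustrative sketch of how a rename is emulated
on an object store with no native rename, and where an eventually consistent LIST breaks it.
This is not the actual S3AFileSystem code; every name below is hypothetical:

{noformat}
// Hypothetical sketch: S3A emulates rename as LIST + COPY + DELETE.
// Under eventual consistency the LIST can return nothing (the rename
// fails fast, as in this issue) or a partial set (silent data loss).
def emulatedRename(list: String => Seq[String],
                   copy: (String, String) => Unit,
                   delete: String => Unit)
                  (src: String, dst: String): Boolean = {
  val keys = list(src)      // may not yet "see" newly created objects
  if (keys.isEmpty) {
    return false            // surfaces as FileNotFoundException upstream
  }
  keys.foreach { key =>
    copy(key, dst + key.stripPrefix(src))  // COPY each listed object
    delete(key)                            // then DELETE the original
  }
  true
}
{noformat}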

You cannot use S3 via the S3A connector as a safe destination of work without a consistency
layer (HADOOP-13345) or an S3-specific committer (SPARK-23977, HADOOP-13786). Even when the
task appears to succeed, the listing may have missed newly created files, so the input to the
rename, and hence the committed output, is incomplete.
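
As an illustrative sketch of wiring in those fixes (the exact keys are assumptions to verify
against your Hadoop/Spark versions' documentation; they assume Hadoop 3.x with S3Guard and the
S3A committers on the classpath, plus the Spark-side binding from SPARK-23977):

{noformat}
import org.apache.spark.sql.SparkSession

// Illustrative only: enabling S3Guard (HADOOP-13345) and an S3A
// committer (HADOOP-13786). Assumes Hadoop 3.x plus the
// spark-hadoop-cloud module; check key names against the hadoop-aws
// documentation for your release.
val spark = SparkSession.builder()
  .appName("s3a-committer-sketch")
  // S3Guard: consistent listings backed by a DynamoDB metadata store
  .config("spark.hadoop.fs.s3a.metadatastore.impl",
    "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore")
  // S3A "directory" staging committer: commits without any S3 rename
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
{noformat}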

Without one of those, or some other consistency layer (e.g. consistent EMRFS), you need to
commit to a consistent store (e.g. HDFS) and then copy the results to S3 afterwards.
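
A minimal sketch of that workaround, with placeholder paths (the final copy could equally be
a distcp step):

{noformat}
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.sql.SparkSession

// Illustrative workaround: commit to HDFS, where the rename-based
// commit is safe, then upload the finished output to S3 afterwards.
val spark = SparkSession.builder().appName("hdfs-then-s3").getOrCreate()
val df = spark.range(10).toDF("id")                  // stand-in dataset

val hdfsOut = new Path("hdfs:///tmp/job-output")     // placeholder path
val s3Out   = new Path("s3a://my-bucket/job-output") // placeholder path

df.write.parquet(hdfsOut.toString)   // job commit happens on HDFS

val conf   = spark.sparkContext.hadoopConfiguration
val hdfsFs = hdfsOut.getFileSystem(conf)
val s3Fs   = s3Out.getFileSystem(conf)
// Plain upload of already-committed files; no rename is done on S3.
FileUtil.copy(hdfsFs, hdfsOut, s3Fs, s3Out, /* deleteSource = */ false, conf)
{noformat}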


> Endless attempted task when TaskCommitDenied exception writing to S3A
> ---------------------------------------------------------------------
>
>                 Key: SPARK-24492
>                 URL: https://issues.apache.org/jira/browse/SPARK-24492
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Yu-Jhe Li
>            Priority: Critical
>         Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照
2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and output files to S3, some tasks retry endlessly and all of them fail with a TaskCommitDenied exception. This happens when we run the Spark application on instances with network issues. (It runs well on healthy spot instances.)
> Sorry, I can't find an easy way to reproduce this issue; here's all I can provide.
> The Spark UI (see attachments) shows one task of stage 112 failed due to a FetchFailedException (a network issue) and stage 112 was resubmitted as retry 1. But in stage 112 (retry 1), all tasks failed due to TaskCommitDenied exceptions and kept retrying (they never succeed and generate lots of S3 requests).
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to a FetchFailedException (a network issue caused a corrupted file)
>  # warning messages from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed while writing rows
>  # the failed tasks keep retrying, but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 (TID 42909,
10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 10.235.164.113, 60758, None), shuffleId=39,
mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>         at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
>         at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
>         at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Stream is corrupted
>         at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
>         at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
>         at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
>         at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
>         at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>         at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>         at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
>         ... 31 more
> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 11718 of input buffer
>         at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
>         at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:206)
>         ... 39 more
> )
> 2018-05-16 02:39:058 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=10; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:39:059 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=3; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:002 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=4; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:002 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=9; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:004 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=15; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:005 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=12; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:005 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=20; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:005 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=22; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:005 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=25; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:006 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=16; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:006 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=7; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:007 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=0; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:007 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=17; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=1; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=13; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=5; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=34; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=6; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=8; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:009 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=26; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:009 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=19; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:009 WARN  OutputCommitCoordinator:66 - Authorizing duplicate request
to commit for attemptNumber=0 to commit for stage=112, partition=14; existingCommitter = 0.
This can indicate dropped network traffic.
> 2018-05-16 02:40:026 WARN  TaskSetManager:66 - Lost task 92.0 in stage 112.1 (TID 43815,
10.124.39.217, executor 29): org.apache.spark.SparkException: Task failed while writing rows
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3a://xxx/1526437970755/_temporary/0/_temporary/attempt_20180516023940_0112_m_000092_0
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:426)
>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
>         at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
>         at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
>         at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:153)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>         ... 8 more
> 2018-05-16 02:40:026 WARN  JobProgressListener:66 - Task start for unknown stage 112
> 2018-05-16 02:40:026 WARN  TaskSetManager:66 - Lost task 51.0 in stage 112.1 (TID 43774,
10.26.158.82, executor 8): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
51, attemptNumber: 0
> 2018-05-16 02:40:026 WARN  TaskSetManager:66 - Lost task 125.0 in stage 112.1 (TID 43848,
10.124.42.170, executor 84): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
125, attemptNumber: 0
> 2018-05-16 02:40:026 WARN  TaskSetManager:66 - Lost task 122.0 in stage 112.1 (TID 43845,
172.31.18.157, executor 134): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
122, attemptNumber: 0
> 2018-05-16 02:40:027 WARN  TaskSetManager:66 - Lost task 118.0 in stage 112.1 (TID 43841,
10.95.1.104, executor 100): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
118, attemptNumber: 0
> 2018-05-16 02:40:027 WARN  TaskSetManager:66 - Lost task 79.0 in stage 112.1 (TID 43802,
172.31.22.115, executor 94): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
79, attemptNumber: 0
> 2018-05-16 02:40:027 WARN  TaskSetManager:66 - Lost task 58.0 in stage 112.1 (TID 43781,
10.26.158.82, executor 8): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
58, attemptNumber: 0
> 2018-05-16 02:41:008 WARN  TaskSetManager:66 - Lost task 51.1 in stage 112.1 (TID 46941,
172.31.26.185, executor 57): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
51, attemptNumber: 1
> 2018-05-16 02:41:010 WARN  TaskSetManager:66 - Lost task 92.1 in stage 112.1 (TID 46937,
10.92.157.108, executor 153): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
92, attemptNumber: 1
> 2018-05-16 02:41:010 WARN  TaskSetManager:66 - Lost task 125.1 in stage 112.1 (TID 46951,
10.31.223.165, executor 149): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
125, attemptNumber: 1
> 2018-05-16 02:41:011 WARN  TaskSetManager:66 - Lost task 79.1 in stage 112.1 (TID 46975,
10.26.158.82, executor 8): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
79, attemptNumber: 1
> 2018-05-16 02:41:013 WARN  TaskSetManager:66 - Lost task 58.1 in stage 112.1 (TID 46976,
10.29.28.124, executor 118): TaskCommitDenied (Driver denied task commit) for job: 112, partition:
58, attemptNumber: 1
> {noformat}
> I think we should have a way to avoid the endless attempts.


