spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Soumabrata Chakraborty (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-18650) race condition in FileScanRDD.scala
Date Tue, 13 Dec 2016 09:43:59 GMT

    [ https://issues.apache.org/jira/browse/SPARK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744695#comment-15744695
] 

Soumabrata Chakraborty edited comment on SPARK-18650 at 12/13/16 9:43 AM:
--------------------------------------------------------------------------

I am facing the same issue while trying to load a csv. I am using org.apache.spark:spark-core_2.10:2.0.2
and org.apache.spark:spark-sql_2.10:2.0.2 using the following to read the csv
Dataset<Row> dataset = sparkSession.read().schema(schema).options(options).csv(path)
 ;
The options contain "header":"false" and "sep":"|"

The issue is also reproduced on versions 2.0.0 and 2.0.1 in addition to 2.0.2


was (Author: soumabrata):
I am facing the same issue while trying to load a csv. I am using org.apache.spark:spark-core_2.10:2.0.2
and using the following to read the csv
Dataset<Row> dataset = sparkSession.read().schema(schema).options(options).csv(path)
 ;
The options contain "header":"false" and "sep":"|"

> race condition in FileScanRDD.scala
> -----------------------------------
>
>                 Key: SPARK-18650
>                 URL: https://issues.apache.org/jira/browse/SPARK-18650
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>         Environment: scala 2.11
> macos 10.11.6
>            Reporter: Jay Goldman
>
> I am attempting to create a DataSet from a single CSV file :
>  val ss: SparkSession = ....
>  val ddr = ss.read.option("path", path)
> ... (choose between xml vs csv parsing)
>  var df = ddr.option("sep", ",")
>           .option("quote", "\"")
>           .option("escape", "\"") // want to retain backslashes (\) ...
>           .option("delimiter", ",")
>           .option("comment", "#")
>           .option("header", "true")
>           .option("format", "csv")
>            ddr.csv(path)
> df.count() returns 2 times the number of lines in the CSV file - i.e., each line of the
input file shows up as 2 rows in df. 
> moreover df.distinct.count has the correct rows.
> There appears to be a problem in FileScanRDD.compute. I am using spark version 2.0.1
with scala 2.11. I am not going to include the entire contents of FileScanRDD.scala here.
> In FileScanRDD.compute there is the following:
>  private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
> If i put a breakpoint in either FileScanRDD.compute or FIleScanRDD.nextIterator the resulting
dataset has the correct number of rows.
> Moreover, the code in FileScanRDD.scala is:
> private def nextIterator(): Boolean = {
>         updateBytesReadWithFileSize()
>         if (files.hasNext) { // breakpoint here => works
>           currentFile = files.next() // breakpoint here => fails
>           ....
>         }
>         else { .... }
> ....
> }
> if i put a breakpoint on the files.hasNext line all is well; however, if i put a breakpoint
on the files.next() line the code will fail when i continue because the files iterator has
become empty (see stack trace below). Disabling the breakpoint winds up creating a Dataset
with each line of the csv file duplicated.
> So it appears that multiple threads are using the files iterator or the underling split
value (an RDDPartition) and timing wise on my system 2 workers wind up processing the same
file, with the resulting DataSet having 2 copies of each of the input lines.
> This code is not active when parsing an XML file. 
> here is stack trace:
> java.util.NoSuchElementException: next on empty iterator
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> 	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 16/11/30 09:31:07 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting
job
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0
(TID 0, localhost): java.util.NoSuchElementException: next on empty iterator
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> 	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
> 	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> 	at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
> 	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
> 	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> 	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
> 	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
> 	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
> 	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
> 	at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$$anonfun$makeDataset$1.apply(LogAsMaps.scala:275)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$$anonfun$makeDataset$1.apply(LogAsMaps.scala:232)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$.makeDataset(LogAsMaps.scala:232)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$.apply(LogAsMaps.scala:179)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$.makeLog(LogAsMaps.scala:396)
> 	at edu.mit.ll.bb.app.BrowseApp$.delayedEndpoint$edu$mit$ll$bb$app$BrowseApp$1(BrowseApp.scala:103)
> 	at edu.mit.ll.bb.app.BrowseApp$delayedInit$body.apply(BrowseApp.scala:18)
> 	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> 	at scala.App$class.main(App.scala:76)
> 	at edu.mit.ll.bb.app.SingleEventFileApp.main(SingleEventFileApp.scala:6)
> 	at edu.mit.ll.bb.app.BrowseApp.main(BrowseApp.scala)
> Caused by: java.util.NoSuchElementException: next on empty iterator
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> 	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Disconnected from the target VM, address: '127.0.0.1:64555', transport: 'socket'



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message