spark-issues mailing list archives

From "Harish (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-19243) Error when selecting from DataFrame containing parsed data from files larger than 1MB
Date Wed, 03 May 2017 19:53:04 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995489#comment-15995489 ]

Harish edited comment on SPARK-19243 at 5/3/17 7:52 PM:
--------------------------------------------------------

I am getting the same error in Spark 2.1.0.
I have a 10-node cluster with 109 GB of memory on each node.
My data set is only 30K rows with 60 columns. I see 72 partitions in total after loading the
ORC file into a DataFrame, then repartitioned to 2001. No luck.
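A minimal sketch of these steps, assuming a SparkSession named spark and a placeholder ORC path:
{noformat}
# Load the ORC file into a DataFrame (72 partitions were observed at this point).
df = spark.read.orc("/path/to/data.orc")
print(df.rdd.getNumPartitions())

# Repartitioning to 2001 partitions; the error still occurs at the next action.
df = df.repartition(2001)
df.count()
{noformat}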

[~srowen] has anyone raised a similar issue?

Regards,
 Harish


was (Author: harishk15):
I am getting the same error in Spark 2.1.0.
I have a 10-node cluster with 109 GB of memory on each node.
My data set is only 30K rows with 60 columns. I see 72 partitions in total after loading the
ORC file into a DataFrame, then repartitioned to 2001. No luck.

Regards,
 Harish

> Error when selecting from DataFrame containing parsed data from files larger than 1MB
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-19243
>                 URL: https://issues.apache.org/jira/browse/SPARK-19243
>             Project: Spark
>          Issue Type: Bug
>            Reporter: Ben
>
> I hope I can describe the problem clearly. This error happens with Spark 2.0.1. I tried Spark 2.1.0 on my test PC and none of the issues below occurred there, but I can't try it on the test cluster because Spark would need to be upgraded there first. I'm opening this ticket because, if it is a bug, something may still be partially present in Spark 2.1.0.
> Initially I thought it was my script's problem, so I tried to debug until I found out why this is happening.
> Step by step: I load XML files through spark-xml into a DataFrame. In my case the rowTag is the root tag, so each XML file produces one row. The XML structure is fairly complex and is converted into nested columns and arrays inside the DataFrame. Since I need to flatten the whole table, and since the output is not fixed but dynamically selected, columns that have been parsed as arrays are exploded with explode() only when they are needed in the output.
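> A minimal sketch of this workflow, assuming the spark-xml package is available and a SparkSession named spark; the rowTag value, path, and column names below are placeholders:
> {noformat}
> from pyspark.sql.functions import col, explode
>
> # Each XML file becomes one row, because rowTag is set to the root tag.
> df = spark.read.format("com.databricks.spark.xml") \
>     .option("rowTag", "rootTag") \
>     .load("/data/xml/")
>
> # Columns that were parsed as arrays are exploded only when needed in the output.
> df2 = df.select(explode(col("someArrayColumn")).alias("entry"))
> {noformat}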
> Normally I can select various columns that don't have many entries without a problem.

> I select a column that has a lot of entries into a new DataFrame, e.g. simply through
> {noformat}
> df2 = df.select(...)
> {noformat}
> and then, if I try to run count(), first(), or any other action, Spark behaves in one of two ways:
> 1. If the source file was smaller than 1 MB, it works.
> 2. If the source file was larger than 1 MB, the following error occurs:
> {noformat}
> Traceback (most recent call last):
>   File "/myCode.py", line 71, in main
>     df.count()
>   File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 299, in count
>     return int(self._jdf.count())
>   File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
>     format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o180.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, compname): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>   at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>   at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2526)
>   at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
>   at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
>   at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:280)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:214)
>   at java.lang.Thread.run(Thread.java:745)
>   
> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>   at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
> {noformat}
> It doesn't happen only when calling count(); any action triggers it.
> As mentioned, this only happens when there are many entries. In my case it occurs in only one configuration: when the column with by far the most entries is selected, and those entries were previously in an array and have been exploded. Otherwise it works fine regardless of the file size.
> The same thing happens if the source is an archive containing the XML files: if the archive itself is larger than 1 MB, the error occurs; if it is smaller, it works.
> The Spark log shows that the error occurs when calling e.g. count(), although in my script's log the error appears after the select(...) command starts, but maybe that's because of the parallel processing. Consequently, I'm not sure whether the error happens during the explode(), during the select(), or for some reason only after the DataFrame has been prepared and I call e.g. count().
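> A minimal sketch of why the failure only surfaces at that point, assuming a DataFrame df as above: select() and explode() are lazy transformations, so the data is only read and shuffled when an action runs.
> {noformat}
> # Transformations only build an execution plan; no Spark job runs here.
> df2 = df.select("someColumn")
>
> # The job runs, and any failure in it surfaces, only at an action:
> df2.count()
> {noformat}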
> I have allocated more than enough memory. On another system I got a `Java heap space` error, I guess on the way to hitting the actual error.





