spark-issues mailing list archives

From "Ben (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-19243) Error when selecting from DataFrame containing parsed data from files larger than 1MB
Date Mon, 16 Jan 2017 12:25:26 GMT
Ben created SPARK-19243:
---------------------------

             Summary: Error when selecting from DataFrame containing parsed data from files
larger than 1MB
                 Key: SPARK-19243
                 URL: https://issues.apache.org/jira/browse/SPARK-19243
             Project: Spark
          Issue Type: Bug
            Reporter: Ben


I hope I can describe the problem clearly. This error happens with Spark 2.0.1. I tried Spark 2.1.0 on my test PC and none of the issues below occurred there, but I can't try it on the test cluster because Spark would need to be upgraded there first. I'm opening this ticket because, if it is a bug, something may still be partially present in Spark 2.1.0.

Initially I thought the problem was in my own script, so I kept debugging until I found out why this is happening.

Step by step: I load XML files through spark-xml into a DataFrame. In my case the rowTag is the root tag, so each XML file becomes one row. The XML structure is fairly complex and is converted to nested columns and arrays inside the DF. Since I need to flatten the whole table, and since the output is not fixed but selected dynamically, columns that have been parsed as arrays are exploded with explode() only when they are needed in the output.
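For reference, the loading and flattening flow looks roughly like this; it is only a sketch, and the file path, rowTag value and column names are placeholders, not the real ones:

{noformat}
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

spark = SparkSession.builder.appName("xml-flatten").getOrCreate()

# Parse whole XML files: rowTag is the root tag, so each file becomes one row
# ("rootTag" and the path are placeholders)
df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "rootTag") \
    .load("/data/xml/*.xml")

# Columns parsed as arrays are exploded only when needed in the output
# ("entries" is a placeholder for the array column with many elements)
df2 = df.select(explode(col("entries")).alias("entry"))
{noformat}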

Normally I can select columns that don't have many entries without a problem.
But when I select a column that has a lot of entries into a new DF, e.g. simply through
{noformat}
df2 = df.select(...)
{noformat}
and then call count() or first() or any other action, Spark behaves in one of two ways:
1. If the source file was smaller than 1MB, it works.
2. If the source file was larger than 1MB, the following error occurs:

{noformat}
Traceback (most recent call last):
  File "/myCode.py", line 71, in main
    df.count()
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 299, in count
    return int(self._jdf.count())
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o180.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, compname): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
  at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2526)
  at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
  at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
  at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:280)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:214)
  at java.lang.Thread.run(Thread.java:745)
  
Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
  at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  ... 1 more
{noformat}

It doesn't happen only when calling count(); any action triggers it.
As mentioned, this only happens when there are many entries. In my case exactly one configuration fails: selecting the column with by far the most entries, which had been parsed as an array and then exploded. Otherwise everything works fine regardless of the file size.
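To illustrate the contrast, the following is roughly what I mean; the column names are placeholders for my real ones:

{noformat}
from pyspark.sql.functions import explode, col

# df is the DataFrame loaded from the XML files above

# Works regardless of file size: a column with few entries ("meta" is a placeholder)
df.select(col("meta")).count()

# Fails once the source file is larger than 1MB: the exploded column with
# by far the most entries ("entries" is a placeholder)
df.select(explode(col("entries")).alias("entry")).count()
{noformat}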

The same thing happens if the source is an archive containing the XML files: if the archive itself is larger than 1MB, the error occurs; if it is smaller, it works.

The Spark log shows the error at the point where e.g. count() is called, although in my own script's log the error appears right after the select(...) command starts; that is probably just because the transformations are evaluated lazily and the work only runs once an action is triggered. Consequently, I'm not sure whether the error actually happens during the explode(), during the select(), or for some other reason after the DF has been prepared and I call e.g. count().
I have allocated more than enough memory. On another system I got a `Java heap space` error instead, I guess on the way to hitting the actual error.
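For completeness, this is how I understand the laziness: select() and explode() only build the plan, and the failing job is only launched by the action (again, "entries" is a placeholder column name):

{noformat}
from pyspark.sql.functions import explode, col

# df is the DataFrame loaded from the XML files above

# Transformations: nothing is executed here, only the query plan is built
df2 = df.select(explode(col("entries")).alias("entry"))

# Action: this launches the actual job, and this is where the
# "Size exceeds Integer.MAX_VALUE" failure surfaces in the Spark log
df2.count()
{noformat}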



