spark-issues mailing list archives

From "Cheng Lian (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-9627) SQL job failed if the dataframe with string columns is cached
Date Tue, 18 Aug 2015 12:12:45 GMT

    [ https://issues.apache.org/jira/browse/SPARK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14701167#comment-14701167 ]

Cheng Lian commented on SPARK-9627:
-----------------------------------

[~davies] I tried to reproduce this issue locally at a smaller scale against master revision
dd0614fd618ad28cb77aecfbd49bb319b98fdba0, and it works fine. Could you verify whether
it's still a problem now? I replaced {{1 << 24}} with {{1 << 20}} and used {{--driver-memory
4g --executor-memory 4g}} while starting PySpark.
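For reference, the row generator from the report can be run standalone, scaled down as in the verification attempt above (a self-contained sketch: the Spark-specific parts — {{sqlContext}}, the schema, Parquet I/O — are omitted, and the fixed seed is an addition for reproducibility, not part of the original script):

```python
import random
import decimal
from datetime import date, timedelta

r = random.Random(42)  # fixed seed added here for reproducibility

def gen(i):
    # Same row shape as the report: (date, string, short-range int, decimal(5,2))
    d = date.today() - timedelta(r.randint(0, 5000))
    cat = str(r.randint(0, 20)) * 5  # the string column that is cached in the repro
    c = r.randint(0, 1000)
    price = decimal.Decimal(r.randint(0, 100000)) / 100
    return (d, cat, c, price)

# The verification run used 1 << 20 rows instead of the original 1 << 24;
# an even smaller sample is enough to inspect the generated data locally.
rows = [gen(i) for i in range(1 << 10)]
```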

> SQL job failed if the dataframe with string columns is cached
> -------------------------------------------------------------
>
>                 Key: SPARK-9627
>                 URL: https://issues.apache.org/jira/browse/SPARK-9627
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Davies Liu
>            Assignee: Cheng Lian
>            Priority: Blocker
>
> {code}
> import random
> import decimal
> from datetime import date, timedelta
> from pyspark.sql.types import StructType, DateType, StringType, ShortType, DecimalType
> from pyspark.sql.functions import sum
> r = random.Random()
> def gen(i):
>     d = date.today() - timedelta(r.randint(0, 5000))
>     cat = str(r.randint(0, 20)) * 5
>     c = r.randint(0, 1000)
>     price = decimal.Decimal(r.randint(0, 100000)) / 100
>     return (d, cat, c, price)
> schema = StructType().add('date', DateType()).add('cat', StringType()).add('count', ShortType()).add('price', DecimalType(5, 2))
> #df = sqlContext.createDataFrame(sc.range(1<<24).map(gen), schema)
> #df.show()
> #df.write.parquet('sales4')
> df = sqlContext.read.parquet('sales4')
> df.cache()
> df.count()
> df.show()
> print df.schema
> raw_input()
> r = df.groupBy(df.date, df.cat).agg(sum(df['count'] * df.price))
> print r.explain(True)
> r.show()
> {code}
> {code}
> StructType(List(StructField(date,DateType,true),StructField(cat,StringType,true),StructField(count,ShortType,true),StructField(price,DecimalType(5,2),true)))
> == Parsed Logical Plan ==
> 'Aggregate [date#0,cat#1], [date#0,cat#1,sum((count#2 * price#3)) AS sum((count * price))#70]
>  Relation[date#0,cat#1,count#2,price#3] org.apache.spark.sql.parquet.ParquetRelation@5ec8f315
> == Analyzed Logical Plan ==
> date: date, cat: string, sum((count * price)): decimal(21,2)
> Aggregate [date#0,cat#1], [date#0,cat#1,sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2))))) AS sum((count * price))#70]
>  Relation[date#0,cat#1,count#2,price#3] org.apache.spark.sql.parquet.ParquetRelation@5ec8f315
> == Optimized Logical Plan ==
> Aggregate [date#0,cat#1], [date#0,cat#1,sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2))))) AS sum((count * price))#70]
>  InMemoryRelation [date#0,cat#1,count#2,price#3], true, 10000, StorageLevel(true, true, false, true, 1), (PhysicalRDD [date#0,cat#1,count#2,price#3], MapPartitionsRDD[3] at), None
> == Physical Plan ==
> NewAggregate with SortBasedAggregationIterator List(date#0, cat#1) ArrayBuffer((sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2)))))2,mode=Final,isDistinct=false))
>  TungstenSort [date#0 ASC,cat#1 ASC], false, 0
>   ConvertToUnsafe
>    Exchange hashpartitioning(date#0,cat#1)
>     NewAggregate with SortBasedAggregationIterator List(date#0, cat#1) ArrayBuffer((sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2)))))2,mode=Partial,isDistinct=false))
>      TungstenSort [date#0 ASC,cat#1 ASC], false, 0
>       ConvertToUnsafe
>        InMemoryColumnarTableScan [date#0,cat#1,count#2,price#3], (InMemoryRelation [date#0,cat#1,count#2,price#3], true, 10000, StorageLevel(true, true, false, true, 1), (PhysicalRDD [date#0,cat#1,count#2,price#3], MapPartitionsRDD[3] at), None)
> Code Generation: true
> == RDD ==
> None
> 15/08/04 23:21:53 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
> Traceback (most recent call last):
>   File "t.py", line 34, in <module>
>     r.show()
>   File "/Users/davies/work/spark/python/pyspark/sql/dataframe.py", line 258, in show
>     print(self._jdf.showString(n, truncate))
>   File "/Users/davies/work/spark/python/lib/py4j/java_gateway.py", line 538, in __call__
>     self.target_id, self.name)
>   File "/Users/davies/work/spark/python/pyspark/sql/utils.py", line 36, in deco
>     return f(*a, **kw)
>   File "/Users/davies/work/spark/python/lib/py4j/protocol.py", line 300, in get_return_value
>     format(target_id, '.', name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o36.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 10, localhost): java.lang.UnsupportedOperationException: tail of empty list
> 	at scala.collection.immutable.Nil$.tail(List.scala:339)
> 	at scala.collection.immutable.Nil$.tail(List.scala:334)
> 	at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)
> 	at scala.reflect.internal.Symbols$Symbol.typeParams(Symbols.scala:1491)
> 	at scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:2144)
> 	at scala.reflect.internal.Types$TypeRef.initializedTypeParams(Types.scala:2408)
> 	at scala.reflect.internal.Types$TypeRef.typeParamsMatchArgs(Types.scala:2409)
> 	at scala.reflect.internal.Types$AliasTypeRef$class.dealias(Types.scala:2232)
> 	at scala.reflect.internal.Types$TypeRef$$anon$3.dealias(Types.scala:2539)
> 	at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1256)
> 	at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
> 	at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
> 	at org.apache.spark.sql.columnar.compression.DictionaryEncoding$Decoder.<init>(compressionSchemes.scala:277)
> 	at org.apache.spark.sql.columnar.compression.DictionaryEncoding$.decoder(compressionSchemes.scala:185)
> 	at org.apache.spark.sql.columnar.compression.DictionaryEncoding$.decoder(compressionSchemes.scala:177)
> 	at org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.initialize(CompressibleColumnAccessor.scala:31)
> 	at org.apache.spark.sql.columnar.NativeColumnAccessor.initialize(ColumnAccessor.scala:64)
> 	at org.apache.spark.sql.columnar.ColumnAccessor$class.$init$(ColumnAccessor.scala:33)
> 	at org.apache.spark.sql.columnar.BasicColumnAccessor.<init>(ColumnAccessor.scala:44)
> 	at org.apache.spark.sql.columnar.NativeColumnAccessor.<init>(ColumnAccessor.scala:64)
> 	at org.apache.spark.sql.columnar.StringColumnAccessor.<init>(ColumnAccessor.scala:92)
> 	at org.apache.spark.sql.columnar.ColumnAccessor$.apply(ColumnAccessor.scala:130)
> 	at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anonfun$15.apply(InMemoryColumnarTableScan.scala:300)
> 	at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anonfun$15.apply(InMemoryColumnarTableScan.scala:299)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.immutable.List.foreach(List.scala:318)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14.apply(InMemoryColumnarTableScan.scala:299)
> 	at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14.apply(InMemoryColumnarTableScan.scala:297)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:173)
> 	at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
> 	at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:88)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
