spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Davies Liu <dav...@databricks.com>
Subject Re: pyspark: results differ based on whether persist() has been called
Date Mon, 19 Oct 2015 17:40:49 GMT
This should be fixed by
https://github.com/apache/spark/commit/a367840834b97cd6a9ecda568bb21ee6dc35fcde

Will be released as 1.5.2 soon.

On Mon, Oct 19, 2015 at 9:04 AM, peay2 <peay168@yahoo.fr> wrote:
> Hi,
>
> I am getting some very strange results, where I get different results based
> on whether or not I call persist() on a data frame or not before
> materialising it.
>
> There's probably something obvious I am missing, as only very simple
> operations are involved here. Any help with this would be greatly
> appreciated. I have a simple data-frame with IDs and values:
>
>     data_dict = {'id': {k: str(k) for k in range(99)}, 'value':
> dict(enumerate(['A'] * 4 + ['B'] * 46 + ['C'] * 49))}
>     df_small = pd.DataFrame(data_dict)
>     records = sqlContext.createDataFrame(df_small)
>     records.printSchema()
>
>     # root
>     # |-- id: string (nullable = true)
>     # |-- value: string (nullable = true)
>
> Now, I left outer join over the IDs -- here, using a dummy constant column
> on the right instead of a separate data-frame (enough to reproduce my
> issue):
>
>     unique_ids = records.select("id").dropDuplicates()
>     id_names = unique_ids.select(F.col("id").alias("id_join"),
> F.lit("xxx").alias("id_name"))
>
>     df_joined = records.join(id_names, records['id'] == id_names['id_join'],
> "left_outer").drop("id_join")
>
> At this point, *doing a show on df_joined* indicates all is fine: all
> records are there as expected, for instance:
>
>     df_joined[(df_joined['id'] > 60) & (df_joined['id'] < 70)].show()
>     +---+-----+-------+
>     | id|value|id_name|
>     +---+-----+-------+
>     | 61|    C|    xxx|
>     | 62|    C|    xxx|
>     | 63|    C|    xxx|
>     | 64|    C|    xxx|
>     ...
>
> However, if I filter for a given value and then group by ID, I do not get
> back all of the groups:
>
>     def print_unique_ids(df):
>        filtered = df[df["value"] == "C"]
>        plan = filtered.groupBy("id").count().select("id")
>        unique_ids = list(plan.toPandas()["id"])
>
>        print "{0} IDs: {1}\n".format(len(unique_ids), sorted(unique_ids))
>        print plan.rdd.toDebugString() + "\n"
>
>     print_unique_ids(df_joined.unpersist())
>     print_unique_ids(df_joined.persist())
>
>     49 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
> u'59', u'60', u'61', u'62', u'63', u'64', u'65', u'66', u'67', u'68', u'69',
> u'70', u'71', u'72', u'73', u'74', u'75', u'76', u'77', u'78', u'79', u'80',
> u'81', u'82', u'83', u'84', u'85', u'86', u'87', u'88', u'89', u'90', u'91',
> u'92', u'93', u'94', u'95', u'96', u'97', u'98']
>
>     46 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
> u'59', u'60', u'61', u'62', u'66', u'67', u'68', u'69', u'70', u'71', u'72',
> u'73', u'74', u'75', u'76', u'77', u'78', u'79', u'80', u'81', u'82', u'83',
> u'84', u'85', u'86', u'87', u'88', u'89', u'90', u'91', u'92', u'93', u'94',
> u'95', u'96', u'97', u'98']
>
> Note how here IDs 43, 44, 45 are missing when persist() has been called. The
> output is correct if the data-frame has not been marked for persistance, but
> incorrect after the call to persist.
>
> When persist() has been called, Tungsten seems to be involved, but not if
> the data-frame has not been persisted. I am including the full outputs of
> toDebugString below.
>
> Has anyone any idea what is going on here?
>
> In case this helps: I see no issue if I don't do the dummy join, or if I
> don't filter for value == "C". I have a default spark config, besides
> "spark.shuffle.consolidateFiles=true", and spark 1.5.1.
>
> Thanks a lot!
>
> - Without persist:
>
> (200) MapPartitionsRDD[26] at javaToPython at
> NativeMethodAccessorImpl.java:-2 []
>   |   MapPartitionsRDD[25] at javaToPython at
> NativeMethodAccessorImpl.java:-2 []
>   |   MapPartitionsWithPreparationRDD[22] at toPandas at
> <ipython-input-2-xxx>:25 []
>   |   MapPartitionsWithPreparationRDD[21] at toPandas at
> <ipython-input-2-xxx>:25 []
>   |   MapPartitionsRDD[20] at toPandas at <ipython-input-2-xxx>:25 []
>   |   ZippedPartitionsRDD2[19] at toPandas at <ipython-input-2-xxx>:25 []
>   |   MapPartitionsWithPreparationRDD[9] at toPandas at
> <ipython-input-2-xxx>:25 []
>   |   ShuffledRowRDD[8] at toPandas at <ipython-input-2-xxx>:25 []
>   +-(2) MapPartitionsRDD[7] at toPandas at <ipython-input-2-xxx>:25 []
>      |  MapPartitionsRDD[6] at toPandas at <ipython-input-2-xxx>:25 []
>      |  MapPartitionsRDD[5] at toPandas at <ipython-input-2-xxx>:25 []
>      |  MapPartitionsRDD[4] at applySchemaToPythonRDD at
> NativeMethodAccessorImpl.java:-2 []
>      |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>      |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>      |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>      |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
>   |   MapPartitionsWithPreparationRDD[18] at toPandas at
> <ipython-input-2-xxx>:25 []
>   |   ShuffledRowRDD[17] at toPandas at <ipython-input-2-xxx>:25 []
>   +-(200) MapPartitionsRDD[16] at toPandas at <ipython-input-2-xxx>:25 []
>       |   MapPartitionsRDD[15] at toPandas at <ipython-input-2-xxx>:25 []
>       |   MapPartitionsWithPreparationRDD[14] at toPandas at
> <ipython-input-2-xxx>:25 []
>       |   ShuffledRowRDD[13] at toPandas at <ipython-input-2-xxx>:25 []
>       +-(2) MapPartitionsRDD[12] at toPandas at <ipython-input-2-xxx>:25 []
>          |  MapPartitionsWithPreparationRDD[11] at toPandas at
> <ipython-input-2-xxx>:25 []
>          |  MapPartitionsRDD[10] at toPandas at <ipython-input-2-xxx>:25 []
>          |  MapPartitionsRDD[4] at applySchemaToPythonRDD at
> NativeMethodAccessorImpl.java:-2 []
>          |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>          |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>          |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>          |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
> []
>
> - With persist:
>
> (200) MapPartitionsRDD[52] at javaToPython at
> NativeMethodAccessorImpl.java:-2 []
>   |   MapPartitionsRDD[51] at javaToPython at
> NativeMethodAccessorImpl.java:-2 []
>   |   MapPartitionsWithPreparationRDD[48] at toPandas at
> <ipython-input-2-xxx>:25 []
>   |   ShuffledRowRDD[47] at toPandas at <ipython-input-2-xxx>:25 []
>   +-(200) MapPartitionsRDD[46] at toPandas at <ipython-input-2-xxx>:25 []
>       |   MapPartitionsWithPreparationRDD[45] at toPandas at
> <ipython-input-2-xxx>:25 []
>       |   MapPartitionsRDD[44] at toPandas at <ipython-input-2-xxx>:25 []
>       |   MapPartitionsRDD[43] at toPandas at <ipython-input-2-xxx>:25 []
>       |   TungstenProject [id#0,value#1,id_name#3]
>  SortMergeOuterJoin [id#0], [id_join#2], LeftOuter, None
>   TungstenSort [id#0 ASC], false, 0
>    TungstenExchange hashpartitioning(id#0)
>     ConvertToUnsafe
>      Scan PhysicalRDD[id#0,value#1]
>   TungstenSort [id_join#2 ASC], false, 0
>    TungstenExchange hashpartitioning(id_join#2)
>     TungstenProject [id#0 AS id_join#2,xxx AS id_name#3]
>      TungstenAggregate(key=[id#0], functions=[], output=[id#0])
>       TungstenExchange hashpartitioning(id#0)
>        TungstenAggregate(key=[id#0], functions=[], output=[id#0])
>         TungstenProject [id#0]
>          Scan PhysicalRDD[id#0,value#1]
>  MapPartitionsRDD[42] at persist at NativeMethodAccessorImpl.java:-2 []
>       |       CachedPartitions: 200; MemorySize: 54.0 KB;
> ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
>       |   MapPartitionsRDD[41] at persist at
> NativeMethodAccessorImpl.java:-2 []
>       |   ZippedPartitionsRDD2[40] at persist at
> NativeMethodAccessorImpl.java:-2 []
>       |   MapPartitionsWithPreparationRDD[30] at persist at
> NativeMethodAccessorImpl.java:-2 []
>       |   ShuffledRowRDD[29] at persist at NativeMethodAccessorImpl.java:-2
> []
>       +-(2) MapPartitionsRDD[28] at persist at
> NativeMethodAccessorImpl.java:-2 []
>          |  MapPartitionsRDD[27] at persist at
> NativeMethodAccessorImpl.java:-2 []
>          |  MapPartitionsRDD[4] at applySchemaToPythonRDD at
> NativeMethodAccessorImpl.java:-2 []
>          |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>          |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>          |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>          |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
> []
>       |   MapPartitionsWithPreparationRDD[39] at persist at
> NativeMethodAccessorImpl.java:-2 []
>       |   ShuffledRowRDD[38] at persist at NativeMethodAccessorImpl.java:-2
> []
>       +-(200) MapPartitionsRDD[37] at persist at
> NativeMethodAccessorImpl.java:-2 []
>           |   MapPartitionsRDD[36] at persist at
> NativeMethodAccessorImpl.java:-2 []
>           |   MapPartitionsWithPreparationRDD[35] at persist at
> NativeMethodAccessorImpl.java:-2 []
>           |   ShuffledRowRDD[34] at persist at
> NativeMethodAccessorImpl.java:-2 []
>           +-(2) MapPartitionsRDD[33] at persist at
> NativeMethodAccessorImpl.java:-2 []
>              |  MapPartitionsWithPreparationRDD[32] at persist at
> NativeMethodAccessorImpl.java:-2 []
>              |  MapPartitionsRDD[31] at persist at
> NativeMethodAccessorImpl.java:-2 []
>              |  MapPartitionsRDD[4] at applySchemaToPythonRDD at
> NativeMethodAccessorImpl.java:-2 []
>              |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>              |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147
> []
>              |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>              |  ParallelCollectionRDD[0] at parallelize at
> PythonRDD.scala:423 []
>
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-results-differ-based-on-whether-persist-has-been-called-tp25121.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message