Date: Mon, 19 Oct 2015 09:04:52 -0700 (MST)
From: peay2
To: user@spark.apache.org
Subject: pyspark: results differ based on whether persist() has been called

Hi,

I am getting some very strange results: the same query returns different rows depending on whether or not I call persist() on a DataFrame before materialising it. There is probably something obvious I am missing, as only very simple operations are involved here. Any help with this would be greatly appreciated.
I have a simple DataFrame with IDs and values (built from pandas; imports shown for completeness):

    import pandas as pd
    from pyspark.sql import functions as F

    data_dict = {'id': {k: str(k) for k in range(99)},
                 'value': dict(enumerate(['A'] * 4 + ['B'] * 46 + ['C'] * 49))}
    df_small = pd.DataFrame(data_dict)

    records = sqlContext.createDataFrame(df_small)
    records.printSchema()
    # root
    #  |-- id: string (nullable = true)
    #  |-- value: string (nullable = true)

Now, I do a left outer join over the IDs -- here, using a dummy constant column on the right instead of a separate DataFrame (this is enough to reproduce my issue):

    unique_ids = records.select("id").dropDuplicates()
    id_names = unique_ids.select(F.col("id").alias("id_join"),
                                 F.lit("xxx").alias("id_name"))
    df_joined = records.join(id_names,
                             records['id'] == id_names['id_join'],
                             "left_outer").drop("id_join")

At this point, *doing a show on df_joined* indicates all is fine: all records are there as expected, for instance:

    df_joined[(df_joined['id'] > 60) & (df_joined['id'] < 70)].show()
    +---+-----+-------+
    | id|value|id_name|
    +---+-----+-------+
    | 61|    C|    xxx|
    | 62|    C|    xxx|
    | 63|    C|    xxx|
    | 64|    C|    xxx|
    ...

However, if I filter on a given value and then group by ID, I do not get back all of the groups:

    def print_unique_ids(df):
        filtered = df[df["value"] == "C"]
        plan = filtered.groupBy("id").count().select("id")
        unique_ids = list(plan.toPandas()["id"])
        print "{0} IDs: {1}\n".format(len(unique_ids), sorted(unique_ids))
        print plan.rdd.toDebugString() + "\n"

    print_unique_ids(df_joined.unpersist())
    print_unique_ids(df_joined.persist())

The unpersisted call prints all 49 IDs:

    49 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
    u'59', u'60', u'61', u'62', u'63', u'64', u'65', u'66', u'67', u'68',
    u'69', u'70', u'71', u'72', u'73', u'74', u'75', u'76', u'77', u'78',
    u'79', u'80', u'81', u'82', u'83', u'84', u'85', u'86', u'87', u'88',
    u'89', u'90', u'91', u'92', u'93', u'94', u'95', u'96', u'97', u'98']

but the persisted call prints only 46:

    46 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
    u'59', u'60', u'61', u'62', u'66', u'67', u'68', u'69', u'70', u'71',
    u'72', u'73', u'74', u'75', u'76', u'77', u'78', u'79', u'80', u'81',
    u'82', u'83', u'84', u'85', u'86', u'87', u'88', u'89', u'90', u'91',
    u'92', u'93', u'94', u'95', u'96', u'97', u'98']

Note how IDs 63, 64 and 65 are missing when persist() has been called. The output is correct as long as the DataFrame has not been marked for persistence, and incorrect after the call to persist(). Judging from the lineage, Tungsten operators are involved when persist() has been called, but not otherwise. I am including the full outputs of toDebugString at the end of this message.

Has anyone any idea what is going on here? In case it helps: I see no issue if I skip the dummy join, or if I don't filter on value == "C". I am running Spark 1.5.1 with a default config, apart from spark.shuffle.consolidateFiles=true.

Thanks a lot!
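P.S. For anyone who wants to check whether the Tungsten code paths are actually implicated, here is the comparison I would run next (an untested sketch: spark.sql.tungsten.enabled is the standard Spark 1.5 switch for Tungsten execution, and it has to be set before the query is planned):

    # Untested sketch: rerun the same comparison with Tungsten disabled.
    # spark.sql.tungsten.enabled defaults to true in Spark 1.5.
    sqlContext.setConf("spark.sql.tungsten.enabled", "false")
    print_unique_ids(df_joined.unpersist())
    print_unique_ids(df_joined.persist())

    # Restore the default afterwards.
    sqlContext.setConf("spark.sql.tungsten.enabled", "true")

If the persisted call returns all 49 IDs with Tungsten disabled, that would point fairly strongly at the Tungsten aggregation/exchange path.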
- Without persist:

    (200) MapPartitionsRDD[26] at javaToPython at NativeMethodAccessorImpl.java:-2 []
     |  MapPartitionsRDD[25] at javaToPython at NativeMethodAccessorImpl.java:-2 []
     |  MapPartitionsWithPreparationRDD[22] at toPandas at :25 []
     |  MapPartitionsWithPreparationRDD[21] at toPandas at :25 []
     |  MapPartitionsRDD[20] at toPandas at :25 []
     |  ZippedPartitionsRDD2[19] at toPandas at :25 []
     |  MapPartitionsWithPreparationRDD[9] at toPandas at :25 []
     |  ShuffledRowRDD[8] at toPandas at :25 []
     +-(2) MapPartitionsRDD[7] at toPandas at :25 []
        |  MapPartitionsRDD[6] at toPandas at :25 []
        |  MapPartitionsRDD[5] at toPandas at :25 []
        |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2 []
        |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
        |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
        |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
        |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
     |  MapPartitionsWithPreparationRDD[18] at toPandas at :25 []
     |  ShuffledRowRDD[17] at toPandas at :25 []
     +-(200) MapPartitionsRDD[16] at toPandas at :25 []
        |  MapPartitionsRDD[15] at toPandas at :25 []
        |  MapPartitionsWithPreparationRDD[14] at toPandas at :25 []
        |  ShuffledRowRDD[13] at toPandas at :25 []
        +-(2) MapPartitionsRDD[12] at toPandas at :25 []
           |  MapPartitionsWithPreparationRDD[11] at toPandas at :25 []
           |  MapPartitionsRDD[10] at toPandas at :25 []
           |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2 []
           |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
           |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
           |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
           |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []

- With persist:

    (200) MapPartitionsRDD[52] at javaToPython at NativeMethodAccessorImpl.java:-2 []
     |  MapPartitionsRDD[51] at javaToPython at NativeMethodAccessorImpl.java:-2 []
     |  MapPartitionsWithPreparationRDD[48] at toPandas at :25 []
     |  ShuffledRowRDD[47] at toPandas at :25 []
     +-(200) MapPartitionsRDD[46] at toPandas at :25 []
        |  MapPartitionsWithPreparationRDD[45] at toPandas at :25 []
        |  MapPartitionsRDD[44] at toPandas at :25 []
        |  MapPartitionsRDD[43] at toPandas at :25 []
        |  TungstenProject [id#0,value#1,id_name#3]
        |   SortMergeOuterJoin [id#0], [id_join#2], LeftOuter, None
        |    TungstenSort [id#0 ASC], false, 0
        |     TungstenExchange hashpartitioning(id#0)
        |      ConvertToUnsafe
        |       Scan PhysicalRDD[id#0,value#1]
        |    TungstenSort [id_join#2 ASC], false, 0
        |     TungstenExchange hashpartitioning(id_join#2)
        |      TungstenProject [id#0 AS id_join#2,xxx AS id_name#3]
        |       TungstenAggregate(key=[id#0], functions=[], output=[id#0])
        |        TungstenExchange hashpartitioning(id#0)
        |         TungstenAggregate(key=[id#0], functions=[], output=[id#0])
        |          TungstenProject [id#0]
        |           Scan PhysicalRDD[id#0,value#1]
        |  MapPartitionsRDD[42] at persist at NativeMethodAccessorImpl.java:-2 []
        |      CachedPartitions: 200; MemorySize: 54.0 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
        |  MapPartitionsRDD[41] at persist at NativeMethodAccessorImpl.java:-2 []
        |  ZippedPartitionsRDD2[40] at persist at NativeMethodAccessorImpl.java:-2 []
        |  MapPartitionsWithPreparationRDD[30] at persist at NativeMethodAccessorImpl.java:-2 []
        |  ShuffledRowRDD[29] at persist at NativeMethodAccessorImpl.java:-2 []
        +-(2) MapPartitionsRDD[28] at persist at NativeMethodAccessorImpl.java:-2 []
           |  MapPartitionsRDD[27] at persist at NativeMethodAccessorImpl.java:-2 []
           |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2 []
           |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
           |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
           |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
           |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
        |  MapPartitionsWithPreparationRDD[39] at persist at NativeMethodAccessorImpl.java:-2 []
        |  ShuffledRowRDD[38] at persist at NativeMethodAccessorImpl.java:-2 []
        +-(200) MapPartitionsRDD[37] at persist at NativeMethodAccessorImpl.java:-2 []
           |  MapPartitionsRDD[36] at persist at NativeMethodAccessorImpl.java:-2 []
           |  MapPartitionsWithPreparationRDD[35] at persist at NativeMethodAccessorImpl.java:-2 []
           |  ShuffledRowRDD[34] at persist at NativeMethodAccessorImpl.java:-2 []
           +-(2) MapPartitionsRDD[33] at persist at NativeMethodAccessorImpl.java:-2 []
              |  MapPartitionsWithPreparationRDD[32] at persist at NativeMethodAccessorImpl.java:-2 []
              |  MapPartitionsRDD[31] at persist at NativeMethodAccessorImpl.java:-2 []
              |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2 []
              |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
              |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
              |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
              |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
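P.P.S. To compare what the planner produces in the two cases without going through the RDD lineage, the plans can also be printed directly (a rough sketch: explain(True) prints the logical and physical plans, and count() is only there to force the cache to be populated before the second explain):

    filtered = df_joined[df_joined["value"] == "C"]
    plan = filtered.groupBy("id").count().select("id")

    # Physical plan without the cache involved.
    df_joined.unpersist()
    plan.explain(True)

    # Physical plan once df_joined is cached; it should now read from the
    # in-memory relation instead of recomputing the join.
    df_joined.persist()
    df_joined.count()
    plan.explain(True)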