spark-issues mailing list archives

From "Saif Addin Ellafi (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-11330) Filter operation on StringType after groupBy PERSISTED brings no results
Date Wed, 28 Oct 2015 17:24:27 GMT

    [ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14978528#comment-14978528 ]

Saif Addin Ellafi edited comment on SPARK-11330 at 10/28/15 5:24 PM:
---------------------------------------------------------------------

Hello Cheng Hao, and thank you very much for trying to reproduce. 

You are right, this does not happen on very small data. I started reproducing it with a
slightly larger dataset of 100+ rows. This is the smallest case I could produce; I am not sure
I understand the issue completely.

UPDATE: This seems to be related to partitioning/data locality. With the 100-row dataset, I
could only reproduce the issue in a cluster-mode shell; it does not happen in local mode.
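
For what it is worth, a minimal sketch of how to inspect and vary the partitioning while
experimenting (assuming a spark-shell session with df loaded as shown below; repartition(16)
is an arbitrary value for exploration, not a confirmed trigger):

// How many partitions the aggregated result is spread across
val grouped = df.groupBy("mdl_loan_state", "filemonth_dt").count
grouped.rdd.partitions.length

// Force a different layout in local mode and re-check the filtered count
grouped.repartition(16).persist().filter("mdl_loan_state = 'PPD00 '").count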

To make sure you can reproduce the issue, please use the attached JSON data (50k rows), with
which the issue shows up even in local[1] mode.

Load the attached JSON data in spark-shell (e.g. spark-shell --master local[1]) with no extra options:

val df = sqlContext.read.json("/var/data/Saif/bug_reproduce")

df.groupBy("mdl_loan_state", "filemonth_dt").count.filter("mdl_loan_state = 'PPD00 '").count

df.groupBy("mdl_loan_state", "filemonth_dt").count.persist().filter("mdl_loan_state = 'PPD00 '").count

>>>

The first count returns 129, while the persisted one returns ~16.
With bigger data, the difference grows much larger and can change randomly between runs.
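
To narrow down where the cached copy diverges, a hedged sketch (same session and column names
as above) that prints the distinct cached key values with their lengths, to check whether the
trailing space in 'PPD00 ' survives the in-memory columnar cache:

val cached = df.groupBy("mdl_loan_state", "filemonth_dt").count.persist()
cached.count() // materialize the cache first
// Print each cached key in brackets with its length to expose trailing-space issues
cached.select("mdl_loan_state").distinct.collect
  .foreach(r => println("[" + r.getString(0) + "] len=" + r.getString(0).length))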

Thank you!


was (Author: saif.a.ellafi):
Hello Cheng Hao, and thank you very much for trying to reproduce. 

You are right, this does not happen on very small data. I started reproducing it with a
slightly larger dataset of 100+ rows. This is the smallest case I could produce; I am not sure
I understand the issue completely.

1. Put the following in a JSON file (I have also attached a zip file with the dataset, in case
copy-paste breaks something):

{"mdl_loan_state":"PPD00 ","filemonth_dt":"01NOV2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUL2014"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2008"}
{"mdl_loan_state":"CF0   ","filemonth_dt":"01OCT2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01DEC2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAY2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUL2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01AUG2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2010"}
{"mdl_loan_state":"PPD56 ","filemonth_dt":"01NOV2009"}
{"mdl_loan_state":"PPD3  ","filemonth_dt":"01SEP2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUL2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUL2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAR2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAR2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01DEC2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAR2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAR2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01APR2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUL2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01DEC2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2014"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01AUG2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2014"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAY2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01DEC2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01DEC2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAR2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAY2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01DEC2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2014"}
{"mdl_loan_state":"CF0   ","filemonth_dt":"01NOV2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2014"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01APR2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01DEC2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01NOV2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01AUG2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01APR2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01APR2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAR2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAR2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAY2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01AUG2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2011"}
{"mdl_loan_state":"PPD01 ","filemonth_dt":"01JUL2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2010"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01DEC2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01APR2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01APR2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01APR2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01OCT2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2012"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUL2008"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01MAR2011"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUL2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUN2014"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01FEB2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JAN2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2013"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01JUL2008"}
{"mdl_loan_state":"PPD3  ","filemonth_dt":"01AUG2009"}
{"mdl_loan_state":"PPD00 ","filemonth_dt":"01SEP2011"}

val df = sqlContext.read.json("/var/CMOR/data/Saif/bug_reproduce")

df.groupBy("mdl_loan_state", "filemonth_dt").count.filter("mdl_loan_state = 'PPD00 '").count

df.groupBy("mdl_loan_state", "filemonth_dt").count.persist().filter("mdl_loan_state = 'PPD00 '").count

>>>

The first count returns 54, while the second one returns 52 or 53. Run it a couple of times to
see different results.
With bigger data, the difference grows much larger.

Thank you!

> Filter operation on StringType after groupBy PERSISTED brings no results
> ------------------------------------------------------------------------
>
>                 Key: SPARK-11330
>                 URL: https://issues.apache.org/jira/browse/SPARK-11330
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.1
>         Environment: Standalone cluster of five servers (happens as well in local mode).
> sqlContext is an instance of HiveContext (happens as well with SQLContext).
> No special options other than driver memory and executor memory.
> Parquet partitions are 512 where there are 160 cores. Happens as well with other partitionings.
> Data is nearly 2 billion rows.
>            Reporter: Saif Addin Ellafi
>            Priority: Blocker
>         Attachments: bug_reproduce.zip
>
>
> ONLY HAPPENS WHEN PERSIST() IS CALLED
> val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt")
> data.groupBy("vintages").count.select("vintages").filter("vintages = '2007-01-01'").first
> >>> res9: org.apache.spark.sql.Row = [2007-01-01]
> data.groupBy("vintages").count.persist.select("vintages").filter("vintages = '2007-01-01'").first
> >>> Exception on empty iterator stuff
> This does not happen when using another type of field, e.g. IntType:
> data.groupBy("yyyymm").count.persist.select("yyyymm").filter("yyyymm = 200805").first
> >>> res13: org.apache.spark.sql.Row = [200805]
> NOTE: I have no idea whether I used the KRYO serializer when I stored this parquet.
> NOTE2: If persist is set after the filtering, it works fine. But this is not a good enough
> workaround, since any filter operation afterwards will break results.
> NOTE3: I have reproduced the issue with several different datasets.
> NOTE4: The only real workaround is to store the groupBy dataframe in a database and reload
> it as a new dataframe.
> Querying the raw data works fine:
> data.select("vintages").filter("vintages = '2007-01-01'").first
> >>> res4: org.apache.spark.sql.Row = [2007-01-01]
> Originally, the issue happened with a larger aggregation operation: the data was
> inconsistent, returning different results on every call.
> Reducing the operation step by step, I got to this issue.
> In any case, the original operation was:
> val data = sqlContext.read.parquet("/var/Saif/data_pqt")
> val res = data.groupBy("product", "band", "age", "vint", "mb", "yyyymm").agg(
>   count($"account_id").as("N"),
>   sum($"balance").as("balance_eom"),
>   sum($"balancec").as("balance_eoc"),
>   sum($"spend").as("spend"),
>   sum($"payment").as("payment"),
>   sum($"feoc").as("feoc"),
>   sum($"cfintbal").as("cfintbal"),
>   count($"newacct" === 1).as("newacct")
> ).persist()
> val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
> z.length
> >>> res0: Int = 102
> res.unpersist()
> val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
> z.length
> >>> res1: Int = 103
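
For clarity, the NOTE2 workaround (persisting only after the filter) would look like the
following minimal sketch, using the same data and column names quoted above; it illustrates
the reporter's description rather than a confirmed fix:

val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt")
// Filter first, then persist: per NOTE2 this returns the expected row,
// though any further filter on the persisted result may still break.
val filtered = data.groupBy("vintages").count
  .filter("vintages = '2007-01-01'")
  .persist()
filtered.select("vintages").first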



