spark-issues mailing list archives

From "Franklyn Dsouza (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-18589) persist() resolves "java.lang.RuntimeException: Invalid PythonUDF <lambda>(...), requires attributes from more than one child"
Date Mon, 19 Dec 2016 17:29:58 GMT

    [ https://issues.apache.org/jira/browse/SPARK-18589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15761741#comment-15761741 ]

Franklyn Dsouza edited comment on SPARK-18589 at 12/19/16 5:29 PM:
-------------------------------------------------------------------

The sequence of steps that causes this is:

{code}
join two dataframes A and B
> make a udf that uses one column from A and another from B
> filter on column produced by udf
> java.lang.RuntimeException: Invalid PythonUDF <lambda>(b#1L, c#6L), requires attributes from more than one child.
{code}

Here are minimal steps to reproduce this issue in PySpark:

{code}
from pyspark.sql import types
from pyspark.sql import functions as F
df1 = sqlCtx.createDataFrame([types.Row(a=1, b=2), types.Row(a=1, b=4)])
df2 = sqlCtx.createDataFrame([types.Row(a=1, c=12)])
joined = df1.join(df2, df1['a'] == df2['a'])
extra = joined.withColumn('sum', F.udf(lambda a, b: a + b, types.IntegerType())(joined['b'], joined['c']))
filtered = extra.where(extra['sum'] < F.lit(10)).collect()
{code}

*Doing {{extra.cache()}} before the filtering works around the issue.*
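
Below is a minimal sketch of that workaround applied to the repro above (same {{sqlCtx}} session and DataFrames); the comment about why it helps is an assumption based on the behaviour described in this ticket, not a confirmed explanation.

{code}
# Sketch of the workaround, reusing df1/df2/joined from the repro above.
# Caching the derived DataFrame before filtering avoids the planner error;
# presumably the cached relation gives the Python UDF a single child to bind to.
extra = joined.withColumn('sum', F.udf(lambda a, b: a + b, types.IntegerType())(joined['b'], joined['c']))
extra.cache()  # or extra.persist(), per the ticket title
filtered = extra.where(extra['sum'] < F.lit(10)).collect()
{code}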





> persist() resolves "java.lang.RuntimeException: Invalid PythonUDF <lambda>(...), requires attributes from more than one child"
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18589
>                 URL: https://issues.apache.org/jira/browse/SPARK-18589
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 2.0.2, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>            Priority: Minor
>
> Smells like another optimizer bug that's similar to SPARK-17100 and SPARK-18254. I'm seeing this on 2.0.2 and on master at commit {{fb07bbe575aabe68422fd3a31865101fb7fa1722}}.
> I don't have a minimal repro for this yet, but the error I'm seeing is:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling o247.count.
> : java.lang.RuntimeException: Invalid PythonUDF <...>(...), requires attributes from more than one child.
>     at scala.sys.package$.error(package.scala:27)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:150)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:149)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:149)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
>     at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
>     at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
>     at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>     at scala.collection.immutable.List.foldLeft(List.scala:84)
>     at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:93)
>     at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
>     at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
>     at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
>     at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at py4j.Gateway.invoke(Gateway.java:280)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:214)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The extended plan (cleaned of field names) is as follows:
> {code}
> == Parsed Logical Plan ==
> 'Filter NOT ('expected_prediction = 'prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
> == Analyzed Logical Plan ==
> p1: struct<...>, p2: struct<...>, pair_features: vector, rawPrediction: vector, probability: vector, prediction: double, expected_prediction: float
> Filter NOT (cast(expected_prediction as double) = prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
> == Optimized Logical Plan ==
> Project [struct(...) AS p1, struct(...) AS p2, <lambda>(struct(...).person, struct(...).person) AS pair_features, UDF(<lambda>(struct(...).person, struct(...).person)) AS rawPrediction, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS probability, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS prediction, cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) AS expected_prediction]
> +- Join Inner, (NOT (cast(cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) as double) = UDF(UDF(<lambda>(struct(...).person, struct(...).person)))) && (_blocking_key = _blocking_key))
>    :- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>    :  +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>    :     +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
>    :        :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
>    :        :     +- Scan ExistingRDD[...]
>    +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>       +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>          +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
>             :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
>             :     +- Scan ExistingRDD[...]
> == Physical Plan ==
> java.lang.RuntimeException: Invalid PythonUDF <lambda>(struct(...).person, struct(...).person), requires attributes from more than one child.
> {code}
> Note the error at the end when Spark tries to print the physical plan. I've scrubbed some Project fields from the plan to simplify the display, but if I've scrubbed anything you think is important, let me know.
> I can get around this problem by adding a {{persist()}} right before the operation that fails. The failing operation is a filter.
> Any clues on how I can boil this down to a minimal repro? Any clues about where the problem is?
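
For illustration, here is a self-contained sketch of the {{persist()}} workaround the reporter describes, modelled on the minimal repro from the comment above rather than on the reporter's actual pipeline (the DataFrames, column names, and {{sqlCtx}} session are assumptions, not taken from the report):

{code}
# Hypothetical, self-contained sketch: persist() the joined DataFrame that carries
# the Python-UDF-derived column right before the filter that otherwise fails to plan.
from pyspark import StorageLevel
from pyspark.sql import types
from pyspark.sql import functions as F

df1 = sqlCtx.createDataFrame([types.Row(a=1, b=2), types.Row(a=1, b=4)])
df2 = sqlCtx.createDataFrame([types.Row(a=1, c=12)])
joined = df1.join(df2, df1['a'] == df2['a'])
add = F.udf(lambda a, b: a + b, types.IntegerType())
extra = joined.withColumn('sum', add(joined['b'], joined['c']))

extra.persist(StorageLevel.MEMORY_AND_DISK)   # same effect as cache() for this purpose
print(extra.where(extra['sum'] < F.lit(10)).count())
{code}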



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


