spark-issues mailing list archives

From "Stu (Michael Stewart) (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-23645) pandas_udf can not be called with keyword arguments
Date Sat, 10 Mar 2018 21:20:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stu (Michael Stewart) updated SPARK-23645:
------------------------------------------
    Description: 
pandas_udf (and likely all Python UDFs) does not accept keyword arguments, because in `pyspark/sql/udf.py` the class `UserDefinedFunction` defines both `__call__` and its wrapper utility method to take positional arguments only, never kwargs:

at line 168:
{code:python}
...

def __call__(self, *cols):
    judf = self._judf
    sc = SparkContext._active_spark_context
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

# This function is for improving the online help system in the interactive interpreter.
# For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
# argument annotation. (See: SPARK-19161)
def _wrapped(self):
    """
    Wrap this udf with a function and attach docstring from func
    """

    # It is possible for a callable instance without __name__ attribute or/and
    # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    # we should avoid wrapping the attributes from the wrapped function to the wrapper
    # function. So, we take out these attribute names from the default names to set and
    # then manually assign it after being wrapped.
    assignments = tuple(
        a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')

    @functools.wraps(self.func, assigned=assignments)
    def wrapper(*args):
        return self(*args)

...{code}
as seen in:
{code:python}
from pyspark.sql import SparkSession

from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit

spark = SparkSession.builder.getOrCreate()

df = spark.range(12).withColumn('b', col('id') * 2)

def ok(a, b): return a * b

df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id', 'b')).show()    # no problems
df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')(a='id', b='b')).show()  # fails with
# almost no stack trace, thanks to the wrapper helper{code}
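
The failure itself is plain Python, nothing Spark-specific: a callable declared with only `*args` can never receive keyword arguments, so the call dies inside the wrapper before Spark is involved at all. A minimal illustration:
{code:python}
# Any *args-only function rejects keywords outright -- this is the
# shape of the inner wrapper produced by UserDefinedFunction._wrapped.
def wrapper(*args):
    return args

wrapper(1, 2)   # fine: (1, 2)
wrapper(a=1)    # TypeError: wrapper() got an unexpected keyword argument 'a'
{code}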
*discourse*: it isn't difficult to swap the kwargs back in, allowing the UDF to be called that way, but the cols tuple that gets passed along in the call method:
{code:python}
_to_seq(sc, cols, _to_java_column){code}
has to be ordered according to the function's declared arguments, or the UDF will silently return incorrect results. So the challenge here is to:

(a) reconstruct the proper order of the full set of args/kwargs

--> args first, then kwargs (not in the order they were passed, but in the order the function declares them)

(b) handle the inconsistencies between the python2 and python3 `inspect` modules

A sketch of one possible approach follows below.
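
As a rough sketch only (this is not the committed fix, and it sidesteps the python2 `inspect` differences from point (b)): on python3, `__call__` could bind the incoming args/kwargs against the wrapped function's own signature to recover the declared order before building the cols tuple.
{code:python}
import inspect

def __call__(self, *cols, **kwcols):
    # Hypothetical sketch: bind everything that was passed against
    # self.func's signature. BoundArguments.args comes back as a tuple
    # in declaration order, regardless of how the keywords arrived.
    if kwcols:
        bound = inspect.signature(self.func).bind(*cols, **kwcols)
        cols = bound.args
    judf = self._judf
    sc = SparkContext._active_spark_context
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
{code}
The `wrapper(*args)` helper inside `_wrapped` would need the same `**kwargs` pass-through, and python2 would need an `inspect.getargspec`-based equivalent.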



> pandas_udf can not be called with keyword arguments
> ---------------------------------------------------
>
>                 Key: SPARK-23645
>                 URL: https://issues.apache.org/jira/browse/SPARK-23645
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: python 3.6 | pyspark 2.3.0 | Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_141
>            Reporter: Stu (Michael Stewart)
>            Priority: Minor
>


