Date: Tue, 14 Aug 2018 15:27:00 +0000 (UTC)
From: "Li Jin (JIRA)"
To: issues@spark.apache.org
Subject: [jira] [Updated] (SPARK-24721) Failed to use PythonUDF with literal inputs in filter with data sources
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

     [ https://issues.apache.org/jira/browse/SPARK-24721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Li Jin updated SPARK-24721:
---------------------------
    Summary: Failed to use PythonUDF with literal inputs in filter with data sources  (was: Failed to call PythonUDF whose input is the output of another PythonUDF)

> Failed to use PythonUDF with literal inputs in filter with data sources
> -----------------------------------------------------------------------
>
>                 Key: SPARK-24721
>                 URL: https://issues.apache.org/jira/browse/SPARK-24721
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Xiao Li
>            Priority: Major
>
> {code}
> import random
> from pyspark.sql.functions import *
> from pyspark.sql.types import *
>
> def random_probability(label):
>     if label == 1.0:
>         return random.uniform(0.5, 1.0)
>     else:
>         return random.uniform(0.0, 0.4999)
>
> def randomize_label(ratio):
>     if random.random() >= ratio:
>         return 1.0
>     else:
>         return 0.0
>
> random_probability = udf(random_probability, DoubleType())
> randomize_label = udf(randomize_label, DoubleType())
>
> spark.range(10).write.mode("overwrite").format('csv').save("/tmp/tab3")
> babydf = spark.read.csv("/tmp/tab3")
>
> data_modified_label = babydf.withColumn(
>     'random_label',
>     randomize_label(lit(1 - 0.1))
> )
> data_modified_random = data_modified_label.withColumn(
>     'random_probability',
>     random_probability(col('random_label'))
> )
> data_modified_label.filter(col('random_label') == 0).show()
> {code}
> The above code will generate the following exception:
> {code}
> Py4JJavaError: An error occurred while calling o446.showString.
> : java.lang.RuntimeException: Invalid PythonUDF randomize_label(0.9), requires attributes from more than one child.
> 	at scala.sys.package$.error(package.scala:27)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:166)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:165)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:165)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:116)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:112)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
> 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:77)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:327)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:208)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:325)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:327)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:208)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:325)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:327)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:208)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:325)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:307)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:327)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:208)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:325)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:112)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:92)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:119)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:119)
> 	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
> 	at scala.collection.immutable.List.foldLeft(List.scala:84)
> 	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:119)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:109)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:109)
> 	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3016)
> 	at org.apache.spark.sql.Dataset.head(Dataset.scala:2216)
> 	at org.apache.spark.sql.Dataset.take(Dataset.scala:2429)
> 	at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
> 	at py4j.Gateway.invoke(Gateway.java:293)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:226)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org