Date: Mon, 18 Nov 2019 22:08:00 +0000 (UTC)
From: "koba (Jira)"
To: issues@spark.apache.org
Subject: [jira] [Created] (SPARK-29952) Pandas UDFs do not support vectors as input

koba created SPARK-29952:
----------------------------

             Summary: Pandas UDFs do not support vectors as input
                 Key: SPARK-29952
                 URL: https://issues.apache.org/jira/browse/SPARK-29952
             Project: Spark
          Issue Type: Improvement
          Components: PySpark, SQL
    Affects Versions: 2.4.3
            Reporter: koba

Currently, pandas UDFs do not support columns of vectors as input, only columns of arrays. This means that feature columns containing Dense- or SparseVectors, as generated for example by CountVectorizer, are not supported by pandas UDFs out of the box. One needs to convert the vectors into arrays first. This is not documented anywhere; I had to find it out by trial and error.
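As an illustration of the conversion workaround mentioned above, here is a minimal sketch. It assumes an active SparkSession named {{spark}} on Spark 2.4.x; the UDF name {{vector_to_array_workaround}} is purely illustrative and not part of Spark's API.

{code:java}
# Minimal sketch of the workaround: turn a vector column (dense or sparse)
# into an array<double> column that pandas UDFs can consume.
# Assumes an active SparkSession named `spark`; the name
# `vector_to_array_workaround` is illustrative only.
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

@udf(returnType=ArrayType(DoubleType()))
def vector_to_array_workaround(v):
    # Both DenseVector and SparseVector expose toArray(), which returns a
    # NumPy array; convert it to a plain Python list so Spark can serialize it.
    return v.toArray().tolist() if v is not None else None

df = spark.createDataFrame(
    [(1, Vectors.dense([1.0, 2.0, 1.0, 3.0])),
     (2, Vectors.sparse(4, {0: 2.0, 3: 3.0}))],
    ["id", "features"],
)

# The resulting array<double> column is Arrow-serializable, so it can be
# passed to a pandas UDF, unlike the original vector column.
df = df.withColumn("features_array", vector_to_array_workaround("features"))
{code}

On Spark 3.0 and later, {{pyspark.ml.functions.vector_to_array}} provides a built-in equivalent of this conversion, but it is not available in the 2.4.3 release reported here.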
Below is an example that reproduces the issue.

{code:java}
from pyspark.sql.functions import udf, pandas_udf
import pyspark.sql.functions as F
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT
from pyspark.sql.types import *
import numpy as np

columns = ['features', 'id']
vals = [
    (DenseVector([1, 2, 1, 3]), 1),
    (DenseVector([2, 2, 1, 3]), 2)
]

sdf = spark.createDataFrame(vals, columns)
sdf.show()

+-----------------+---+
|         features| id|
+-----------------+---+
|[1.0,2.0,1.0,3.0]|  1|
|[2.0,2.0,1.0,3.0]|  2|
+-----------------+---+
{code}
{code:java}
@udf(returnType=ArrayType(FloatType()))
def vector_to_array(v):
    # convert column of vectors into column of arrays
    a = v.values.tolist()
    return a

sdf = sdf.withColumn('features_array', vector_to_array('features'))
sdf.show()
sdf.dtypes

+-----------------+---+--------------------+
|         features| id|      features_array|
+-----------------+---+--------------------+
|[1.0,2.0,1.0,3.0]|  1|[1.0, 2.0, 1.0, 3.0]|
|[2.0,2.0,1.0,3.0]|  2|[2.0, 2.0, 1.0, 3.0]|
+-----------------+---+--------------------+

[('features', 'vector'), ('id', 'bigint'), ('features_array', 'array<float>')]
{code}
{code:java}
import pandas as pd

@pandas_udf(LongType())
def _pandas_udf(v):
    res = []
    for i in v:
        res.append(i.mean())
    return pd.Series(res)

sdf.select(_pandas_udf('features_array')).show()

+---------------------------+
|_pandas_udf(features_array)|
+---------------------------+
|                          1|
|                          2|
+---------------------------+
{code}
But if I use the vector column, I get the following error.
{code:java}
sdf.select(_pandas_udf('features')).show()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
in
     13
     14
---> 15 sdf.select(_pandas_udf('features')).show()

~/.pyenv/versions/anaconda3-5.3.1/lib/python3.7/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    376         """
    377         if isinstance(truncate, bool) and truncate:
--> 378             print(self._jdf.showString(n, 20, vertical))
    379         else:
    380             print(self._jdf.showString(n, int(truncate), vertical))

~/.pyenv/versions/3.4.4/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

~/.pyenv/versions/anaconda3-5.3.1/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/.pyenv/versions/3.4.4/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o2635.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 156.0 failed 1 times, most recent failure: Lost task 0.0 in stage 156.0 (TID 606, localhost, executor driver): java.lang.UnsupportedOperationException: Unsupported data type: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowType(ArrowUtils.scala:56)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowField(ArrowUtils.scala:92)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$$anonfun$toArrowSchema$1.apply(ArrowUtils.scala:116)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$$anonfun$toArrowSchema$1.apply(ArrowUtils.scala:115)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at org.apache.spark.sql.types.StructType.map(StructType.scala:99)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowSchema(ArrowUtils.scala:115)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:71)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: Unsupported data type: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowType(ArrowUtils.scala:56)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowField(ArrowUtils.scala:92)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$$anonfun$toArrowSchema$1.apply(ArrowUtils.scala:116)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$$anonfun$toArrowSchema$1.apply(ArrowUtils.scala:115)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at org.apache.spark.sql.types.StructType.map(StructType.scala:99)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowSchema(ArrowUtils.scala:115)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:71)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org