spark-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
Date Tue, 11 Dec 2018 09:50:02 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716708#comment-16716708 ]

ASF GitHub Bot commented on SPARK-24561:
----------------------------------------

HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240538539
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##########
 @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute
+ * any window aggregation values. Instead, it computes the lower and upper bound for each window
+ * (i.e. window bounds) and pass the data and indices to python work to do the actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key and passes them to
+ * Python worker. This is not strictly necessary for sliding windows and can be improved (by
+ * possibly slicing data into overlapping chunks and stitch them together).
+ *
+ * This class groups window expressions by their window boundaries so that window expressions
+ * with the same window boundaries can share the same window bounds. The window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ *     avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ *     avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
+ *     avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ *     max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *       w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing)
+ *       w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the python input because its unbounded window
+ * so it's bound indices will always be the same.
+ *
+ * Unbounded window also have a different eval type, because:
+ * (1) It doesn't have bound indices as input
+ * (2) The udf only needs to be evaluated once the in python worker (because the udf is
 
 Review comment:
   nit: `the in` -> `in the`
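
   For readers following the doc comment quoted above, the bound-index layout it describes can be sketched in plain Python. This is only an illustration and not the code in the pull request: the helper names `row_frame_bounds` and `windowed` are hypothetical, and the convention that the lower bound is inclusive and the upper bound is exclusive is an assumption of the sketch. It shows bounds for one row frame being computed once, placed in front of the value column, and shared by two aggregations over the same frame.

```python
from statistics import mean


def row_frame_bounds(n_rows, preceding, following):
    """Return (lower, upper) index lists for a row-based frame.

    lower is inclusive and upper is exclusive (an assumption of this
    sketch); both are clamped to the partition boundaries.
    """
    lower = [max(0, i - preceding) for i in range(n_rows)]
    upper = [min(n_rows, i + following + 1) for i in range(n_rows)]
    return lower, upper


def windowed(values, lower, upper, agg):
    """Apply agg to values[lower[i]:upper[i]] for every row i."""
    return [agg(values[lo:hi]) for lo, hi in zip(lower, upper)]


# One partition of the value column v.
v = [1.0, 2.0, 3.0, 4.0, 5.0]

# Bounds for a frame of one row before and one row after the current row.
lower_w, upper_w = row_frame_bounds(len(v), preceding=1, following=1)

# Input laid out as (lower_bound_w, upper_bound_w, v), bounds first.
rows = list(zip(lower_w, upper_w, v))

# Two aggregations over the same frame reuse the same bound columns.
avg_v = windowed(v, lower_w, upper_w, mean)
max_v = windowed(v, lower_w, upper_w, max)
```

   An unbounded frame would need no bound columns in this sketch, since every row would use the slice `values[0:len(values)]` and the aggregate could be computed once for the whole partition.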

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> User-defined window functions with pandas udf (bounded window)
> --------------------------------------------------------------
>
>                 Key: SPARK-24561
>                 URL: https://issues.apache.org/jira/browse/SPARK-24561
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Li Jin
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

