spark-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
Date Mon, 10 Dec 2018 18:42:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715337#comment-16715337 ]

ASF GitHub Bot commented on SPARK-24561:
----------------------------------------

icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240331939
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##########
 @@ -144,24 +282,107 @@ case class WindowInPandasExec(
         queue.close()
       }
 
-      val inputProj = UnsafeProjection.create(allInputs, child.output)
-      val pythonInput = grouped.map { case (_, rows) =>
-        rows.map { row =>
-          queue.add(row.asInstanceOf[UnsafeRow])
-          inputProj(row)
+      val stream = iter.map { row =>
+        queue.add(row.asInstanceOf[UnsafeRow])
+        row
+      }
+
+      val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+        // Manage the stream and the grouping.
+        var nextRow: UnsafeRow = null
+        var nextGroup: UnsafeRow = null
+        var nextRowAvailable: Boolean = false
+        private[this] def fetchNextRow() {
+          nextRowAvailable = stream.hasNext
+          if (nextRowAvailable) {
+            nextRow = stream.next().asInstanceOf[UnsafeRow]
+            nextGroup = grouping(nextRow)
+          } else {
+            nextRow = null
+            nextGroup = null
+          }
+        }
+        fetchNextRow()
+
+        // Manage the current partition.
+        val buffer: ExternalAppendOnlyUnsafeRowArray =
+          new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+        var bufferIterator: Iterator[UnsafeRow] = _
+
+        val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+        val frames = factories.map(_(indexRow))
+
+        private[this] def fetchNextPartition() {
+          // Collect all the rows in the current partition.
+          // Before we start to fetch new input rows, make a copy of nextGroup.
+          val currentGroup = nextGroup.copy()
+
+          // clear last partition
+          buffer.clear()
+
+          while (nextRowAvailable && nextGroup == currentGroup) {
 
 Review comment:
   Sure. Here https://github.com/apache/spark/pull/23279
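The diff above replaces an eager per-group projection with a lazy iterator of per-partition iterators: `fetchNextRow()` pulls the next row and its grouping key from the stream, and `fetchNextPartition()` buffers consecutive rows whose key equals the current group's key, stopping at the first row of the next partition. A minimal, hypothetical Python sketch of that consecutive-run grouping pattern (names `rows` and `key` are illustrative, not Spark's API, and the in-memory list stands in for the spillable `ExternalAppendOnlyUnsafeRowArray`):

```python
def grouped_partitions(rows, key):
    """Yield (group_key, buffered_rows) for each maximal run of
    consecutive rows sharing the same grouping key."""
    it = iter(rows)
    try:
        next_row = next(it)          # analogous to the initial fetchNextRow()
    except StopIteration:
        return                       # empty stream: no partitions
    available = True
    while available:
        current_key = key(next_row)  # analogous to nextGroup.copy()
        buffer = [next_row]          # stands in for the spillable row buffer
        available = False
        for row in it:
            if key(row) == current_key:
                buffer.append(row)   # still inside the current partition
            else:
                next_row = row       # first row of the next partition
                available = True
                break
        yield current_key, buffer
```

Note that, as in the Scala code, this only groups *consecutive* rows with equal keys, so the input must already be sorted (or clustered) by the grouping expressions.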

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> User-defined window functions with pandas udf (bounded window)
> --------------------------------------------------------------
>
>                 Key: SPARK-24561
>                 URL: https://issues.apache.org/jira/browse/SPARK-24561
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Li Jin
>            Priority: Major
>
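For context, the semantics this sub-task adds are those of a user-defined aggregation applied over a bounded row frame, e.g. `Window.rowsBetween(-1, 1)` in Spark SQL: for each row, the user function sees the rows between a lower and upper offset, clipped to the partition bounds. A hedged, pure-Python illustration of that frame computation (not Spark's implementation; the helper name is hypothetical):

```python
def bounded_window(values, lower, upper, func):
    """Apply func to the frame of rows between offsets `lower` and `upper`
    (inclusive) around each row, clipped to the partition boundaries."""
    n = len(values)
    out = []
    for i in range(n):
        start = max(0, i + lower)      # clip frame start at partition begin
        end = min(n, i + upper + 1)    # clip frame end at partition end
        out.append(func(values[start:end]))
    return out
```

With a pandas UDF, `func` would receive the frame as a `pandas.Series` per row of the partition rather than a Python slice.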




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

