spark-issues mailing list archives

From "Barry Becker (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-24019) AnalysisException for Window function expression to compute derivative
Date Thu, 19 Apr 2018 15:08:01 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444202#comment-16444202 ]

Barry Becker edited comment on SPARK-24019 at 4/19/18 3:07 PM:
---------------------------------------------------------------

Lowering the priority to Minor because I found a way to specify the derivative window function
without getting the above error. The main fix was to remove rangeBetween from the window spec.

Here is what I now use; it seems to give the result I am looking for without error:
{code:java}
val window = Window.partitionBy("category").orderBy("sequence_num")

// Consider the three sequential series points (Xlag, Ylag), (X, Y), (Xlead, Ylead).
// This defines the derivative as (Ylead - Ylag) / (Xlead - Xlag)
// If the lead or lag points are null, then we fall back on using the middle point.
val yLead = coalesce(lead("value", 1).over(window), col("value"))
val yLag = coalesce(lag("value", 1).over(window), col("value"))
val xLead = coalesce(lead("sequence_num", 1).over(window), col("sequence_num"))
val xLag = coalesce(lag("sequence_num", 1).over(window), col("sequence_num"))
val derivative: Column = (yLead - yLag) / (xLead - xLag)

val resultDf = simpleDf.withColumn("derivative", derivative)

resultDf.show()
assertResult(strip("""1, b, 100.0, -30.0
  |2, b, 70.0, -20.0
  |3, b, 60.0, -10.0
  |1, a, 2.1, 0.2999999999999998
  |2, a, 2.4, 0.8
  |3, a, 3.7, 0.6000000000000001
  |4, a, 3.6, -0.10000000000000009""")
) {
  resultDf.collect().map(row => row.mkString(", ")).mkString("\n")
}{code}
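
For readers who want to check the expected values by hand, here is a minimal sketch of the same central-difference rule in plain Scala collections, with no Spark involved. The names `Point`, `CentralDifference`, `derivatives` and `sample` are hypothetical and exist only for this illustration; the data is the same seven rows that createSimpleData() builds in the issue description. Compared with the reproduction in the issue description, the snippet above also passes `derivative` to withColumn directly instead of calling `derivative.over(window)`.

```scala
// Hypothetical stand-in for one row of simpleDf; not part of the original test.
final case class Point(sequenceNum: Int, category: String, value: Double)

object CentralDifference {

  // Same rows as createSimpleData() in the issue description.
  val sample: Seq[Point] = Seq(
    Point(1, "a", 2.1), Point(2, "a", 2.4), Point(1, "b", 100.0),
    Point(3, "a", 3.7), Point(2, "b", 70.0), Point(4, "a", 3.6),
    Point(3, "b", 60.0))

  /** Per category, ordered by sequenceNum: (yLead - yLag) / (xLead - xLag).
    * When the previous or next point is missing, the current point is used
    * instead, mirroring coalesce(lag(...), col(...)) and coalesce(lead(...), col(...)).
    */
  def derivatives(points: Seq[Point]): Seq[(Point, Double)] =
    points.groupBy(_.category).toSeq.flatMap { case (_, series) =>
      val sorted = series.sortBy(_.sequenceNum).toVector
      sorted.indices.map { i =>
        val lag  = sorted(math.max(i - 1, 0))                // falls back to current at the start
        val lead = sorted(math.min(i + 1, sorted.size - 1))  // falls back to current at the end
        sorted(i) -> (lead.value - lag.value) / (lead.sequenceNum - lag.sequenceNum)
      }
    }
}
```

Calling `CentralDifference.derivatives(CentralDifference.sample)` should reproduce the derivative column asserted above, up to floating-point rounding. One difference to keep in mind: for a series with a single point this sketch divides 0.0 by 0 and yields NaN, which is not necessarily what Spark returns for the same case.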



> AnalysisException for Window function expression to compute derivative
> ----------------------------------------------------------------------
>
>                 Key: SPARK-24019
>                 URL: https://issues.apache.org/jira/browse/SPARK-24019
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.1
>         Environment: Ubuntu, spark 2.1.1, standalone.
>            Reporter: Barry Becker
>            Priority: Minor
>
> I am using spark 2.1.1 currently.
> I created an expression to compute the derivative of some series data using a window function.
> I have a simple reproducible case of the error.
> I'm only filing this bug because the error message says "Please file a bug report with this error message, stack trace, and the query."
> Here they are:
> {code:java}
> ((coalesce(lead(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), value#9) - coalesce(lag(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), value#9)) / cast((coalesce(lead(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), sequence_num#7) - coalesce(lag(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), sequence_num#7)) as double)) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS derivative#14 has multiple Window Specifications (ArrayBuffer(windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING))).
> Please file a bug report with this error message, stack trace, and the query.;
> org.apache.spark.sql.AnalysisException: ((coalesce(lead(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), value#9) - coalesce(lag(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), value#9)) / cast((coalesce(lead(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), sequence_num#7) - coalesce(lag(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), sequence_num#7)) as double)) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS derivative#14 has multiple Window Specifications (ArrayBuffer(windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING))).
> Please file a bug report with this error message, stack trace, and the query.;
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$78.apply(Analyzer.scala:1772){code}
> And here is a simple unit test that can be used to reproduce the problem:
> {code:java}
> import com.mineset.spark.testsupport.SparkTestCase.SPARK_SESSION
> import org.apache.spark.sql.Column
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
> import org.scalatest.FunSuite
> import com.mineset.spark.testsupport.SparkTestCase._
> /**
>  * Test to see that window functions work as expected on spark.
>  * @author Barry Becker
>  */
> class WindowFunctionSuite extends FunSuite {
>   val simpleDf = createSimpleData()
>   test("Window function for finding derivatives for 2 series") {
>     val window = Window.partitionBy("category").orderBy("sequence_num") //.rangeBetween(-1, 1)
>     // Consider the three sequential series points (Xlag, Ylag), (X, Y), (Xlead, Ylead).
>     // This defines the derivative as (Ylead - Ylag) / (Xlead - Xlag)
>     // If the lead or lag points are null, then we fall back on using the middle point.
>     val yLead = coalesce(lead("value", 1).over(window), col("value"))
>     val yLag = coalesce(lag("value", 1).over(window), col("value"))
>     val xLead = coalesce(lead("sequence_num", 1).over(window), col("sequence_num"))
>     val xLag = coalesce(lag("sequence_num", 1).over(window), col("sequence_num"))
>     val derivative: Column = (yLead - yLag) / (xLead - xLag)
>     val resultDf = simpleDf.withColumn("derivative", derivative.over(window))
>     resultDf.show()
>     assertResult("???") {
>       resultDf.collect().map(row => row.mkString(", ")).mkString("\n")
>     }
>   }
>   def createSimpleData() = {
>     val data = Seq(
>       (1, "a", 2.1),
>       (2, "a", 2.4),
>       (1, "b", 100.0),
>       (3, "a", 3.7),
>       (2, "b", 70.0),
>       (4, "a", 3.6),
>       (3, "b", 60.0))
>     SPARK_SESSION.sqlContext.createDataFrame(data).toDF("sequence_num", "category", "value")
>   }
> }{code}
>  


