spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-20373) Batch queries with 'Dataset/DataFrame.withWatermark()` does not execute
Date Mon, 08 May 2017 05:40:04 GMT

    [ https://issues.apache.org/jira/browse/SPARK-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000294#comment-16000294
] 

Apache Spark commented on SPARK-20373:
--------------------------------------

User 'uncleGen' has created a pull request for this issue:
https://github.com/apache/spark/pull/17896

> Batch queries with 'Dataset/DataFrame.withWatermark()` does not execute
> -----------------------------------------------------------------------
>
>                 Key: SPARK-20373
>                 URL: https://issues.apache.org/jira/browse/SPARK-20373
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Tathagata Das
>            Priority: Minor
>
> Any Dataset/DataFrame batch query with the operation `withWatermark` does not execute
because the batch planner does not have any rule to explicitly handle the EventTimeWatermark
logical plan. The right solution is to simply remove the plan node, as the watermark should
not affect any batch query in any way.
> {code}
> from pyspark.sql.functions import *
> eventsDF = spark.createDataFrame([("2016-03-11 09:00:07", "dev1", 123)]).toDF("eventTime",
"deviceId", "signal").select(col("eventTime").cast("timestamp").alias("eventTime"), "deviceId",
"signal")
> windowedCountsDF = \
>   eventsDF \
>     .withWatermark("eventTime", "10 minutes") \
>     .groupBy(
>       "deviceId",
>       window("eventTime", "5 minutes")) \
>     .count()
> windowedCountsDF.collect()
> {code}
> This throws as an error 
> {code}
> java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark eventTime#3762657:
timestamp, interval 10 minutes
> +- Project [cast(_1#3762643 as timestamp) AS eventTime#3762657, _2#3762644 AS deviceId#3762651]
>    +- LogicalRDD [_1#3762643, _2#3762644, _3#3762645L]
> 	at scala.Predef$.assert(Predef.scala:170)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:85)
> 	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:81)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message