spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jacek Laskowski (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-21546) dropDuplicates with watermark yields RuntimeException due to binding failure
Date Thu, 27 Jul 2017 08:50:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-21546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jacek Laskowski updated SPARK-21546:
------------------------------------
    Description: 
With today's master...

The following streaming query with watermark and {{dropDuplicates}} yields {{RuntimeException}}
due to failure in binding.

{code}
val topic1 = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingoffsets", "earliest").
  load

val records = topic1.
  withColumn("eventtime", 'timestamp).  // <-- just to put the right name given the purpose
  withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds"). // <-- use the renamed eventtime column
  dropDuplicates("value").  // dropDuplicates will use watermark
                            // only when eventTime column exists
  // include the watermark column => internal design leak?
  select('key cast "string", 'value cast "string", 'eventtime).
  as[(String, String, java.sql.Timestamp)]

scala> records.explain
== Physical Plan ==
*Project [cast(key#0 as string) AS key#169, cast(value#1 as string) AS value#170, eventtime#157-T30000ms]
+- StreamingDeduplicate [value#1], StatefulOperatorStateInfo(<unknown>,93c3de98-3f85-41a4-8aef-d09caf8ea693,0,0), 0
   +- Exchange hashpartitioning(value#1, 200)
      +- EventTimeWatermark eventtime#157: timestamp, interval 30 seconds
         +- *Project [key#0, value#1, timestamp#5 AS eventtime#157]
            +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = records.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime("10 seconds")).
  queryName("from-kafka-topic1-to-console").
  outputMode(OutputMode.Update).
  start
{code}

{code}
-------------------------------------------
Batch: 0
-------------------------------------------
17/07/27 10:28:58 ERROR Executor: Exception in task 3.0 in stage 13.0 (TID 438)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: eventtime#157-T30000ms
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:45)
	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:40)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:977)
	at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:370)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.org$apache$spark$sql$execution$streaming$WatermarkSupport$$super$newPredicate(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
	at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.watermarkPredicateForKeys(statefulOperators.scala:160)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys$lzycompute(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.removeKeysOlderThanWatermark(statefulOperators.scala:167)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.removeKeysOlderThanWatermark(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4$$anonfun$apply$mcV$sp$1.apply$mcV$sp(statefulOperators.scala:403)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:96)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.timeTakenMs(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4.apply$mcV$sp(statefulOperators.scala:403)
	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Couldn't find eventtime#157-T30000ms in [value#185]
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 49 more
{code}

I'm somehow convinced that watermark support leaks from {{StreamingDeduplicate}} and forces
a Spark developer to include extra fields for watermark. I think filter pushdown (for the
select) should not be executed for this case or should include the extra {{eventTime}} column
(regardless of whether a developer uses it or not).


  was:
With today's master...

The following streaming query yields {{RuntimeException}} due to failure in binding (most
likely due to {{select}} operator).

{code}
val topic1 = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingoffsets", "earliest").
  load

val records = topic1.
  withColumn("eventtime", 'timestamp).  // <-- just to put the right name given the purpose
  withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds"). // <-- use the renamed eventtime column
  dropDuplicates("value").  // dropDuplicates will use watermark
                            // only when eventTime column exists
  // include the watermark column => internal design leak?
  select('key cast "string", 'value cast "string", 'eventtime).
  as[(String, String, java.sql.Timestamp)]

scala> records.explain
== Physical Plan ==
*Project [cast(key#0 as string) AS key#169, cast(value#1 as string) AS value#170, eventtime#157-T30000ms]
+- StreamingDeduplicate [value#1], StatefulOperatorStateInfo(<unknown>,93c3de98-3f85-41a4-8aef-d09caf8ea693,0,0),
0
   +- Exchange hashpartitioning(value#1, 200)
      +- EventTimeWatermark eventtime#157: timestamp, interval 30 seconds
         +- *Project [key#0, value#1, timestamp#5 AS eventtime#157]
            +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L,
timestamp#5, timestampType#6]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = records.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime("10 seconds")).
  queryName("from-kafka-topic1-to-console").
  outputMode(OutputMode.Update).
  start
{code}

{code}
-------------------------------------------
Batch: 0
-------------------------------------------
17/07/27 10:28:58 ERROR Executor: Exception in task 3.0 in stage 13.0 (TID 438)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: eventtime#157-T30000ms
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:45)
	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:40)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:977)
	at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:370)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.org$apache$spark$sql$execution$streaming$WatermarkSupport$$super$newPredicate(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
	at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.watermarkPredicateForKeys(statefulOperators.scala:160)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys$lzycompute(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.removeKeysOlderThanWatermark(statefulOperators.scala:167)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.removeKeysOlderThanWatermark(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4$$anonfun$apply$mcV$sp$1.apply$mcV$sp(statefulOperators.scala:403)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:96)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.timeTakenMs(statefulOperators.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4.apply$mcV$sp(statefulOperators.scala:403)
	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Couldn't find eventtime#157-T30000ms in [value#185]
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 49 more
{code}

I'm somehow convinced that watermark support leaks from {{StreamingDeduplicate}} and forces
a Spark developer to include extra fields for watermark. I think filter pushdown (for the
select) should not be executed for this case or should include the extra {{eventTime}} column
(regardless of whether a developer uses it or not).



> dropDuplicates with watermark yields RuntimeException due to binding failure
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-21546
>                 URL: https://issues.apache.org/jira/browse/SPARK-21546
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Jacek Laskowski
>
> With today's master...
> The following streaming query with watermark and {{dropDuplicates}} yields {{RuntimeException}}
> due to failure in binding.
> {code}
> val topic1 = spark.
>   readStream.
>   format("kafka").
>   option("subscribe", "topic1").
>   option("kafka.bootstrap.servers", "localhost:9092").
>   option("startingoffsets", "earliest").
>   load
> val records = topic1.
>   withColumn("eventtime", 'timestamp).  // <-- just to put the right name given the purpose
>   withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds"). // <-- use the renamed eventtime column
>   dropDuplicates("value").  // dropDuplicates will use watermark
>                             // only when eventTime column exists
>   // include the watermark column => internal design leak?
>   select('key cast "string", 'value cast "string", 'eventtime).
>   as[(String, String, java.sql.Timestamp)]
> scala> records.explain
> == Physical Plan ==
> *Project [cast(key#0 as string) AS key#169, cast(value#1 as string) AS value#170, eventtime#157-T30000ms]
> +- StreamingDeduplicate [value#1], StatefulOperatorStateInfo(<unknown>,93c3de98-3f85-41a4-8aef-d09caf8ea693,0,0),
0
>    +- Exchange hashpartitioning(value#1, 200)
>       +- EventTimeWatermark eventtime#157: timestamp, interval 30 seconds
>          +- *Project [key#0, value#1, timestamp#5 AS eventtime#157]
>             +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L,
timestamp#5, timestampType#6]
> import org.apache.spark.sql.streaming.{OutputMode, Trigger}
> val sq = records.
>   writeStream.
>   format("console").
>   option("truncate", false).
>   trigger(Trigger.ProcessingTime("10 seconds")).
>   queryName("from-kafka-topic1-to-console").
>   outputMode(OutputMode.Update).
>   start
> {code}
> {code}
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> 17/07/27 10:28:58 ERROR Executor: Exception in task 3.0 in stage 13.0 (TID 438)
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree:
eventtime#157-T30000ms
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
> 	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
> 	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
> 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
> 	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:45)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:40)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:977)
> 	at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:370)
> 	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.org$apache$spark$sql$execution$streaming$WatermarkSupport$$super$newPredicate(statefulOperators.scala:350)
> 	at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
> 	at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.watermarkPredicateForKeys(statefulOperators.scala:160)
> 	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys$lzycompute(statefulOperators.scala:350)
> 	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys(statefulOperators.scala:350)
> 	at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.removeKeysOlderThanWatermark(statefulOperators.scala:167)
> 	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.removeKeysOlderThanWatermark(statefulOperators.scala:350)
> 	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4$$anonfun$apply$mcV$sp$1.apply$mcV$sp(statefulOperators.scala:403)
> 	at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:96)
> 	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.timeTakenMs(statefulOperators.scala:350)
> 	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4.apply$mcV$sp(statefulOperators.scala:403)
> 	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:108)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Couldn't find eventtime#157-T30000ms in [value#185]
> 	at scala.sys.package$.error(package.scala:27)
> 	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
> 	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> 	... 49 more
> {code}
> I'm somehow convinced that watermark support leaks from {{StreamingDeduplicate}} and
> forces a Spark developer to include extra fields for watermark. I think filter pushdown (for
> the select) should not be executed for this case or should include the extra {{eventTime}}
> column (regardless of whether a developer uses it or not).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message