spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (SPARK-24462) Text socket micro-batch reader throws error when a query is restarted with saved state
Date Mon, 04 Jun 2018 17:46:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-24462:
------------------------------------

    Assignee:     (was: Apache Spark)

> Text socket micro-batch reader throws error when a query is restarted with saved state
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-24462
>                 URL: https://issues.apache.org/jira/browse/SPARK-24462
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Arun Mahadevan
>            Priority: Critical
>
> Exception thrown:
>  
> {noformat}
> scala> 18/06/01 22:47:04 ERROR MicroBatchExecution: Query [id = 0bdc4428-5d21-4237-9d64-898ae65f28f3, runId = f6822423-2bd2-47c1-8ed6-799d1c481195] terminated with error
> java.lang.RuntimeException: Offsets committed out of order: 2 followed by -1
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:197)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$2$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:377)
>  
> {noformat}
>  
> Sample code that reproduces the error when the query is restarted with saved state (an assumed restart sequence is sketched below the quoted issue).
>  
> {code:scala}
> import java.sql.Timestamp
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import spark.implicits._
> import org.apache.spark.sql.streaming.Trigger
> val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).option("includeTimestamp", true).load()
> val words = lines.as[(String, Timestamp)].flatMap(line => line._1.split(" ").map(word => (word, line._2))).toDF("word", "timestamp")
> val windowedCounts = words.groupBy(window($"timestamp", "20 minutes", "20 minutes"), $"word").count().orderBy("window")
> val query = windowedCounts.writeStream.outputMode("complete").option("checkpointLocation", "/tmp/debug").format("console").option("truncate", "false").start()
> {code}
>  
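
The quoted snippet only shows the first run; the failure is hit on the second run against the same checkpoint directory. The sequence below is an assumed reproduction recipe (spark-shell session, socket served by something like `nc -lk 9999`), not part of the original report:

{code:scala}
// Assumed reproduction recipe (spark-shell; `nc -lk 9999` feeding port 9999):
// 1. Run the quoted snippet and type a few lines into the nc session so that
//    at least one micro-batch completes and state is written under /tmp/debug.
// 2. Stop the query (or simply exit the shell):
query.stop()
// 3. Start a fresh spark-shell and run the same snippet again, keeping
//    checkpointLocation = "/tmp/debug". The restarted query then terminates
//    with the "Offsets committed out of order" RuntimeException shown above.
{code}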

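For context on the message itself, here is a minimal standalone sketch of the kind of commit-ordering guard the stack trace points at; it is an illustration modelled on the error text, not the actual TextSocketMicroBatchReader implementation:

{code:scala}
// Illustration only: a simplified commit-order guard modelled on the error
// message, not the real org.apache.spark.sql.execution.streaming.sources code.
object OffsetCommitOrderDemo {
  private var lastOffsetCommitted: Long = 2L // pretend offset 2 is already committed

  def commit(end: Long): Unit = {
    // A commit that moves backwards is rejected; the restarted query runs into
    // exactly this when -1 is committed after 2.
    if (end < lastOffsetCommitted) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }
    lastOffsetCommitted = end
  }

  def main(args: Array[String]): Unit = {
    commit(-1L) // throws java.lang.RuntimeException: Offsets committed out of order: 2 followed by -1
  }
}
{code}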


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

