spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-19861][SS] watermark should not be a negative time.
Date Thu, 09 Mar 2017 19:07:40 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2a76e2420 -> ffe65b065


[SPARK-19861][SS] watermark should not be a negative time.

## What changes were proposed in this pull request?

The `watermark` delay threshold should not be negative. A negative delay is invalid, so it is now checked and rejected before the query actually runs.

## How was this patch tested?

Added a new unit test.

Author: uncleGen <hustyugm@gmail.com>
Author: dylon <hustyugm@gmail.com>

Closes #17202 from uncleGen/SPARK-19861.

(cherry picked from commit 30b18e69361746b4d656474374d8b486bb48a19e)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffe65b06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffe65b06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffe65b06

Branch: refs/heads/branch-2.1
Commit: ffe65b06511f3143cb2549073bfbe145663ad561
Parents: 2a76e24
Author: uncleGen <hustyugm@gmail.com>
Authored: Thu Mar 9 11:07:31 2017 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Thu Mar 9 11:07:37 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    |  4 +++-
 .../sql/streaming/EventTimeWatermarkSuite.scala | 23 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ffe65b06/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 563bfa8..e2d0e51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -559,7 +559,7 @@ class Dataset[T] private[sql](
    * @param eventTime the name of the column that contains the event time of the row.
    * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
    *                       record that has been processed in the form of an interval
-   *                       (e.g. "1 minute" or "5 hours").
+   *                       (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
    *
    * @group streaming
    * @since 2.1.0
@@ -572,6 +572,8 @@ class Dataset[T] private[sql](
     val parsedDelay =
       Option(CalendarInterval.fromString("interval " + delayThreshold))
         .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
+      s"delay threshold ($delayThreshold) should not be negative.")
     EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe65b06/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c768525..7614ea5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -306,6 +306,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
     )
   }
 
+  test("delay threshold should not be negative.") {
+    val inputData = MemoryStream[Int].toDF()
+    var e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "-1 year")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "1 year -13 months")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "1 month -40 days")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "-10 seconds")
+    }
+    assert(e.getMessage contains "should not be negative.")
+  }
+
   test("the new watermark should override the old one") {
     val df = MemoryStream[(Long, Long)].toDF()
       .withColumn("first", $"_1".cast("timestamp"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message