spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
Date Mon, 11 May 2015 11:20:25 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 fff3c86b0 -> da1be15cc


[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time

tdas

https://issues.apache.org/jira/browse/SPARK-7326

The problem most likely resides in the DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration)
    val alignedFromTime = fromTime.floor(slideDuration)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here, after performing floor() on both fromTime and toTime, the results (alignedFromTime - zeroTime)
and (alignedToTime - zeroTime) may no longer be multiples of slideDuration, which makes the
isTimeValid() check fail for all the remaining computation.

The fix is to add a new floor() function in Time.scala that respects zeroTime while performing
the floor:

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

Then change DStream.slice to call this new floor function, passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way alignedToTime and alignedFromTime are *really* aligned with respect to zeroTime,
whose value is generally not 0.

Author: Wesley Miao <wesley.miao@gmail.com>
Author: Wesley <wesley.miao@autodesk.com>

Closes #5871 from wesleymiao/spark-7326 and squashes the following commits:

82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't
work all the time
48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't
work all the time
6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't
work all the time
2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't
work all the time

(cherry picked from commit d70a076892e0677acceccaba665908cdf664f1b4)
Signed-off-by: Sean Owen <sowen@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da1be15c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da1be15c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da1be15c

Branch: refs/heads/branch-1.4
Commit: da1be15cc6191f077e282a480bdcc2950fd9c5e8
Parents: fff3c86
Author: Wesley Miao <wesley.miao@gmail.com>
Authored: Mon May 11 12:20:06 2015 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Mon May 11 12:20:21 2015 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/Time.scala |  5 +++++
 .../spark/streaming/dstream/DStream.scala       | 22 +++++++++++++-------
 .../org/apache/spark/streaming/TimeSuite.scala  |  3 +++
 3 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/da1be15c/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 42c4967..92cfd7d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -63,6 +63,11 @@ case class Time(private val millis: Long) {
     new Time((this.millis / t) * t)
   }
 
+  def floor(that: Duration, zeroTime: Time): Time = {
+    val t = that.milliseconds
+    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
+  }
+
   def isMultipleOf(that: Duration): Boolean =
     (this.millis % that.milliseconds == 0)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/da1be15c/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index f1f8a70..7092a3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -763,16 +763,22 @@ abstract class DStream[T: ClassTag] (
     if (!isInitialized) {
       throw new SparkException(this + " has not been initialized")
     }
-    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
-      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
-        + slideDuration + ")")
+
+    val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
+      toTime
+    } else {
+      logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
+          + slideDuration + ")")
+        toTime.floor(slideDuration, zeroTime)
     }
-    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
-      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
-        + slideDuration + ")")
+
+    val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) {
+      fromTime
+    } else {
+      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+      + slideDuration + ")")
+      fromTime.floor(slideDuration, zeroTime)
     }
-    val alignedToTime = toTime.floor(slideDuration)
-    val alignedFromTime = fromTime.floor(slideDuration)
 
     logInfo("Slicing from " + fromTime + " to " + toTime +
       " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

http://git-wip-us.apache.org/repos/asf/spark/blob/da1be15c/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
index 5579ac3..e6a0165 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
@@ -69,6 +69,9 @@ class TimeSuite extends TestSuiteBase {
     assert(new Time(1200).floor(new Duration(200)) == new Time(1200))
     assert(new Time(199).floor(new Duration(200)) == new Time(0))
     assert(new Time(1).floor(new Duration(1)) == new Time(1))
+    assert(new Time(1350).floor(new Duration(200), new Time(50)) == new Time(1250))
+    assert(new Time(1350).floor(new Duration(200), new Time(150)) == new Time(1350))
+    assert(new Time(1350).floor(new Duration(200), new Time(200)) == new Time(1200))
   }
 
   test("isMultipleOf") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message