spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject [2/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes
Date Thu, 19 Feb 2015 23:35:34 GMT
SPARK-4682 [CORE] Consolidate various 'Clock' classes

Another one from JoshRosen's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all of the change is standardizing class and method names.

Author: Sean Owen <sowen@cloudera.com>

Closes #4514 from srowen/SPARK-4682 and squashes the following commits:

5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark]
169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names
277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way
b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis()
160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock
7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock

(cherry picked from commit 34b7c35380c88569a1396fb4ed991a0bed4288e7)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd49e8b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd49e8b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd49e8b9

Branch: refs/heads/branch-1.3
Commit: bd49e8b962b397b8fb8b22f980739021cf1a195e
Parents: ff8976e
Author: Sean Owen <sowen@cloudera.com>
Authored: Thu Feb 19 15:35:23 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu Feb 19 15:35:31 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 28 +-----
 .../spark/deploy/worker/DriverRunner.scala      | 17 ++--
 .../apache/spark/scheduler/DAGScheduler.scala   | 20 ++---
 .../apache/spark/scheduler/TaskSetManager.scala | 16 ++--
 .../scala/org/apache/spark/util/Clock.scala     | 44 +++++++++-
 .../org/apache/spark/util/ManualClock.scala     | 69 +++++++++++++++
 .../spark/ExecutorAllocationManagerSuite.scala  | 65 +++++++-------
 .../spark/deploy/worker/DriverRunnerTest.scala  |  3 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 36 ++++----
 .../scala/org/apache/spark/util/FakeClock.scala | 26 ------
 .../streaming/LocalJavaStreamingContext.java    |  2 +-
 .../flume/FlumePollingStreamSuite.scala         |  7 +-
 .../streaming/LocalJavaStreamingContext.java    |  2 +-
 .../streaming/LocalJavaStreamingContext.java    |  2 +-
 .../streaming/LocalJavaStreamingContext.java    |  2 +-
 .../kinesis/KinesisCheckpointState.scala        | 10 +--
 .../kinesis/KinesisRecordProcessor.scala        |  2 +-
 .../kinesis/KinesisReceiverSuite.scala          | 25 +++---
 .../JavaStreamingLogisticRegressionSuite.java   |  2 +-
 .../JavaStreamingLinearRegressionSuite.java     |  2 +-
 project/MimaExcludes.scala                      |  5 ++
 .../streaming/dstream/FileInputDStream.scala    |  6 +-
 .../streaming/receiver/BlockGenerator.scala     |  3 +-
 .../receiver/ReceivedBlockHandler.scala         |  4 +-
 .../streaming/scheduler/JobGenerator.scala      | 13 ++-
 .../scheduler/ReceivedBlockTracker.scala        |  6 +-
 .../org/apache/spark/streaming/util/Clock.scala | 89 --------------------
 .../spark/streaming/util/RecurringTimer.scala   |  5 +-
 .../streaming/util/WriteAheadLogManager.scala   |  5 +-
 .../streaming/LocalJavaStreamingContext.java    |  2 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  9 +-
 .../spark/streaming/CheckpointSuite.scala       | 33 ++++----
 .../spark/streaming/InputStreamsSuite.scala     | 37 ++++----
 .../streaming/ReceivedBlockHandlerSuite.scala   |  6 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 10 +--
 .../apache/spark/streaming/TestSuiteBase.scala  | 15 ++--
 .../streaming/util/WriteAheadLogSuite.scala     | 10 +--
 37 files changed, 301 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 998695b..21c6e6f 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private val intervalMillis: Long = 100
 
   // Clock used to schedule when executors should be added and removed
-  private var clock: Clock = new RealClock
+  private var clock: Clock = new SystemClock()
 
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
@@ -588,28 +589,3 @@ private[spark] class ExecutorAllocationManager(
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
 }
-
-/**
- * An abstract clock for measuring elapsed time.
- */
-private trait Clock {
-  def getTimeMillis: Long
-}
-
-/**
- * A clock backed by a monotonically increasing time source.
- * The time returned by this clock does not correspond to any notion of wall-clock time.
- */
-private class RealClock extends Clock {
-  override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
-}
-
-/**
- * A clock that allows the caller to customize the time.
- * This is used mainly for testing.
- */
-private class TestClock(startTimeMillis: Long) extends Clock {
-  private var time: Long = startTimeMillis
-  override def getTimeMillis: Long = time
-  def tick(ms: Long): Unit = { time += ms }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b964a09..e16bccb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -20,19 +20,18 @@ package org.apache.spark.deploy.worker
 import java.io._
 
 import scala.collection.JavaConversions._
-import scala.collection.Map
 
 import akka.actor.ActorRef
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileUtil, Path}
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
+import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
+import org.apache.spark.util.{Clock, SystemClock}
 
 /**
  * Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -59,9 +58,7 @@ private[spark] class DriverRunner(
   // Decoupled for testing
   private[deploy] def setClock(_clock: Clock) = clock = _clock
   private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
-  private var clock = new Clock {
-    def currentTimeMillis(): Long = System.currentTimeMillis()
-  }
+  private var clock: Clock = new SystemClock()
   private var sleeper = new Sleeper {
     def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
   }
@@ -190,9 +187,9 @@ private[spark] class DriverRunner(
         initialize(process.get)
       }
 
-      val processStart = clock.currentTimeMillis()
+      val processStart = clock.getTimeMillis()
       val exitCode = process.get.waitFor()
-      if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
+      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
         waitSeconds = 1
       }
 
@@ -208,10 +205,6 @@ private[spark] class DriverRunner(
   }
 }
 
-private[deploy] trait Clock {
-  def currentTimeMillis(): Long
-}
-
 private[deploy] trait Sleeper {
   def sleep(seconds: Int)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8b62d24..c58721c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -63,7 +63,7 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv,
-    clock: org.apache.spark.util.Clock = SystemClock)
+    clock: Clock = new SystemClock())
   extends Logging {
 
   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
@@ -657,7 +657,7 @@ class DAGScheduler(
       // completion events or stage abort
       stageIdToStage -= s.id
       jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
     }
   }
 
@@ -706,7 +706,7 @@ class DAGScheduler(
         stage.latestInfo.stageFailed(stageFailedMessage)
         listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       }
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
   }
 
@@ -745,7 +745,7 @@ class DAGScheduler(
       logInfo("Missing parents: " + getMissingParentStages(finalStage))
       val shouldRunLocally =
         localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
-      val jobSubmissionTime = clock.getTime()
+      val jobSubmissionTime = clock.getTimeMillis()
       if (shouldRunLocally) {
         // Compute very short actions like first() or take() with no parent stages locally.
         listenerBus.post(
@@ -871,7 +871,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stage.latestInfo.submissionTime = Some(clock.getTime())
+      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should post
       // SparkListenerStageCompleted here in case there are no tasks to run.
@@ -940,12 +940,12 @@ class DAGScheduler(
 
     def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
       val serviceTime = stage.latestInfo.submissionTime match {
-        case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
+        case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
         case _ => "Unknown"
       }
       if (errorMessage.isEmpty) {
         logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-        stage.latestInfo.completionTime = Some(clock.getTime())
+        stage.latestInfo.completionTime = Some(clock.getTimeMillis())
       } else {
         stage.latestInfo.stageFailed(errorMessage.get)
         logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
@@ -971,7 +971,7 @@ class DAGScheduler(
                     markStageAsFinished(stage)
                     cleanupStateForJobAndIndependentStages(job)
                     listenerBus.post(
-                      SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
+                      SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                   }
 
                   // taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1187,7 +1187,7 @@ class DAGScheduler(
     }
     val dependentJobs: Seq[ActiveJob] =
       activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
-    failedStage.latestInfo.completionTime = Some(clock.getTime())
+    failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
     for (job <- dependentJobs) {
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
     }
@@ -1242,7 +1242,7 @@ class DAGScheduler(
     if (ableToCancelStages) {
       job.listener.jobFailed(error)
       cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 99a5f71..529237f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -51,7 +51,7 @@ private[spark] class TaskSetManager(
     sched: TaskSchedulerImpl,
     val taskSet: TaskSet,
     val maxTaskFailures: Int,
-    clock: Clock = SystemClock)
+    clock: Clock = new SystemClock())
   extends Schedulable with Logging {
 
   val conf = sched.sc.conf
@@ -166,7 +166,7 @@ private[spark] class TaskSetManager(
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
   // We then move down if we manage to launch a "more local" task.
   var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels
-  var lastLaunchTime = clock.getTime()  // Time we last launched a task at this level
+  var lastLaunchTime = clock.getTimeMillis()  // Time we last launched a task at this level
 
   override def schedulableQueue = null
 
@@ -281,7 +281,7 @@ private[spark] class TaskSetManager(
       val failed = failedExecutors.get(taskId).get
 
       return failed.contains(execId) &&
-        clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
     }
 
     false
@@ -428,7 +428,7 @@ private[spark] class TaskSetManager(
     : Option[TaskDescription] =
   {
     if (!isZombie) {
-      val curTime = clock.getTime()
+      val curTime = clock.getTimeMillis()
 
       var allowedLocality = maxLocality
 
@@ -459,7 +459,7 @@ private[spark] class TaskSetManager(
             lastLaunchTime = curTime
           }
           // Serialize and return the task
-          val startTime = clock.getTime()
+          val startTime = clock.getTimeMillis()
           val serializedTask: ByteBuffer = try {
             Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
           } catch {
@@ -674,7 +674,7 @@ private[spark] class TaskSetManager(
           return
         }
         val key = ef.description
-        val now = clock.getTime()
+        val now = clock.getTimeMillis()
         val (printFull, dupCount) = {
           if (recentExceptions.contains(key)) {
             val (dupCount, printTime) = recentExceptions(key)
@@ -706,7 +706,7 @@ private[spark] class TaskSetManager(
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
-      put(info.executorId, clock.getTime())
+      put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
     addPendingTask(index)
     if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
@@ -821,7 +821,7 @@ private[spark] class TaskSetManager(
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
     if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
-      val time = clock.getTime()
+      val time = clock.getTimeMillis()
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
       val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/main/scala/org/apache/spark/util/Clock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala b/core/src/main/scala/org/apache/spark/util/Clock.scala
index 97c2b45..e92ed11 100644
--- a/core/src/main/scala/org/apache/spark/util/Clock.scala
+++ b/core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -21,9 +21,47 @@ package org.apache.spark.util
  * An interface to represent clocks, so that they can be mocked out in unit tests.
  */
 private[spark] trait Clock {
-  def getTime(): Long
+  def getTimeMillis(): Long
+  def waitTillTime(targetTime: Long): Long
 }
 
-private[spark] object SystemClock extends Clock {
-  def getTime(): Long = System.currentTimeMillis()
+/**
+ * A clock backed by the actual time from the OS as reported by the `System` API.
+ */
+private[spark] class SystemClock extends Clock {
+
+  val minPollTime = 25L
+
+  /**
+   * @return the same time (milliseconds since the epoch)
+   *         as is reported by `System.currentTimeMillis()`
+   */
+  def getTimeMillis(): Long = System.currentTimeMillis()
+
+  /**
+   * @param targetTime block until the current time is at least this value
+   * @return current system time when wait has completed
+   */
+  def waitTillTime(targetTime: Long): Long = {
+    var currentTime = 0L
+    currentTime = System.currentTimeMillis()
+
+    var waitTime = targetTime - currentTime
+    if (waitTime <= 0) {
+      return currentTime
+    }
+
+    val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
+
+    while (true) {
+      currentTime = System.currentTimeMillis()
+      waitTime = targetTime - currentTime
+      if (waitTime <= 0) {
+        return currentTime
+      }
+      val sleepTime = math.min(waitTime, pollTime)
+      Thread.sleep(sleepTime)
+    }
+    -1
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/main/scala/org/apache/spark/util/ManualClock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
new file mode 100644
index 0000000..cf89c17
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * A `Clock` whose time can be manually set and modified. Its reported time does not change
+ * as time elapses, but only as its time is modified by callers. This is mainly useful for
+ * testing.
+ *
+ * @param time initial time (in milliseconds since the epoch)
+ */
+private[spark] class ManualClock(private var time: Long) extends Clock {
+
+  /**
+   * @return `ManualClock` with initial time 0
+   */
+  def this() = this(0L)
+
+  def getTimeMillis(): Long =
+    synchronized {
+      time
+    }
+
+  /**
+   * @param timeToSet new time (in milliseconds) that the clock should represent
+   */
+  def setTime(timeToSet: Long) =
+    synchronized {
+      time = timeToSet
+      notifyAll()
+    }
+
+  /**
+   * @param timeToAdd time (in milliseconds) to add to the clock's time
+   */
+  def advance(timeToAdd: Long) =
+    synchronized {
+      time += timeToAdd
+      notifyAll()
+    }
+
+  /**
+   * @param targetTime block until the clock time is set or advanced to at least this time
+   * @return current time reported by the clock when waiting finishes
+   */
+  def waitTillTime(targetTime: Long): Long =
+    synchronized {
+      while (time < targetTime) {
+        wait(100)
+      }
+      getTimeMillis()
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index d3123e8..abfcee7 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.{FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.util.ManualClock
 
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
@@ -321,7 +322,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
 
   test("starting/canceling add timer") {
     sc = createSparkContext(2, 10)
-    val clock = new TestClock(8888L)
+    val clock = new ManualClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
 
@@ -330,21 +331,21 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     onSchedulerBacklogged(manager)
     val firstAddTime = addTime(manager)
     assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
-    clock.tick(100L)
+    clock.advance(100L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === firstAddTime) // timer is already started
-    clock.tick(200L)
+    clock.advance(200L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === firstAddTime)
     onSchedulerQueueEmpty(manager)
 
     // Restart add timer
-    clock.tick(1000L)
+    clock.advance(1000L)
     assert(addTime(manager) === NOT_SET)
     onSchedulerBacklogged(manager)
     val secondAddTime = addTime(manager)
     assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
-    clock.tick(100L)
+    clock.advance(100L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === secondAddTime) // timer is already started
     assert(addTime(manager) !== firstAddTime)
@@ -353,7 +354,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
 
   test("starting/canceling remove timers") {
     sc = createSparkContext(2, 10)
-    val clock = new TestClock(14444L)
+    val clock = new ManualClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
 
@@ -366,17 +367,17 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).contains("1"))
     val firstRemoveTime = removeTimes(manager)("1")
     assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
-    clock.tick(100L)
+    clock.advance(100L)
     onExecutorIdle(manager, "1")
     assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
-    clock.tick(200L)
+    clock.advance(200L)
     onExecutorIdle(manager, "1")
     assert(removeTimes(manager)("1") === firstRemoveTime)
-    clock.tick(300L)
+    clock.advance(300L)
     onExecutorIdle(manager, "2")
     assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
     assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
-    clock.tick(400L)
+    clock.advance(400L)
     onExecutorIdle(manager, "3")
     assert(removeTimes(manager)("3") !== firstRemoveTime)
     assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
@@ -385,7 +386,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).contains("3"))
 
     // Restart remove timer
-    clock.tick(1000L)
+    clock.advance(1000L)
     onExecutorBusy(manager, "1")
     assert(removeTimes(manager).size === 2)
     onExecutorIdle(manager, "1")
@@ -401,7 +402,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   test("mock polling loop with no events") {
     sc = createSparkContext(1, 20)
     val manager = sc.executorAllocationManager.get
-    val clock = new TestClock(2020L)
+    val clock = new ManualClock(2020L)
     manager.setClock(clock)
 
     // No events - we should not be adding or removing
@@ -410,15 +411,15 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     schedule(manager)
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
-    clock.tick(100L)
+    clock.advance(100L)
     schedule(manager)
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
-    clock.tick(1000L)
+    clock.advance(1000L)
     schedule(manager)
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
-    clock.tick(10000L)
+    clock.advance(10000L)
     schedule(manager)
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
@@ -426,57 +427,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
 
   test("mock polling loop add behavior") {
     sc = createSparkContext(1, 20)
-    val clock = new TestClock(2020L)
+    val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Scheduler queue backlogged
     onSchedulerBacklogged(manager)
-    clock.tick(schedulerBacklogTimeout * 1000 / 2)
+    clock.advance(schedulerBacklogTimeout * 1000 / 2)
     schedule(manager)
     assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
-    clock.tick(schedulerBacklogTimeout * 1000)
+    clock.advance(schedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 1) // first timer exceeded
-    clock.tick(sustainedSchedulerBacklogTimeout * 1000 / 2)
+    clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
     schedule(manager)
     assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
-    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
-    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
 
     // Scheduler queue drained
     onSchedulerQueueEmpty(manager)
-    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 7) // timer is canceled
-    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 7)
 
     // Scheduler queue backlogged again
     onSchedulerBacklogged(manager)
-    clock.tick(schedulerBacklogTimeout * 1000)
+    clock.advance(schedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
-    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 7 + 1 + 2)
-    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
-    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
     assert(numExecutorsPending(manager) === 20) // limit reached
   }
 
   test("mock polling loop remove behavior") {
     sc = createSparkContext(1, 20)
-    val clock = new TestClock(2020L)
+    val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
 
@@ -486,11 +487,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     onExecutorAdded(manager, "executor-3")
     assert(removeTimes(manager).size === 3)
     assert(executorsPendingToRemove(manager).isEmpty)
-    clock.tick(executorIdleTimeout * 1000 / 2)
+    clock.advance(executorIdleTimeout * 1000 / 2)
     schedule(manager)
     assert(removeTimes(manager).size === 3) // idle threshold not reached yet
     assert(executorsPendingToRemove(manager).isEmpty)
-    clock.tick(executorIdleTimeout * 1000)
+    clock.advance(executorIdleTimeout * 1000)
     schedule(manager)
     assert(removeTimes(manager).isEmpty) // idle threshold exceeded
     assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
@@ -511,7 +512,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(!removeTimes(manager).contains("executor-5"))
     assert(!removeTimes(manager).contains("executor-6"))
     assert(executorsPendingToRemove(manager).size === 2)
-    clock.tick(executorIdleTimeout * 1000)
+    clock.advance(executorIdleTimeout * 1000)
     schedule(manager)
     assert(removeTimes(manager).isEmpty) // idle executors are removed
     assert(executorsPendingToRemove(manager).size === 4)
@@ -529,7 +530,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).contains("executor-5"))
     assert(removeTimes(manager).contains("executor-6"))
     assert(executorsPendingToRemove(manager).size === 4)
-    clock.tick(executorIdleTimeout * 1000)
+    clock.advance(executorIdleTimeout * 1000)
     schedule(manager)
     assert(removeTimes(manager).isEmpty)
     assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index b6f4411..aa6e487 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -27,6 +27,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.util.Clock
 
 class DriverRunnerTest extends FunSuite {
   private def createDriverRunner() = {
@@ -129,7 +130,7 @@ class DriverRunnerTest extends FunSuite {
       .thenReturn(-1) // fail 3
       .thenReturn(-1) // fail 4
       .thenReturn(0) // success
-    when(clock.currentTimeMillis())
+    when(clock.getTimeMillis())
       .thenReturn(0).thenReturn(1000) // fail 1 (short)
       .thenReturn(1000).thenReturn(2000) // fail 2 (short)
       .thenReturn(2000).thenReturn(10000) // fail 3 (long)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 5958056..12330d8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.FakeClock
+import org.apache.spark.util.ManualClock
 
 class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
   extends DAGScheduler(sc) {
@@ -164,7 +164,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // Offer a host with NO_PREF as the constraint,
@@ -213,7 +213,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // An executor that is not NODE_LOCAL should be rejected.
@@ -234,7 +234,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
       Seq()   // Last task has no locality prefs
     )
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
     // First offer host1, exec1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -263,7 +263,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host2", "exec3")),
       Seq()   // Last task has no locality prefs
     )
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
     // First offer host1, exec1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
@@ -283,7 +283,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host3")),
       Seq(TaskLocation("host2"))
     )
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // First offer host1: first task should be chosen
@@ -321,7 +321,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host2")),
       Seq(TaskLocation("host3"))
     )
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // First offer host1: first task should be chosen
@@ -353,7 +353,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -370,7 +370,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -402,7 +402,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       ("exec1.1", "host1"), ("exec2", "host2"))
     // affinity to exec1 on host1 - which we will fail.
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, 4, clock)
 
     {
@@ -486,7 +486,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host1", "execB")),
       Seq(TaskLocation("host2", "execC")),
       Seq())
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
     // Only ANY is valid
     assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
@@ -522,7 +522,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     val taskSet = FakeTask.createTaskSet(2,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host1", "execA")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
@@ -611,7 +611,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host2"), TaskLocation("host1")),
       Seq(),
       Seq(TaskLocation("host3", "execC")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
@@ -637,7 +637,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host2")),
       Seq(),
       Seq(TaskLocation("host3")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // node-local tasks are scheduled without delay
@@ -658,7 +658,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host2")),
       Seq(ExecutorCacheTaskLocation("host1", "execA")),
       Seq(ExecutorCacheTaskLocation("host2", "execB")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // process-local tasks are scheduled first
@@ -678,7 +678,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(),
       Seq(ExecutorCacheTaskLocation("host1", "execA")),
       Seq(ExecutorCacheTaskLocation("host2", "execB")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // process-local tasks are scheduled first
@@ -698,7 +698,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     val taskSet = FakeTask.createTaskSet(2,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host2", "execB.1")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
     // Only ANY is valid
     assert(manager.myLocalityLevels.sameElements(Array(ANY)))
@@ -732,7 +732,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(HostTaskLocation("host1")),
       Seq(HostTaskLocation("host2")),
       Seq(HDFSCacheTaskLocation("host3")))
-    val clock = new FakeClock
+    val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
     sched.removeExecutor("execA")

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/test/scala/org/apache/spark/util/FakeClock.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FakeClock.scala b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
deleted file mode 100644
index 0a45917..0000000
--- a/core/src/test/scala/org/apache/spark/util/FakeClock.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-class FakeClock extends Clock {
-  private var time = 0L
-
-  def advance(millis: Long): Unit = time += millis
-
-  def getTime(): Long = time
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7..cfedb5a 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
         SparkConf conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
         ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index b57a1c7..e04d408 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -34,10 +34,9 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
 import org.apache.spark.streaming.flume.sink._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, Utils}
 
 class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging {
 
@@ -54,7 +53,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
 
   def beforeFunction() {
     logInfo("Using manual clock")
-    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+    conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
   }
 
   before(beforeFunction())
@@ -236,7 +235,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
         tx.commit()
         tx.close()
         Thread.sleep(500) // Allow some time for the events to reach
-        clock.addToTime(batchDuration.milliseconds)
+        clock.advance(batchDuration.milliseconds)
       }
       null
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7..cfedb5a 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
         SparkConf conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
         ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7..cfedb5a 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
         SparkConf conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
         ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7..cfedb5a 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
         SparkConf conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
         ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 0b80b61..588e86a 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -18,9 +18,7 @@ package org.apache.spark.streaming.kinesis
 
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.util.Clock
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.util.SystemClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
  * This is a helper class for managing checkpoint clocks.
@@ -35,7 +33,7 @@ private[kinesis] class KinesisCheckpointState(
   
   /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
   val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.currentTime() + checkpointInterval.milliseconds)
+  checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)
 
   /**
    * Check if it's time to checkpoint based on the current time and the derived time 
@@ -44,13 +42,13 @@ private[kinesis] class KinesisCheckpointState(
    * @return true if it's time to checkpoint
    */
   def shouldCheckpoint(): Boolean = {
-    new SystemClock().currentTime() > checkpointClock.currentTime()
+    new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
   }
 
   /**
    * Advance the checkpoint clock by the checkpoint interval.
    */
   def advanceCheckpoint() = {
-    checkpointClock.addToTime(checkpointInterval.milliseconds)
+    checkpointClock.advance(checkpointInterval.milliseconds)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 8ecc2d9..af8cd87 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -104,7 +104,7 @@ private[kinesis] class KinesisRecordProcessor(
           logDebug(s"Checkpoint:  WorkerId $workerId completed checkpoint of ${batch.size}" +
               s" records for shardId $shardId")
           logDebug(s"Checkpoint:  Next checkpoint is at " +
-              s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId")
+              s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId")
         }
       } catch {
         case e: Throwable => {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index f56898a..255fe65 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -25,8 +25,7 @@ import org.apache.spark.streaming.Milliseconds
 import org.apache.spark.streaming.Seconds
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.TestSuiteBase
-import org.apache.spark.streaming.util.Clock
-import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.{ManualClock, Clock}
 
 import org.mockito.Mockito._
 import org.scalatest.BeforeAndAfter
@@ -129,45 +128,45 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
   }
 
   test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
-    when(currentClockMock.currentTime()).thenReturn(0)
+    when(currentClockMock.getTimeMillis()).thenReturn(0)
 
     val checkpointIntervalMillis = 10
     val checkpointState =
       new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
-    assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+    assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis)
 
-    verify(currentClockMock, times(1)).currentTime()
+    verify(currentClockMock, times(1)).getTimeMillis()
   }
 
   test("should checkpoint if we have exceeded the checkpoint interval") {
-    when(currentClockMock.currentTime()).thenReturn(0)
+    when(currentClockMock.getTimeMillis()).thenReturn(0)
 
     val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
     assert(checkpointState.shouldCheckpoint())
 
-    verify(currentClockMock, times(1)).currentTime()
+    verify(currentClockMock, times(1)).getTimeMillis()
   }
 
   test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
-    when(currentClockMock.currentTime()).thenReturn(0)
+    when(currentClockMock.getTimeMillis()).thenReturn(0)
 
     val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
     assert(!checkpointState.shouldCheckpoint())
 
-    verify(currentClockMock, times(1)).currentTime()
+    verify(currentClockMock, times(1)).getTimeMillis()
   }
 
   test("should add to time when advancing checkpoint") {
-    when(currentClockMock.currentTime()).thenReturn(0)
+    when(currentClockMock.getTimeMillis()).thenReturn(0)
 
     val checkpointIntervalMillis = 10
     val checkpointState =
       new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
-    assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+    assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis)
     checkpointState.advanceCheckpoint()
-    assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+    assert(checkpointState.checkpointClock.getTimeMillis() == (2 * checkpointIntervalMillis))
 
-    verify(currentClockMock, times(1)).currentTime()
+    verify(currentClockMock, times(1)).getTimeMillis()
   }
 
   test("shutdown should checkpoint if the reason is TERMINATE") {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java
index ac945ba..640d2ec 100644
--- a/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/classification/JavaStreamingLogisticRegressionSuite.java
@@ -47,7 +47,7 @@ public class JavaStreamingLogisticRegressionSuite implements Serializable {
     SparkConf conf = new SparkConf()
       .setMaster("local[2]")
       .setAppName("test")
-      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+      .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
     ssc = new JavaStreamingContext(conf, new Duration(1000));
     ssc.checkpoint("checkpoint");
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
index a4dd1ac..899c4ea 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
@@ -45,7 +45,7 @@ public class JavaStreamingLinearRegressionSuite implements Serializable {
     SparkConf conf = new SparkConf()
       .setMaster("local[2]")
       .setAppName("test")
-      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+      .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
     ssc = new JavaStreamingContext(conf, new Duration(1000));
     ssc.checkpoint("checkpoint");
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4065a56..ee6229a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -148,6 +148,11 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.linalg.VectorUDT"),
             ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.serialize"),
             ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.sqlType")
+          ) ++ Seq(
+            // SPARK-4682
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
           )
 
         case v if v.startsWith("1.2") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 4f7db41..22de8c0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -88,7 +88,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 
   // Initial ignore threshold based on which old, existing files in the directory (at the time of
   // starting the streaming application) will be ignored or considered
-  private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
+  private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.getTimeMillis() else 0L
 
   /*
    * Make sure that the information of files selected in the last few batches are remembered.
@@ -161,7 +161,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
    */
   private def findNewFiles(currentTime: Long): Array[String] = {
     try {
-      lastNewFileFindingTime = clock.currentTime()
+      lastNewFileFindingTime = clock.getTimeMillis()
 
       // Calculate ignore threshold
       val modTimeIgnoreThreshold = math.max(
@@ -174,7 +174,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
         def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
       }
       val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
-      val timeTaken = clock.currentTime() - lastNewFileFindingTime
+      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
       logInfo("Finding new files took " + timeTaken + " ms")
       logDebug("# cached file times = " + fileToModTime.size)
       if (timeTaken > slideDuration.milliseconds) {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 79263a7..ee5e639 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -23,7 +23,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.SystemClock
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index f7a8ebe..dcdc27d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage._
-import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 8632c94..ac92774 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -23,7 +23,8 @@ import akka.actor.{ActorRef, Props, Actor}
 
 import org.apache.spark.{SparkEnv, Logging}
 import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
-import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer}
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, ManualClock}
 
 /** Event classes for JobGenerator */
 private[scheduler] sealed trait JobGeneratorEvent
@@ -45,8 +46,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   val clock = {
     val clockClass = ssc.sc.conf.get(
-      "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-    Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+      "spark.streaming.clock", "org.apache.spark.util.SystemClock")
+    try {
+      Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+    } catch {
+      case e: ClassNotFoundException if clockClass.startsWith("org.apache.spark.streaming") =>
+        val newClockClass = clockClass.replace("org.apache.spark.streaming", "org.apache.spark")
+        Class.forName(newClockClass).newInstance().asInstanceOf[Clock]
+    }
   }
 
   private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index e19ac93..200cf4e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkException, Logging, SparkConf}
 import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.util.WriteAheadLogManager
+import org.apache.spark.util.{Clock, Utils}
 
 /** Trait representing any event in the ReceivedBlockTracker that updates its state. */
 private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -150,7 +150,7 @@ private[streaming] class ReceivedBlockTracker(
    * returns only after the files are cleaned up.
    */
   def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
-    assert(cleanupThreshTime.milliseconds < clock.currentTime())
+    assert(cleanupThreshTime.milliseconds < clock.getTimeMillis())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
deleted file mode 100644
index d6d96d7..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.util
-
-private[streaming]
-trait Clock {
-  def currentTime(): Long
-  def waitTillTime(targetTime: Long): Long
-}
-
-private[streaming]
-class SystemClock() extends Clock {
-
-  val minPollTime = 25L
-
-  def currentTime(): Long = {
-    System.currentTimeMillis()
-  }
-
-  def waitTillTime(targetTime: Long): Long = {
-    var currentTime = 0L
-    currentTime = System.currentTimeMillis()
-
-    var waitTime = targetTime - currentTime
-    if (waitTime <= 0) {
-      return currentTime
-    }
-
-    val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
-
-    while (true) {
-      currentTime = System.currentTimeMillis()
-      waitTime = targetTime - currentTime
-      if (waitTime <= 0) {
-        return currentTime
-      }
-      val sleepTime = math.min(waitTime, pollTime)
-      Thread.sleep(sleepTime)
-    }
-    -1
-  }
-}
-
-private[streaming]
-class ManualClock() extends Clock {
-
-  private var time = 0L
-
-  def currentTime() = this.synchronized {
-    time
-  }
-
-  def setTime(timeToSet: Long) = {
-    this.synchronized {
-      time = timeToSet
-      this.notifyAll()
-    }
-  }
-
-  def addToTime(timeToAdd: Long) = {
-    this.synchronized {
-      time += timeToAdd
-      this.notifyAll()
-    }
-  }
-  def waitTillTime(targetTime: Long): Long = {
-    this.synchronized {
-      while (time < targetTime) {
-        this.wait(100)
-      }
-    }
-    currentTime()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index 1a616a0..c8eef83 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.util
 
 import org.apache.spark.Logging
+import org.apache.spark.util.{Clock, SystemClock}
 
 private[streaming]
 class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
@@ -38,7 +39,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
    * current system time.
    */
   def getStartTime(): Long = {
-    (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
   }
 
   /**
@@ -48,7 +49,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
    * more than current time.
    */
   def getRestartTime(originalStartTime: Long): Long = {
-    val gap = clock.currentTime - originalStartTime
+    val gap = clock.getTimeMillis() - originalStartTime
     (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 166661b..985ded9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,13 +19,12 @@ package org.apache.spark.streaming.util
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 import WriteAheadLogManager._
 
 /**
@@ -82,7 +81,7 @@ private[streaming] class WriteAheadLogManager(
     var succeeded = false
     while (!succeeded && failures < maxFailures) {
       try {
-        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+        fileSegment = getLogWriter(clock.getTimeMillis()).write(byteBuffer)
         succeeded = true
       } catch {
         case ex: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7..cfedb5a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
         SparkConf conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
         ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index e8f4a77..cf19171 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -22,13 +22,12 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.language.existentials
 import scala.reflect.ClassTag
 
-import util.ManualClock
-
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
+import org.apache.spark.util.{Clock, ManualClock}
 import org.apache.spark.HashPartitioner
 
 class BasicOperationsSuite extends TestSuiteBase {
@@ -586,7 +585,7 @@ class BasicOperationsSuite extends TestSuiteBase {
         for (i <- 0 until input.size) {
           testServer.send(input(i).toString + "\n")
           Thread.sleep(200)
-          clock.addToTime(batchDuration.milliseconds)
+          clock.advance(batchDuration.milliseconds)
           collectRddInfo()
         }
 
@@ -637,8 +636,8 @@ class BasicOperationsSuite extends TestSuiteBase {
         ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
       if (rememberDuration != null) ssc.remember(rememberDuration)
       val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
-      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      assert(clock.currentTime() === Seconds(10).milliseconds)
+      val clock = ssc.scheduler.clock.asInstanceOf[Clock]
+      assert(clock.getTimeMillis() === Seconds(10).milliseconds)
       assert(output.size === numExpectedOutput)
       operatedStream
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 8f8bc61..03c448f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -32,8 +32,7 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -61,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase {
 
     assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
 
-    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+    conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
 
     val stateStreamCheckpointInterval = Seconds(1)
     val fs = FileSystem.getLocal(new Configuration())
@@ -324,13 +323,13 @@ class CheckpointSuite extends TestSuiteBase {
      * Writes a file named `i` (which contains the number `i`) to the test directory and sets its
      * modification time to `clock`'s current time.
      */
-    def writeFile(i: Int, clock: ManualClock): Unit = {
+    def writeFile(i: Int, clock: Clock): Unit = {
       val file = new File(testDir, i.toString)
       Files.write(i + "\n", file, Charsets.UTF_8)
-      assert(file.setLastModified(clock.currentTime()))
+      assert(file.setLastModified(clock.getTimeMillis()))
       // Check that the file's modification date is actually the value we wrote, since rounding or
       // truncation will break the test:
-      assert(file.lastModified() === clock.currentTime())
+      assert(file.lastModified() === clock.getTimeMillis())
     }
 
     /**
@@ -372,13 +371,13 @@ class CheckpointSuite extends TestSuiteBase {
         ssc.start()
 
         // Advance half a batch so that the first file is created after the StreamingContext starts
-        clock.addToTime(batchDuration.milliseconds / 2)
+        clock.advance(batchDuration.milliseconds / 2)
         // Create files and advance manual clock to process them
         for (i <- Seq(1, 2, 3)) {
           writeFile(i, clock)
           // Advance the clock after creating the file to avoid a race when
           // setting its modification time
-          clock.addToTime(batchDuration.milliseconds)
+          clock.advance(batchDuration.milliseconds)
           if (i != 3) {
             // Since we want to shut down while the 3rd batch is processing
             eventually(eventuallyTimeout) {
@@ -386,7 +385,7 @@ class CheckpointSuite extends TestSuiteBase {
             }
           }
         }
-        clock.addToTime(batchDuration.milliseconds)
+        clock.advance(batchDuration.milliseconds)
         eventually(eventuallyTimeout) {
           // Wait until all files have been recorded and all batches have started
           assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
@@ -410,7 +409,7 @@ class CheckpointSuite extends TestSuiteBase {
         writeFile(i, clock)
         // Advance the clock after creating the file to avoid a race when
         // setting its modification time
-        clock.addToTime(batchDuration.milliseconds)
+        clock.advance(batchDuration.milliseconds)
       }
 
       // Recover context from checkpoint file and verify whether the files that were
@@ -419,7 +418,7 @@ class CheckpointSuite extends TestSuiteBase {
       withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
         // So that the restarted StreamingContext's clock has gone forward in time since failure
         ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
-        val oldClockTime = clock.currentTime()
+        val oldClockTime = clock.getTimeMillis()
         clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
         val batchCounter = new BatchCounter(ssc)
         val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
@@ -430,7 +429,7 @@ class CheckpointSuite extends TestSuiteBase {
         ssc.start()
         // Verify that the clock has traveled forward to the expected time
         eventually(eventuallyTimeout) {
-          clock.currentTime() === oldClockTime
+          clock.getTimeMillis() === oldClockTime
         }
         // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
         val numBatchesAfterRestart = 4
@@ -441,12 +440,12 @@ class CheckpointSuite extends TestSuiteBase {
           writeFile(i, clock)
           // Advance the clock after creating the file to avoid a race when
           // setting its modification time
-          clock.addToTime(batchDuration.milliseconds)
+          clock.advance(batchDuration.milliseconds)
           eventually(eventuallyTimeout) {
             assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
           }
         }
-        clock.addToTime(batchDuration.milliseconds)
+        clock.advance(batchDuration.milliseconds)
         logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
         assert(outputStream.output.size > 0, "No files processed after restart")
         ssc.stop()
@@ -521,12 +520,12 @@ class CheckpointSuite extends TestSuiteBase {
    */
   def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    logInfo("Manual clock before advancing = " + clock.currentTime())
+    logInfo("Manual clock before advancing = " + clock.getTimeMillis())
     for (i <- 1 to numBatches.toInt) {
-      clock.addToTime(batchDuration.milliseconds)
+      clock.advance(batchDuration.milliseconds)
       Thread.sleep(batchDuration.milliseconds)
     }
-    logInfo("Manual clock after advancing = " + clock.currentTime())
+    logInfo("Manual clock after advancing = " + clock.getTimeMillis())
     Thread.sleep(batchDuration.milliseconds)
 
     val outputStream = ssc.graph.getOutputStreams.filter { dstream =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 01084a4..7ed6320 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -17,12 +17,8 @@
 
 package org.apache.spark.streaming
 
-import akka.actor.Actor
-import akka.actor.Props
-import akka.util.ByteString
-
 import java.io.{File, BufferedWriter, OutputStreamWriter}
-import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.net.{SocketException, ServerSocket}
 import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
@@ -36,9 +32,8 @@ import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.util.Utils
-import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.rdd.RDD
 import org.apache.hadoop.io.{Text, LongWritable}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
@@ -69,7 +64,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     for (i <- 0 until input.size) {
       testServer.send(input(i).toString + "\n")
       Thread.sleep(500)
-      clock.addToTime(batchDuration.milliseconds)
+      clock.advance(batchDuration.milliseconds)
     }
     Thread.sleep(1000)
     logInfo("Stopping server")
@@ -120,19 +115,19 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
         // Advance the clock so that the files are created after StreamingContext starts, but
         // not enough to trigger a batch
-        clock.addToTime(batchDuration.milliseconds / 2)
+        clock.advance(batchDuration.milliseconds / 2)
 
         val input = Seq(1, 2, 3, 4, 5)
         input.foreach { i =>
           Thread.sleep(batchDuration.milliseconds)
           val file = new File(testDir, i.toString)
           Files.write(Array[Byte](i.toByte), file)
-          assert(file.setLastModified(clock.currentTime()))
-          assert(file.lastModified === clock.currentTime)
+          assert(file.setLastModified(clock.getTimeMillis()))
+          assert(file.lastModified === clock.getTimeMillis())
           logInfo("Created file " + file)
           // Advance the clock after creating the file to avoid a race when
           // setting its modification time
-          clock.addToTime(batchDuration.milliseconds)
+          clock.advance(batchDuration.milliseconds)
           eventually(eventuallyTimeout) {
             assert(batchCounter.getNumCompletedBatches === i)
           }
@@ -179,7 +174,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     while((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
       System.currentTimeMillis() - startTime < 5000) {
       Thread.sleep(100)
-      clock.addToTime(batchDuration.milliseconds)
+      clock.advance(batchDuration.milliseconds)
     }
     Thread.sleep(1000)
     logInfo("Stopping context")
@@ -214,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     for (i <- 0 until input.size) {
       // Enqueue more than 1 item per tick but they should dequeue one at a time
       inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
-      clock.addToTime(batchDuration.milliseconds)
+      clock.advance(batchDuration.milliseconds)
     }
     Thread.sleep(1000)
     logInfo("Stopping context")
@@ -256,12 +251,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // Enqueue the first 3 items (one by one), they should be merged in the next batch
     val inputIterator = input.toIterator
     inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
-    clock.addToTime(batchDuration.milliseconds)
+    clock.advance(batchDuration.milliseconds)
     Thread.sleep(1000)
 
     // Enqueue the remaining items (again one by one), merged in the final batch
     inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
-    clock.addToTime(batchDuration.milliseconds)
+    clock.advance(batchDuration.milliseconds)
     Thread.sleep(1000)
     logInfo("Stopping context")
     ssc.stop()
@@ -308,19 +303,19 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
         // Advance the clock so that the files are created after StreamingContext starts, but
         // not enough to trigger a batch
-        clock.addToTime(batchDuration.milliseconds / 2)
+        clock.advance(batchDuration.milliseconds / 2)
 
         // Over time, create files in the directory
         val input = Seq(1, 2, 3, 4, 5)
         input.foreach { i =>
           val file = new File(testDir, i.toString)
           Files.write(i + "\n", file, Charset.forName("UTF-8"))
-          assert(file.setLastModified(clock.currentTime()))
-          assert(file.lastModified === clock.currentTime)
+          assert(file.setLastModified(clock.getTimeMillis()))
+          assert(file.lastModified === clock.getTimeMillis())
           logInfo("Created file " + file)
           // Advance the clock after creating the file to avoid a race when
           // setting its modification time
-          clock.addToTime(batchDuration.milliseconds)
+          clock.advance(batchDuration.milliseconds)
           eventually(eventuallyTimeout) {
             assert(batchCounter.getNumCompletedBatches === i)
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message