spark-commits mailing list archives

From ma...@apache.org
Subject [1/2] git commit: - Allow for finer control of cleaner - Address review comments, move to incubator spark - Also includes a change to speculation - including preventing exceptions in rare cases.
Date Fri, 11 Oct 2013 22:43:03 GMT
Updated Branches:
  refs/heads/master 8f11c36fe -> d6ead4780


- Allow for finer control of cleaner
- Address review comments, move to incubator spark
- Also includes a change to speculation - including preventing exceptions in rare cases.
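
The "finer control" is exposed as per-cleaner TTL system properties layered over the existing spark.cleaner.ttl (see the MetadataCleaner.scala changes below). A minimal sketch of how a driver might tune them, assuming the property names produced by MetadataCleanerType.systemProperty ("spark.cleaner.ttl." + the cleaner type's name):

    // Sketch only: global default TTL of one hour for all metadata cleaners...
    System.setProperty("spark.cleaner.ttl", "3600")
    // ...but expire broadcast state more aggressively (ten minutes).
    System.setProperty("spark.cleaner.ttl.BroadcastVars", "600")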


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b5025d90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b5025d90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b5025d90

Branch: refs/heads/master
Commit: b5025d90bbf3e81701a3c3f5e22efbb2e2164eea
Parents: 232765f
Author: Mridul Muralidharan <mridul@gmail.com>
Authored: Fri Oct 4 21:57:57 2013 +0530
Committer: Mridul Muralidharan <mridul@gmail.com>
Committed: Sun Oct 6 00:35:51 2013 +0530

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |  4 +-
 .../scala/org/apache/spark/SparkContext.scala   |  8 +++-
 .../spark/broadcast/BitTorrentBroadcast.scala   |  4 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  8 ++--
 .../apache/spark/broadcast/TreeBroadcast.scala  |  4 +-
 .../spark/network/netty/ShuffleSender.scala     |  3 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  4 +-
 .../org/apache/spark/scheduler/ResultTask.scala |  4 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala |  4 +-
 .../cluster/ClusterTaskSetManager.scala         |  4 +-
 .../org/apache/spark/storage/BlockManager.scala | 39 +++++++++++++++++---
 .../org/apache/spark/util/MetadataCleaner.scala | 36 ++++++++++++++++--
 12 files changed, 93 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ae7cf2a..1e3f1eb 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,7 +32,7 @@ import akka.util.Duration
 
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
 
 
 private[spark] sealed trait MapOutputTrackerMessage
@@ -71,7 +71,7 @@ private[spark] class MapOutputTracker extends Logging {
   var cacheEpoch = epoch
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)
 
   // Send a message to the trackerActor and get its result within a default timeout, or
   // throw a SparkException if this fails.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2fb4a53..2f40179 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -61,7 +61,11 @@ import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util._
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.StorageStatus
+import scala.Some
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.RDDInfo
 import org.apache.spark.storage.StorageStatus
@@ -116,7 +120,7 @@ class SparkContext(
 
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
-  private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+  private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
 
   // Initialize the Spark UI
   private[spark] val ui = new SparkUI(this)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
index 93e7815..f82dea9 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
 import scala.math
 
 import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{BlockManager, StorageLevel}
 import org.apache.spark.util.Utils
 
private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -36,7 +36,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
 
   def value = value_
 
-  def blockId: String = "broadcast_" + id
+  def blockId: String = BlockManager.toBroadcastId(id)
 
   MultiTracker.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 9db26ae..a4ceb0d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -25,8 +25,8 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 
 import org.apache.spark.{HttpServer, Logging, SparkEnv}
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}
+import org.apache.spark.storage.{BlockManager, StorageLevel}
+import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashSet}
 
 
 private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -34,7 +34,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
   
   def value = value_
 
-  def blockId: String = "broadcast_" + id
+  def blockId: String = BlockManager.toBroadcastId(id)
 
   HttpBroadcast.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -82,7 +82,7 @@ private object HttpBroadcast extends Logging {
   private var server: HttpServer = null
 
   private val files = new TimeStampedHashSet[String]
-  private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+  private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
 
   private lazy val compressionCodec = CompressionCodec.createCodec()
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
index 80c97ca..b664f28 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
 import scala.math
 
 import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{BlockManager, StorageLevel}
 import org.apache.spark.util.Utils
 
 private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -33,7 +33,7 @@ extends Broadcast[T](id) with Logging with Serializable {
 
   def value = value_
 
-  def blockId = "broadcast_" + id
+  def blockId = BlockManager.toBroadcastId(id)
 
   MultiTracker.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 8afcbe1..0c5ded3 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
+import org.apache.spark.storage.ShuffleBlockManager
 
 
private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -54,7 +55,7 @@ private[spark] object ShuffleSender {
 
     val pResovler = new PathResolver {
       override def getAbsolutePath(blockId: String): String = {
-        if (!blockId.startsWith("shuffle_")) {
+        if (!ShuffleBlockManager.isShuffle(blockId)) {
           throw new Exception("Block " + blockId + " is not a shuffle block")
         }
         // Figure out which local directory it hashes to, and which subdirectory in that

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4053b91..9c9834e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 
 /**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -138,7 +138,7 @@ class DAGScheduler(
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
 
-  val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
   // Start a thread to run the DAGScheduler event loop
   def start() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 07e8317..6dd422b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -23,7 +23,7 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDDCheckpointData
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 
 private[spark] object ResultTask {
 
@@ -32,7 +32,7 @@ private[spark] object ResultTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
     synchronized {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index d23df0d..3b9d567 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.storage._
-import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
+import org.apache.spark.util.{MetadataCleanerType, TimeStampedHashMap, MetadataCleaner}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDDCheckpointData
 
@@ -37,7 +37,7 @@ private[spark] object ShuffleMapTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
     synchronized {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 194ab55..936167c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -630,11 +630,11 @@ private[spark] class ClusterTaskSetManager(
     var foundTasks = false
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
-    if (tasksSuccessful >= minFinishedForSpeculation) {
+    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
       val time = clock.getTime()
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
-      val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1))
+      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
       val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
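
The new tasksSuccessful guard is the "preventing exceptions in rare cases" from the commit message: with a speculation quantile low enough that minFinishedForSpeculation floors to zero while no task has finished, durations is empty and the old index min(..., durations.size - 1) evaluates to -1. A small repro of the old failure mode, using stand-in values rather than the scheduler's actual fields:

    // Stand-in values; not the scheduler's real state.
    val durations = Array.empty[Long]    // no successful tasks yet
    val numTasks  = 10
    val idx = math.min((0.5 * numTasks).round.toInt, durations.size - 1)
    // idx == -1, so durations(idx) would throw ArrayIndexOutOfBoundsException.
    // Guarding on tasksSuccessful > 0 skips this path entirely, and indexing
    // by tasksSuccessful computes the median over finished tasks only.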

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 495a72d..37a67e7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -154,7 +154,8 @@ private[spark] class BlockManager(
 
   var heartBeatTask: Cancellable = null
 
-  val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
+  private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks)
+  private val broadcastCleaner = new MetadataCleaner(MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks)
   initialize()
 
  // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -911,13 +912,36 @@ private[spark] class BlockManager(
     }
   }
 
-  def dropOldBlocks(cleanupTime: Long) {
-    logInfo("Dropping blocks older than " + cleanupTime)
+  private def dropOldNonBroadcastBlocks(cleanupTime: Long) {
+    logInfo("Dropping non broadcast blocks older than " + cleanupTime)
     val iterator = blockInfo.internalMap.entrySet().iterator()
     while (iterator.hasNext) {
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime) {
+      if (time < cleanupTime && ! BlockManager.isBroadcastBlock(id) ) {
+        info.synchronized {
+          val level = info.level
+          if (level.useMemory) {
+            memoryStore.remove(id)
+          }
+          if (level.useDisk) {
+            diskStore.remove(id)
+          }
+          iterator.remove()
+          logInfo("Dropped block " + id)
+        }
+        reportBlockStatus(id, info)
+      }
+    }
+  }
+
+  private def dropOldBroadcastBlocks(cleanupTime: Long) {
+    logInfo("Dropping broadcast blocks older than " + cleanupTime)
+    val iterator = blockInfo.internalMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
+      if (time < cleanupTime && BlockManager.isBroadcastBlock(id) ) {
         info.synchronized {
           val level = info.level
           if (level.useMemory) {
@@ -937,7 +961,7 @@ private[spark] class BlockManager(
   def shouldCompress(blockId: String): Boolean = {
     if (ShuffleBlockManager.isShuffle(blockId)) {
       compressShuffle
-    } else if (blockId.startsWith("broadcast_")) {
+    } else if (BlockManager.isBroadcastBlock(blockId)) {
       compressBroadcast
     } else if (blockId.startsWith("rdd_")) {
       compressRdds
@@ -994,6 +1018,7 @@ private[spark] class BlockManager(
     memoryStore.clear()
     diskStore.clear()
     metadataCleaner.cancel()
+    broadcastCleaner.cancel()
     logInfo("BlockManager stopped")
   }
 }
@@ -1067,5 +1092,9 @@ private[spark] object BlockManager extends Logging {
   {
     blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
   }
+
+  def isBroadcastBlock(blockId: String): Boolean = null != blockId && blockId.startsWith("broadcast_")
+
+  def toBroadcastId(id: Long): String = "broadcast_" + id
 }
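
The two helpers added at the end of the object centralize the "broadcast_" naming convention that the broadcast implementations and shouldCompress previously duplicated as string literals. Expected behavior, per the definitions above:

    BlockManager.toBroadcastId(42)                  // "broadcast_42"
    BlockManager.isBroadcastBlock("broadcast_42")   // true
    BlockManager.isBroadcastBlock("rdd_3_7")        // false
    BlockManager.isBroadcastBlock(null)             // false (null-guarded)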
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index a430a75..0ce1394 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.util
 
-import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
 import java.util.{TimerTask, Timer}
 import org.apache.spark.Logging
 
@@ -25,11 +24,14 @@ import org.apache.spark.Logging
 /**
  * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
  */
-class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
+  val name = cleanerType.toString
+
   private val delaySeconds = MetadataCleaner.getDelaySeconds
   private val periodSeconds = math.max(10, delaySeconds / 10)
   private val timer = new Timer(name + " cleanup timer", true)
 
+
   private val task = new TimerTask {
     override def run() {
       try {
@@ -53,9 +55,37 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
   }
 }
 
+object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask",
+  "ShuffleMapTask", "BlockManager", "BroadcastVars") {
+
+  val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, BROADCAST_VARS = Value
+
+  type MetadataCleanerType = Value
+
+  def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
+}
 
 object MetadataCleaner {
+
+  // using only sys props for now : so that workers can also get to it while preserving earlier behavior.
   def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
-  def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }
+
+  def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
+    System.getProperty(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
+  }
+
+  def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
+    System.setProperty(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
+  }
+
+  def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
+    // override for all ?
+    System.setProperty("spark.cleaner.ttl", delay.toString)
+    if (resetAll) {
+      for (cleanerType <- MetadataCleanerType.values) {
+        System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
+      }
+    }
+  }
 }
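
Resolution order for the new TTL accessors, as defined above: a type-specific property wins, otherwise the global spark.cleaner.ttl value applies. A short illustration:

    System.setProperty("spark.cleaner.ttl", "3600")
    MetadataCleaner.getDelaySeconds(MetadataCleanerType.BLOCK_MANAGER)       // 3600, global fallback
    MetadataCleaner.setDelaySeconds(MetadataCleanerType.BROADCAST_VARS, 600)
    MetadataCleaner.getDelaySeconds(MetadataCleanerType.BROADCAST_VARS)      // 600, per-type override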
 

