From: matei@apache.org
To: commits@spark.incubator.apache.org
Date: Fri, 11 Oct 2013 22:43:03 -0000
Subject: [1/2] git commit: - Allow for finer control of cleaner - Address review comments, move to incubator spark - Also includes a change to speculation - including preventing exceptions in rare cases.

Updated Branches:
  refs/heads/master 8f11c36fe -> d6ead4780


- Allow for finer control of cleaner
- Address review comments, move to incubator spark
- Also includes a change to speculation - including preventing exceptions in rare cases.
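[The "finer control" is a per-cleaner TTL: in addition to the global spark.cleaner.ttl, each cleaner type gets its own spark.cleaner.ttl.<Type> system property that overrides the global value. A minimal self-contained sketch of that resolution order, with illustrative names rather than the classes committed below:

// Sketch only: the TTL lookup scheme this commit introduces. A per-type
// property "spark.cleaner.ttl.<Type>" overrides the global
// "spark.cleaner.ttl"; -1 means the cleaner is disabled.
object TtlResolutionSketch {
  def globalDelaySeconds: Int =
    System.getProperty("spark.cleaner.ttl", "-1").toInt

  def delaySecondsFor(cleanerType: String): Int =
    System.getProperty("spark.cleaner.ttl." + cleanerType, globalDelaySeconds.toString).toInt

  def main(args: Array[String]) {
    System.setProperty("spark.cleaner.ttl", "3600")
    System.setProperty("spark.cleaner.ttl.BroadcastVars", "600")
    println(delaySecondsFor("BroadcastVars")) // 600: per-type override wins
    println(delaySecondsFor("BlockManager"))  // 3600: falls back to the global TTL
  }
}]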
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b5025d90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b5025d90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b5025d90

Branch: refs/heads/master
Commit: b5025d90bbf3e81701a3c3f5e22efbb2e2164eea
Parents: 232765f
Author: Mridul Muralidharan
Authored: Fri Oct 4 21:57:57 2013 +0530
Committer: Mridul Muralidharan
Committed: Sun Oct 6 00:35:51 2013 +0530

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |  4 +-
 .../scala/org/apache/spark/SparkContext.scala   |  8 +++-
 .../spark/broadcast/BitTorrentBroadcast.scala   |  4 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  8 ++--
 .../apache/spark/broadcast/TreeBroadcast.scala  |  4 +-
 .../spark/network/netty/ShuffleSender.scala     |  3 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  4 +-
 .../org/apache/spark/scheduler/ResultTask.scala |  4 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala |  4 +-
 .../cluster/ClusterTaskSetManager.scala         |  4 +-
 .../org/apache/spark/storage/BlockManager.scala | 39 +++++++++++++++++---
 .../org/apache/spark/util/MetadataCleaner.scala | 36 ++++++++++++++++--
 12 files changed, 93 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ae7cf2a..1e3f1eb 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,7 +32,7 @@ import akka.util.Duration
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
 
 private[spark] sealed trait MapOutputTrackerMessage
 
@@ -71,7 +71,7 @@ private[spark] class MapOutputTracker extends Logging {
   var cacheEpoch = epoch
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)
 
   // Send a message to the trackerActor and get its result within a default timeout, or
   // throw a SparkException if this fails.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2fb4a53..2f40179 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -61,7 +61,11 @@ import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util._
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.StorageStatus
+import scala.Some
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.RDDInfo
 import org.apache.spark.storage.StorageStatus
@@ -116,7 +120,7 @@ class SparkContext(
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
 
-  private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+  private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
 
   // Initalize the Spark UI
   private[spark] val ui = new SparkUI(this)


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
index 93e7815..f82dea9 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
 import scala.math
 
 import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{BlockManager, StorageLevel}
 import org.apache.spark.util.Utils
 
 private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -36,7 +36,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
 
   def value = value_
 
-  def blockId: String = "broadcast_" + id
+  def blockId: String = BlockManager.toBroadcastId(id)
 
   MultiTracker.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 9db26ae..a4ceb0d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -25,8 +25,8 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 
 import org.apache.spark.{HttpServer, Logging, SparkEnv}
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}
+import org.apache.spark.storage.{BlockManager, StorageLevel}
+import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashSet}
 
 
 private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -34,7 +34,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
 
   def value = value_
 
-  def blockId: String = "broadcast_" + id
+  def blockId: String = BlockManager.toBroadcastId(id)
 
   HttpBroadcast.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -82,7 +82,7 @@ private object HttpBroadcast extends Logging {
   private var server: HttpServer = null
 
   private val files = new TimeStampedHashSet[String]
-  private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+  private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
 
   private lazy val compressionCodec = CompressionCodec.createCodec()


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
index 80c97ca..b664f28 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
 import scala.math
 
 import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{BlockManager, StorageLevel}
 import org.apache.spark.util.Utils
 
 private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -33,7 +33,7 @@ extends Broadcast[T](id) with Logging with Serializable {
 
   def value = value_
 
-  def blockId = "broadcast_" + id
+  def blockId = BlockManager.toBroadcastId(id)
 
   MultiTracker.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 8afcbe1..0c5ded3 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
+import org.apache.spark.storage.ShuffleBlockManager
 
 private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -54,7 +55,7 @@ private[spark] object ShuffleSender {
     val pResovler = new PathResolver {
       override def getAbsolutePath(blockId: String): String = {
-        if (!blockId.startsWith("shuffle_")) {
+        if (!ShuffleBlockManager.isShuffle(blockId)) {
           throw new Exception("Block " + blockId + " is not a shuffle block")
         }
         // Figure out which local directory it hashes to, and which subdirectory in that
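[A recurring pattern in the hunks above: ad-hoc "broadcast_"/"shuffle_" string-prefix checks are pulled into helper methods on a single owning object, so the block-naming convention is defined in exactly one place. A minimal sketch of the idea, with an illustrative object name rather than the Spark classes:

// Sketch: keep block-id construction and classification together so the
// "broadcast_"/"shuffle_" naming convention is defined exactly once.
object BlockIdSketch {
  private val BroadcastPrefix = "broadcast_"
  private val ShufflePrefix = "shuffle_"

  def toBroadcastId(id: Long): String = BroadcastPrefix + id

  def isBroadcastBlock(blockId: String): Boolean =
    null != blockId && blockId.startsWith(BroadcastPrefix)

  def isShuffle(blockId: String): Boolean =
    null != blockId && blockId.startsWith(ShufflePrefix)
}]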
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4053b91..9c9834e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -138,7 +138,7 @@ class DAGScheduler(
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
 
-  val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
   // Start a thread to run the DAGScheduler event loop
  def start() {


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 07e8317..6dd422b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -23,7 +23,7 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDDCheckpointData
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 
 private[spark] object ResultTask {
 
@@ -32,7 +32,7 @@ private[spark] object ResultTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
     synchronized {


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index d23df0d..3b9d567 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.storage._
-import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
+import org.apache.spark.util.{MetadataCleanerType, TimeStampedHashMap, MetadataCleaner}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDDCheckpointData
 
@@ -37,7 +37,7 @@ private[spark] object ShuffleMapTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
     synchronized {


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 194ab55..936167c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -630,11 +630,11 @@
     var foundTasks = false
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
-    if (tasksSuccessful >= minFinishedForSpeculation) {
+    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
       val time = clock.getTime()
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
-      val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1))
+      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
       val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
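[The two changed lines above are the speculation fix mentioned in the commit message. When SPECULATION_QUANTILE * numTasks floors to 0, the old guard admitted the case of zero successful tasks; durations was then empty, durations.size - 1 was -1, and durations(-1) threw ArrayIndexOutOfBoundsException. Indexing by tasksSuccessful rather than numTasks also makes the selected element refer to the durations actually collected rather than the full task count. A self-contained sketch of the corrected selection (hypothetical helper name, simplified from the hunk):

import java.util.Arrays

object SpeculationMedianSketch {
  // Returns the duration at the 0.5 quantile of successful tasks, or None
  // when nothing has finished yet. The old code indexed with
  // (0.5 * numTasks) and had no empty-array guard, so zero successful
  // tasks meant durations(-1) -> ArrayIndexOutOfBoundsException.
  def medianSuccessfulDuration(durations: Array[Long], tasksSuccessful: Int): Option[Long] = {
    if (tasksSuccessful == 0 || durations.isEmpty) {
      None
    } else {
      Arrays.sort(durations)
      Some(durations(math.min((0.5 * tasksSuccessful).round.toInt, durations.length - 1)))
    }
  }

  def main(args: Array[String]) {
    println(medianSuccessfulDuration(Array(100L, 120L, 130L, 400L), 4)) // Some(130)
    println(medianSuccessfulDuration(Array.empty[Long], 0))             // None
  }
}]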
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 495a72d..37a67e7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -154,7 +154,8 @@ private[spark] class BlockManager(
 
   var heartBeatTask: Cancellable = null
 
-  val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
+  private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks)
+  private val broadcastCleaner = new MetadataCleaner(MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks)
   initialize()
 
   // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -911,13 +912,36 @@ private[spark] class BlockManager(
     }
   }
 
-  def dropOldBlocks(cleanupTime: Long) {
-    logInfo("Dropping blocks older than " + cleanupTime)
+  private def dropOldNonBroadcastBlocks(cleanupTime: Long) {
+    logInfo("Dropping non broadcast blocks older than " + cleanupTime)
     val iterator = blockInfo.internalMap.entrySet().iterator()
     while (iterator.hasNext) {
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime) {
+      if (time < cleanupTime && ! BlockManager.isBroadcastBlock(id) ) {
+        info.synchronized {
+          val level = info.level
+          if (level.useMemory) {
+            memoryStore.remove(id)
+          }
+          if (level.useDisk) {
+            diskStore.remove(id)
+          }
+          iterator.remove()
+          logInfo("Dropped block " + id)
+        }
+        reportBlockStatus(id, info)
+      }
+    }
+  }
+
+  private def dropOldBroadcastBlocks(cleanupTime: Long) {
+    logInfo("Dropping broadcast blocks older than " + cleanupTime)
+    val iterator = blockInfo.internalMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
+      if (time < cleanupTime && BlockManager.isBroadcastBlock(id) ) {
         info.synchronized {
           val level = info.level
           if (level.useMemory) {
@@ -937,7 +961,7 @@ private[spark] class BlockManager(
   def shouldCompress(blockId: String): Boolean = {
     if (ShuffleBlockManager.isShuffle(blockId)) {
       compressShuffle
-    } else if (blockId.startsWith("broadcast_")) {
+    } else if (BlockManager.isBroadcastBlock(blockId)) {
       compressBroadcast
     } else if (blockId.startsWith("rdd_")) {
       compressRdds
@@ -994,6 +1018,7 @@ private[spark] class BlockManager(
     memoryStore.clear()
     diskStore.clear()
     metadataCleaner.cancel()
+    broadcastCleaner.cancel()
     logInfo("BlockManager stopped")
   }
 }
@@ -1067,5 +1092,9 @@ private[spark] object BlockManager extends Logging {
   {
     blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
   }
+
+  def isBroadcastBlock(blockId: String): Boolean = null != blockId && blockId.startsWith("broadcast_")
+
+  def toBroadcastId(id: Long): String = "broadcast_" + id
 }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b5025d90/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index a430a75..0ce1394 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.util
 
-import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
 import java.util.{TimerTask, Timer}
 import org.apache.spark.Logging
 
@@ -25,11 +24,14 @@ import org.apache.spark.Logging
 /**
  * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
  */
-class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
+  val name = cleanerType.toString
+
   private val delaySeconds = MetadataCleaner.getDelaySeconds
   private val periodSeconds = math.max(10, delaySeconds / 10)
   private val timer = new Timer(name + " cleanup timer", true)
+
   private val task = new TimerTask {
     override def run() {
       try {
@@ -53,9 +55,37 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
   }
 }
 
+object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask",
+  "ShuffleMapTask", "BlockManager", "BroadcastVars") {
+
+  val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, BROADCAST_VARS = Value
+
+  type MetadataCleanerType = Value
+
+  def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
+}
 
 object MetadataCleaner {
+
+  // using only sys props for now : so that workers can also get to it while preserving earlier behavior.
   def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
-  def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }
+
+  def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
+    System.getProperty(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
+  }
+
+  def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
+    System.setProperty(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
+  }
+
+  def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
+    // override for all ?
+    System.setProperty("spark.cleaner.ttl", delay.toString)
+    if (resetAll) {
+      for (cleanerType <- MetadataCleanerType.values) {
+        System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
+      }
+    }
+  }
 }
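[For reference, a usage sketch against the API added in this commit; it assumes these classes are on the classpath and would be run in a driver program before the cleaners are constructed:

import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType}

object CleanerTtlUsage {
  def main(args: Array[String]) {
    // Global TTL for every cleaner; resetAll = true (the default) also
    // clears any per-type overrides that were set earlier.
    MetadataCleaner.setDelaySeconds(3600)

    // Tighten the TTL for broadcast blocks only; this sets the
    // "spark.cleaner.ttl.BroadcastVars" system property.
    MetadataCleaner.setDelaySeconds(MetadataCleanerType.BROADCAST_VARS, 600)

    // Each cleaner resolves its own delay, falling back to the global value.
    assert(MetadataCleaner.getDelaySeconds(MetadataCleanerType.BROADCAST_VARS) == 600)
    assert(MetadataCleaner.getDelaySeconds(MetadataCleanerType.BLOCK_MANAGER) == 3600)
  }
}]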