spark-commits mailing list archives

From pwend...@apache.org
Subject [41/50] git commit: Assign spill threshold as a fraction of maximum memory
Date Sat, 11 Jan 2014 00:25:58 GMT
Assign spill threshold as a fraction of maximum memory

Further, divide this threshold by the number of tasks running concurrently.

Note that this does not guard against the following scenario: a new task
quickly fills up its share of the memory before old tasks finish spilling
their contents, in which case the total memory used by such maps may exceed
what was specified. Currently, spark.shuffle.safetyFraction mitigates this effect.
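
For reference, the new per-map threshold boils down to the following (a condensed sketch of the logic this patch adds to ExternalAppendOnlyMap, using the default fractions it introduces; nothing here beyond what the diff below contains):

    import org.apache.spark.{SparkConf, SparkEnv}

    // Sketch of the new sizing logic (mirrors the fields added in this diff)
    val sparkConf = new SparkConf()
    val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75)
    val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)

    // Collective budget shared by all spillable maps on this executor
    val maxMemoryThreshold =
      (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong

    // Each concurrently running task gets an equal share of that budget
    val spillThreshold = maxMemoryThreshold / math.max(SparkEnv.get.numRunningTasks, 1)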


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4296d96c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4296d96c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4296d96c

Branch: refs/heads/master
Commit: 4296d96c82881cde5832bd8f8a3b48eb9817a218
Parents: 333d58d
Author: Andrew Or <andrewor14@gmail.com>
Authored: Sat Jan 4 00:00:57 2014 -0800
Committer: Andrew Or <andrewor14@gmail.com>
Committed: Sat Jan 4 00:00:57 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  | 16 +++++
 .../org/apache/spark/executor/Executor.scala    |  2 +
 .../spark/util/collection/AppendOnlyMap.scala   |  5 +-
 .../util/collection/ExternalAppendOnlyMap.scala | 74 ++++++++++++++------
 .../collection/ExternalAppendOnlyMapSuite.scala | 17 ++---
 5 files changed, 81 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4296d96c/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 634a94f..224b5c1 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -59,6 +59,9 @@ class SparkEnv private[spark] (
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
+  // Number of tasks currently running across all threads
+  @volatile private var _numRunningTasks = 0
+
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
@@ -86,6 +89,19 @@ class SparkEnv private[spark] (
       pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
     }
   }
+
+  /**
+   * Return the number of tasks currently running across all threads
+   */
+  def numRunningTasks: Int = _numRunningTasks
+
+  def incrementNumRunningTasks() = synchronized {
+    _numRunningTasks += 1
+  }
+
+  def decrementNumRunningTasks() = synchronized {
+    _numRunningTasks -= 1
+  }
 }
 
 object SparkEnv extends Logging {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4296d96c/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e51d274..bd202af 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -186,6 +186,7 @@ private[spark] class Executor(
       var taskStart: Long = 0
       def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
       val startGCTime = gcTime
+      env.incrementNumRunningTasks()
 
       try {
         SparkEnv.set(env)
@@ -279,6 +280,7 @@ private[spark] class Executor(
           //System.exit(1)
         }
       } finally {
+        env.decrementNumRunningTasks()
         runningTasks.remove(taskId)
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4296d96c/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index d2a9574..d8fa7ed 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import java.util
-import java.util.Comparator
+import java.util.{Arrays, Comparator}
 
 /**
  * A simple open hash table optimized for the append-only use case, where keys
@@ -270,7 +269,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
         cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
       }
     }
-    util.Arrays.sort(data, 0, newIndex, rawOrdering)
+    Arrays.sort(data, 0, newIndex, rawOrdering)
 
     new Iterator[(K, V)] {
       var i = 0

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4296d96c/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 68a2319..c348168 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -32,17 +32,28 @@ import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
  * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
  *
  * This map takes two passes over the data:
- *   (1) Values are merged into combiners, which are sorted and spilled to disk in as necessary.
+ *
+ *   (1) Values are merged into combiners, which are sorted and spilled to disk as necessary
  *   (2) Combiners are read from disk and merged together
  *
- * Two parameters control the memory threshold: `spark.shuffle.buffer.mb` specifies the maximum
- * size of the in-memory map before a spill, and `spark.shuffle.buffer.fraction` specifies an
- * additional margin of safety. The second parameter is important for the following reason:
+ * The setting of the spill threshold faces the following trade-off: If the spill threshold is
+ * too high, the in-memory map may occupy more memory than is available, resulting in OOM.
+ * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk
+ * writes. This may lead to a performance regression compared to the normal case of using the
+ * non-spilling AppendOnlyMap.
+ *
+ * A few parameters control the memory threshold:
+ *
+ *   `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
+ *   these maps as a fraction of the executor's total memory. Since each concurrently running
+ *   task maintains one map, the actual threshold for each map is this quantity divided by the
+ *   number of running tasks.
  *
- * If the spill threshold is set too high, the in-memory map may occupy more memory than is
- * available, resulting in OOM. However, if the spill threshold is set too low, we spill
- * frequently and incur unnecessary disk writes. This may lead to a performance regression
- * compared to the normal case of using the non-spilling AppendOnlyMap.
+ *   `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
+ *   this threshold, in case map size estimation is not sufficiently accurate.
+ *
+ *   `spark.shuffle.updateThresholdInterval` controls how frequently each thread checks on
+ *   shared executor state to update its local memory threshold.
  */
 
 private[spark] class ExternalAppendOnlyMap[K, V, C](
@@ -56,35 +67,54 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   import ExternalAppendOnlyMap._
 
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
-  private val spilledMaps = new ArrayBuffer[DiskIterator]
-
+  private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = new SparkConf()
-  private val memoryThresholdMB = {
-    // TODO: Turn this into a fraction of memory per reducer
-    val bufferSize = sparkConf.getLong("spark.shuffle.buffer.mb", 1024)
-    val bufferPercent = sparkConf.getDouble("spark.shuffle.buffer.fraction", 0.8)
-    bufferSize * bufferPercent
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+    val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75)
+    val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
+
+  // Maximum size for this map before a spill is triggered
+  private var spillThreshold = maxMemoryThreshold
+
+  // How often to update spillThreshold
+  private val updateThresholdInterval =
+    sparkConf.getInt("spark.shuffle.updateThresholdInterval", 100)
+
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
   private val comparator = new KCComparator[K, C]
   private val ser = serializer.newInstance()
+  private var insertCount = 0
   private var spillCount = 0
 
-  def insert(key: K, value: V): Unit = {
+  def insert(key: K, value: V) {
+    insertCount += 1
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
     }
     currentMap.changeValue(key, update)
-    if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) {
+    if (insertCount % updateThresholdInterval == 1) {
+      updateSpillThreshold()
+    }
+    if (currentMap.estimateSize() > spillThreshold) {
       spill()
     }
   }
 
-  private def spill(): Unit = {
+  // TODO: differentiate ShuffleMapTask's from ResultTask's
+  private def updateSpillThreshold() {
+    val numRunningTasks = math.max(SparkEnv.get.numRunningTasks, 1)
+    spillThreshold = maxMemoryThreshold / numRunningTasks
+  }
+
+  private def spill() {
     spillCount += 1
-    logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!")
-    logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+"
so far)")
+    logWarning("In-memory map exceeded %s MB! Spilling to disk (%d time%s so far)"
+      .format(spillThreshold / (1024 * 1024), spillCount, if (spillCount > 1) "s" else
""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     val writer =
       new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
@@ -100,7 +130,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       writer.close()
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
-    spilledMaps.append(new DiskIterator(file))
+    spilledMaps.append(new DiskMapIterator(file))
   }
 
   override def iterator: Iterator[(K, C)] = {
@@ -228,7 +258,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   }
 
   // Iterate through (K, C) pairs in sorted order from an on-disk map
-  private class DiskIterator(file: File) extends Iterator[(K, C)] {
+  private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream)
     val deserializeStream = ser.deserializeStream(bufferedStream)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4296d96c/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 6c93b1f..ef957bb 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -5,15 +5,13 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark._
-import org.apache.spark.SparkContext.rddToPairRDDFunctions
+import org.apache.spark.SparkContext._
 
 class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
   override def beforeEach() {
     val conf = new SparkConf(false)
     conf.set("spark.shuffle.externalSorting", "true")
-    conf.set("spark.shuffle.buffer.mb", "1024")
-    conf.set("spark.shuffle.buffer.fraction", "0.8")
     sc = new SparkContext("local", "test", conf)
   }
 
@@ -27,14 +25,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
     }
 
   test("simple insert") {
-    var map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
       mergeValue, mergeCombiners)
 
     // Single insert
     map.insert(1, 10)
     var it = map.iterator
     assert(it.hasNext)
-    var kv = it.next()
+    val kv = it.next()
     assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
     assert(!it.hasNext)
 
@@ -59,7 +57,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
     map.insert(1, 100)
     map.insert(2, 200)
     map.insert(1, 1000)
-    var it = map.iterator
+    val it = map.iterator
     assert(it.hasNext)
     val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
     assert(result == Set[(Int, Set[Int])](
@@ -177,8 +175,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
   }
 
   test("spilling") {
-    System.setProperty("spark.shuffle.buffer.mb", "1")
-    System.setProperty("spark.shuffle.buffer.fraction", "0.05")
+    // TODO: Figure out correct memory parameters to actually induce spilling
+    // System.setProperty("spark.shuffle.buffer.mb", "1")
+    // System.setProperty("spark.shuffle.buffer.fraction", "0.05")
 
     // reduceByKey - should spill exactly 6 times
     val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
@@ -226,4 +225,6 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
       }
     }
   }
+
+  // TODO: Test memory allocation for multiple concurrently running tasks
 }
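
To make the new sizing concrete, a worked example under assumed conditions (a 1 GB executor heap and 4 concurrently running tasks; the fractions are the defaults introduced in this commit):

    // Assumed: Runtime.getRuntime.maxMemory reports ~1 GB and 4 tasks are running
    val maxMemory = 1024L * 1024 * 1024
    val maxMemoryThreshold = (maxMemory * 0.75 * 0.8).toLong  // ~614 MB shared by all maps
    val spillThreshold = maxMemoryThreshold / 4               // ~154 MB per map before a spill

As more tasks start, each call to updateSpillThreshold() shrinks the per-map share accordingly; spark.shuffle.updateThresholdInterval (default: every 100 inserts) controls how often that recalculation happens.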

