spark-commits mailing list archives

From jkbrad...@apache.org
Subject spark git commit: [SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds
Date Wed, 06 Jul 2016 19:56:56 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2465f0728 -> d7926da5e


[SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds

## What changes were proposed in this pull request?
"test big model load / save" in Word2VecSuite, lately resulted into OOM.
Therefore we decided to make the partitioning adaptive (not based on spark default "spark.kryoserializer.buffer.max"
conf) and then testing it using a small buffer size in order to trigger partitioning without
allocating too much memory for the test.
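
For illustration, a minimal sketch of the adaptive sizing logic (the function name and signature here are illustrative, not the patch's exact code; it assumes the buffer limit has already been parsed to bytes):

    // illustrative sketch of the adaptive partition count
    def adaptivePartitionCount(numWords: Long, vectorSize: Int, bufferSizeBytes: Long): Int = {
      // approximate serialized size: 4 bytes per float component of each
      // vector, plus roughly 15 bytes for the word string itself
      val approxSize = (4L * vectorSize + 15) * numWords
      ((approxSize / bufferSizeBytes) + 1).toInt
    }

    // e.g. with the new test's dimensions and a 50-byte buffer:
    // adaptivePartitionCount(11, 10, 50) == (605 / 50) + 1 == 13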

## How was this patch tested?
It was tested by running the following unit test:
org.apache.spark.mllib.feature.Word2VecSuite
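
(For reference, the suite can be run on its own with something like build/mvn test -pl mllib -DwildcardSuites=org.apache.spark.mllib.feature.Word2VecSuite -Dtest=none; the exact flags depend on the local build setup.)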

Author: tmnd1991 <antonio.murgia2@studio.unibo.it>

Closes #13509 from tmnd1991/SPARK-15740.

(cherry picked from commit 040f6f9f468f153e4c4db78c26ced0299245fb6f)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7926da5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7926da5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7926da5

Branch: refs/heads/branch-2.0
Commit: d7926da5e72ee2015e3ebe39a5fd0b322e9d1334
Parents: 2465f07
Author: tmnd1991 <antonio.murgia2@studio.unibo.it>
Authored: Wed Jul 6 12:56:26 2016 -0700
Committer: Joseph K. Bradley <joseph@databricks.com>
Committed: Wed Jul 6 12:56:51 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/mllib/feature/Word2Vec.scala   | 16 +++++++------
 .../spark/mllib/feature/Word2VecSuite.scala     | 25 +++++++++++++++++---
 2 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d7926da5/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 2f52825..f2211df 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -629,14 +629,16 @@ object Word2VecModel extends Loader[Word2VecModel] {
         ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
-      // We want to partition the model in partitions of size 32MB
-      val partitionSize = (1L << 25)
+      // We want to partition the model in partitions smaller than
+      // spark.kryoserializer.buffer.max
+      val bufferSize = Utils.byteStringAsBytes(
+        spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
       // We calculate the approximate size of the model
-      // We only calculate the array size, not considering
-      // the string size, the formula is:
-      // floatSize * numWords * vectorSize
-      val approxSize = 4L * numWords * vectorSize
-      val nPartitions = ((approxSize / partitionSize) + 1).toInt
+      // We only calculate the array size, considering an
+      // average string size of 15 bytes, the formula is:
+      // (floatSize * vectorSize + 15) * numWords
+      val approxSize = (4L * vectorSize + 15) * numWords
+      val nPartitions = ((approxSize / bufferSize) + 1).toInt
       val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
       spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path))
     }
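
For concreteness, plugging the original test's dimensions (9000 words, 1000-dimensional vectors) into the new estimate shows why the default buffer no longer forces multiple partitions:

    // worked example with the old test's dimensions (illustrative values)
    val approxSize = (4L * 1000 + 15) * 9000                 // 36,135,000 bytes (~34.5 MB)
    val nPartitions = (approxSize / (64L << 20) + 1).toInt   // 1 with the default 64m buffer

This is why the reworked test below shrinks spark.kryoserializer.buffer.max to 50 bytes: it exercises the multi-partition path with a tiny model instead of a ~34 MB one.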

http://git-wip-us.apache.org/repos/asf/spark/blob/d7926da5/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
index c9fb976..22de4c4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
@@ -91,11 +91,23 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   }
 
-  ignore("big model load / save") {
-    // create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25
-    val word2VecMap = Map((0 to 9000).map(i => s"$i" -> Array.fill(1000)(0.1f)): _*)
+  test("big model load / save") {
+    // backing up old values
+    val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k")
+    val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m")
+
+    // setting test values to trigger partitioning
+    spark.conf.set("spark.kryoserializer.buffer", "50b")
+    spark.conf.set("spark.kryoserializer.buffer.max", "50b")
+
+    // create a model bigger than 50 bytes
+    val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*)
     val model = new Word2VecModel(word2VecMap)
 
+    // estimated size of this model, given the formula:
+    // (floatSize * vectorSize + 15) * numWords
+    // (4 * 10 + 15) * 11 = 605 bytes, well above the 50-byte buffer,
+    // therefore it should generate multiple partitions
     val tempDir = Utils.createTempDir()
     val path = tempDir.toURI.toString
 
@@ -103,9 +115,16 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
       model.save(sc, path)
       val sameModel = Word2VecModel.load(sc, path)
       assert(sameModel.getVectors.mapValues(_.toSeq) === model.getVectors.mapValues(_.toSeq))
+    }
+    catch {
+      case t: Throwable => fail("exception thrown persisting a model " +
+        "that spans over multiple partitions", t)
     } finally {
       Utils.deleteRecursively(tempDir)
+      spark.conf.set("spark.kryoserializer.buffer", oldBufferConfValue)
+      spark.conf.set("spark.kryoserializer.buffer.max", oldBufferMaxConfValue)
     }
+
   }
 
   test("test similarity for word vectors with large values is not Infinity or NaN") {


