spark-commits mailing list archives

From mln...@apache.org
Subject spark git commit: [SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll
Date Tue, 09 May 2017 08:06:53 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 54e074349 -> 72fca9a0a


[SPARK-11968][MLLIB] Optimize MLLIB ALS recommendForAll

The recommendForAll method of MLlib ALS is very slow, and GC is a key problem
of the current implementation. The task uses the following code to keep the
temporary result:

val output = new Array[(Int, (Int, Double))](m * n)

Here m = n = 4096 (the default block size, with no way to configure it), so
output takes about 4k * 4k * (4 + 4 + 8) = 256M. Such a large allocation
causes serious GC pressure and frequently leads to OOM.

Actually, we don't need to keep all of the temporary results. Suppose we
recommend the topK (typically about 10 or 20) products for each user; then we
only need about 4k * topK * (4 + 4 + 8) of memory to hold the temporary
result.
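
As a minimal sketch of that idea (using plain Scala collections here; the
committed code uses Spark's internal BoundedPriorityQueue, and the helper name
topKByScore is ours for illustration), a size-capped min-heap keeps only the
best topK candidates per user:

import scala.collection.mutable

// Sketch only: a size-capped priority queue keeps at most k candidates per
// user; the committed code uses Spark's internal BoundedPriorityQueue.
def topKByScore(
    srcFactor: Array[Double],
    dstFactors: Seq[(Int, Array[Double])],
    k: Int): Array[(Int, Double)] = {
  // Min-heap on score: the weakest of the kept k candidates is evicted first.
  val pq = mutable.PriorityQueue.empty[(Int, Double)](Ordering.by(-_._2))
  dstFactors.foreach { case (dstId, dstFactor) =>
    var score = 0.0
    var i = 0
    while (i < srcFactor.length) {   // hand-written dot product
      score += srcFactor(i) * dstFactor(i)
      i += 1
    }
    pq.enqueue(dstId -> score)
    if (pq.size > k) pq.dequeue()    // keep memory bounded at k entries
  }
  pq.dequeueAll.reverse.toArray      // highest score first
}

Per (user, item block) pair, the temporary state is then proportional to topK
rather than to the block size, which is consistent with the benchmark below
being nearly flat across block sizes.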

Test environment:
3 workers, each with 10 cores, 30 GB of memory, and 1 executor.
Data: 480,000 users and 17,000 items.

Block size:      1024  2048  4096  8192
Old method:      245s  332s  488s  OOM
This solution:   121s  118s  117s  120s

Tested with the existing unit tests.

Author: Peng <peng.meng@intel.com>
Author: Peng Meng <peng.meng@intel.com>

Closes #17742 from mpjlu/OptimizeAls.

(cherry picked from commit 8079424763c2043264f30a6898ce964379bd9b56)
Signed-off-by: Nick Pentreath <nickp@za.ibm.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72fca9a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72fca9a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72fca9a0

Branch: refs/heads/branch-2.2
Commit: 72fca9a0a7a6dd2ab7c338fab9666b51cd981cce
Parents: 54e0743
Author: Peng <peng.meng@intel.com>
Authored: Tue May 9 10:05:49 2017 +0200
Committer: Nick Pentreath <nickp@za.ibm.com>
Committed: Tue May 9 10:08:23 2017 +0200

----------------------------------------------------------------------
 .../MatrixFactorizationModel.scala              | 81 ++++++++++++--------
 1 file changed, 50 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72fca9a0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 23045fa..d45866c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.BoundedPriorityQueue
 
 /**
  * Model representing the result of matrix factorization.
@@ -274,46 +275,64 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(rank, srcFeatures)
-    val dstBlocks = blockify(rank, dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
-      case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
-        val m = srcIds.length
-        val n = dstIds.length
-        val ratings = srcFactors.transpose.multiply(dstFactors)
-        val output = new Array[(Int, (Int, Double))](m * n)
-        var k = 0
-        ratings.foreachActive { (i, j, r) =>
-          output(k) = (srcIds(i), (dstIds(j), r))
-          k += 1
+    val srcBlocks = blockify(srcFeatures)
+    val dstBlocks = blockify(dstFeatures)
+    /**
+     * The previous approach used for computing top-k recommendations aimed to group
+     * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
+     * be used for efficiency. However, this causes excessive GC pressure due to the large
+     * arrays required for intermediate result storage, as well as a high sensitivity to the
+     * block size used.
+     * The following approach still groups factors into blocks, but instead computes the
+     * top-k elements per block, using a simple dot product (instead of gemm) and an efficient
+     * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results
+     * in significantly reduced GC pressure as well as shuffle data, which far outweighs
+     * any cost incurred from not using Level 3 BLAS operations.
+     */
+    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
+      val m = srcIter.size
+      val n = math.min(dstIter.size, num)
+      val output = new Array[(Int, (Int, Double))](m * n)
+      var j = 0
+      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
+      srcIter.foreach { case (srcId, srcFactor) =>
+        dstIter.foreach { case (dstId, dstFactor) =>
+          /*
+           * The below code is equivalent to
+           *    `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)`
+           * This handwritten version is as efficient as, or more efficient than, BLAS calls in this case.
+           */
+          var score: Double = 0
+          var k = 0
+          while (k < rank) {
+            score += srcFactor(k) * dstFactor(k)
+            k += 1
+          }
+          pq += dstId -> score
+        }
+        val pqIter = pq.iterator
+        var i = 0
+        while (i < n) {
+          output(j + i) = (srcId, pqIter.next())
+          i += 1
         }
-        output.toSeq
+        j += n
+        pq.clear()
+      }
+      output.toSeq
     }
     ratings.topByKey(num)(Ordering.by(_._2))
   }
 
   /**
-   * Blockifies features to use Level-3 BLAS.
+   * Blockifies features to improve the efficiency of the cartesian product.
+   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
-      rank: Int,
-      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
-    val blockSize = 4096 // TODO: tune the block size
-    val blockStorage = rank * blockSize
+      features: RDD[(Int, Array[Double])],
+      blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
     features.mapPartitions { iter =>
-      iter.grouped(blockSize).map { grouped =>
-        val ids = mutable.ArrayBuilder.make[Int]
-        ids.sizeHint(blockSize)
-        val factors = mutable.ArrayBuilder.make[Double]
-        factors.sizeHint(blockStorage)
-        var i = 0
-        grouped.foreach { case (id, factor) =>
-          ids += id
-          factors ++= factor
-          i += 1
-        }
-        (ids.result(), new DenseMatrix(rank, i, factors.result()))
-      }
+      iter.grouped(blockSize)
     }
   }
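
For reference, a hypothetical driver snippet showing how this optimized path
is reached through the public MLlib API (recommendProductsForUsers, and
likewise recommendUsersForProducts, delegate to the recommendForAll method
patched above); the rank and iteration counts are illustrative:

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

// Assumes an existing `ratings: RDD[Rating]`.
def topTenProductsPerUser(ratings: RDD[Rating]): RDD[(Int, Array[Rating])] = {
  val model = ALS.train(ratings, 10, 10)  // rank = 10, iterations = 10
  model.recommendProductsForUsers(10)     // top-10 products for every user
}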
 



