spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject spark git commit: [SPARK-4797] Replace breezeSquaredDistance
Date Wed, 31 Dec 2014 19:50:55 GMT
Repository: spark
Updated Branches:
  refs/heads/master 352ed6bbe -> 06a9aa589


[SPARK-4797] Replace breezeSquaredDistance

This PR replaces the slow breezeSquaredDistance calls in MLUtils with a faster native implementation, Vectors.sqdist.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #3643 from viirya/faster_squareddistance and squashes the following commits:

f28b275 [Liang-Chi Hsieh] Move the implementation to linalg.Vectors and rename as sqdist.
0bc48ee [Liang-Chi Hsieh] Merge branch 'master' into faster_squareddistance
ba34422 [Liang-Chi Hsieh] Fix bug.
91849d0 [Liang-Chi Hsieh] Modified for comment.
44a65ad [Liang-Chi Hsieh] Modified for comments.
35db395 [Liang-Chi Hsieh] Fix bug and some modifications for comments.
f4f5ebb [Liang-Chi Hsieh] Follow BLAS.dot pattern to replace intersect, diff with while-loop.
a36e09f [Liang-Chi Hsieh] Use while-loop to replace foreach for better performance.
d3e0628 [Liang-Chi Hsieh] Make the methods private.
dd415bc [Liang-Chi Hsieh] Consider different cases of SparseVector and DenseVector.
13669db [Liang-Chi Hsieh] Replace breezeSquaredDistance.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06a9aa58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06a9aa58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06a9aa58

Branch: refs/heads/master
Commit: 06a9aa589c518a40a3c7cc201e89d75af77ab93e
Parents: 352ed6b
Author: Liang-Chi Hsieh <viirya@gmail.com>
Authored: Wed Dec 31 11:50:53 2014 -0800
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Wed Dec 31 11:50:53 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/mllib/linalg/Vectors.scala | 80 ++++++++++++++++++++
 .../org/apache/spark/mllib/util/MLUtils.scala   | 13 ++--
 .../apache/spark/mllib/util/MLUtilsSuite.scala  | 15 ++++
 3 files changed, 100 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06a9aa58/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 01f3f90..6a782b0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -312,6 +312,111 @@ object Vectors {
       math.pow(sum, 1.0 / p)
     }
   }
+
+  /**
+   * Returns the squared Euclidean distance between two Vectors.
+   *
+   * Two sparse vectors are merged over their stored indices, a sparse/dense pair is walked
+   * over the dense positions once, and any other pair is compared element by element.
+   *
+   * @param v1 first Vector.
+   * @param v2 second Vector.
+   * @return squared distance between two Vectors.
+   * @throws IllegalArgumentException if `v1` and `v2` have different sizes.
+   */
+  def sqdist(v1: Vector, v2: Vector): Double = {
+    require(v1.size == v2.size, s"Vector dimensions do not match: Dim(v1)=${v1.size} and " +
+      s"Dim(v2)=${v2.size}.")
+    var squaredDistance = 0.0
+    (v1, v2) match {
+      case (v1: SparseVector, v2: SparseVector) =>
+        // Merge-walk both index arrays (assumed sorted ascending). An index stored in only
+        // one vector contributes its value squared; a shared index contributes the squared
+        // difference of the two values.
+        val v1Values = v1.values
+        val v1Indices = v1.indices
+        val v2Values = v2.values
+        val v2Indices = v2.indices
+        val nnzv1 = v1Indices.length
+        val nnzv2 = v2Indices.length
+
+        var kv1 = 0
+        var kv2 = 0
+        while (kv1 < nnzv1 || kv2 < nnzv2) {
+          var score = 0.0
+          if (kv2 >= nnzv2 || (kv1 < nnzv1 && v1Indices(kv1) < v2Indices(kv2))) {
+            score = v1Values(kv1)
+            kv1 += 1
+          } else if (kv1 >= nnzv1 || (kv2 < nnzv2 && v2Indices(kv2) < v1Indices(kv1))) {
+            score = v2Values(kv2)
+            kv2 += 1
+          } else {
+            score = v1Values(kv1) - v2Values(kv2)
+            kv1 += 1
+            kv2 += 1
+          }
+          squaredDistance += score * score
+        }
+
+      // Mixed sparse/dense pairs always use the allocation-free O(size) walk, at any density.
+      case (v1: SparseVector, v2: DenseVector) =>
+        squaredDistance = sqdist(v1, v2)
+
+      case (v1: DenseVector, v2: SparseVector) =>
+        squaredDistance = sqdist(v2, v1)
+
+      // Dense/dense (or any other pairing): compare the materialized arrays index by index,
+      // without the intermediate tuples that zip/foldLeft would allocate.
+      case _ =>
+        val values1 = v1.toArray
+        val values2 = v2.toArray
+        var i = 0
+        while (i < values1.length) {
+          val score = values1(i) - values2(i)
+          squaredDistance += score * score
+          i += 1
+        }
+    }
+    squaredDistance
+  }
+
+  /**
+   * Returns the squared distance between a SparseVector and a DenseVector.
+   *
+   * Every dense position is visited once; positions not stored in `v1` contribute
+   * `v2(i) * v2(i)`. Assumes the caller has checked that both vectors have the same size
+   * and that `v1.indices` is sorted ascending.
+   */
+  private[mllib] def sqdist(v1: SparseVector, v2: DenseVector): Double = {
+    val indices = v1.indices
+    val sparseValues = v1.values
+    val denseValues = v2.values
+    val nnz = indices.length
+    val n = v2.size
+    var squaredDistance = 0.0
+    // kv1 is the current slot among v1's stored entries and iv1 its position. -1 never
+    // matches a dense position, so an empty sparse vector is treated as all zeros instead
+    // of failing on indices(0).
+    var kv1 = 0
+    var iv1 = if (nnz > 0) indices(0) else -1
+
+    var kv2 = 0
+    while (kv2 < n) {
+      var score = 0.0
+      if (kv2 != iv1) {
+        score = denseValues(kv2)
+      } else {
+        score = sparseValues(kv1) - denseValues(kv2)
+        if (kv1 < nnz - 1) {
+          kv1 += 1
+          iv1 = indices(kv1)
+        }
+      }
+      squaredDistance += score * score
+      kv2 += 1
+    }
+    squaredDistance
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/06a9aa58/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index da0da0a..c784346 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -19,8 +19,7 @@ package org.apache.spark.mllib.util
 
 import scala.reflect.ClassTag
 
-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV,
-  squaredDistance => breezeSquaredDistance}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.SparkContext
@@ -28,7 +27,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
 import org.apache.spark.util.random.BernoulliCellSampler
 import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors}
 import org.apache.spark.mllib.linalg.BLAS.dot
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
@@ -266,7 +265,7 @@ object MLUtils {
     }
     Vectors.fromBreeze(vector1)
   }
-
+ 
   /**
    * Returns the squared Euclidean distance between two vectors. The following formula will be used
    * if it does not introduce too much numerical error:
@@ -316,12 +315,10 @@ object MLUtils {
       val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) /
         (sqDist + EPSILON)
       if (precisionBound2 > precision) {
-        // TODO: breezeSquaredDistance is slow,
-        // so we should replace it with our own implementation.
-        sqDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
+        sqDist = Vectors.sqdist(v1, v2)
       }
     } else {
-      sqDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
+      sqDist = Vectors.sqdist(v1, v2)
     }
     sqDist
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/06a9aa58/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index df07987..7778847 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -52,12 +52,27 @@ class MLUtilsSuite extends FunSuite with MLlibTestSparkContext {
       val values = indices.map(i => a(i))
       val v2 = Vectors.sparse(n, indices, values)
       val norm2 = Vectors.norm(v2, 2.0)
+      val v3 = Vectors.sparse(n, indices, indices.map(i => a(i) + 0.5))
+      val norm3 = Vectors.norm(v3, 2.0)
       val squaredDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
       val fastSquaredDist1 = fastSquaredDistance(v1, norm1, v2, norm2, precision)
       assert((fastSquaredDist1 - squaredDist) <= precision * squaredDist, s"failed with
m = $m")
       val fastSquaredDist2 =
         fastSquaredDistance(v1, norm1, Vectors.dense(v2.toArray), norm2, precision)
       assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with
m = $m")
+      val squaredDist2 = breezeSquaredDistance(v2.toBreeze, v3.toBreeze)
+      val fastSquaredDist3 =
+        fastSquaredDistance(v2, norm2, v3, norm3, precision)
+      assert((fastSquaredDist3 - squaredDist2) <= precision * squaredDist2, s"failed with
m = $m")
+      if (m > 10) { 
+        val v4 = Vectors.sparse(n, indices.slice(0, m - 10),
+          indices.map(i => a(i) + 0.5).slice(0, m - 10))
+        val norm4 = Vectors.norm(v4, 2.0)
+        val squaredDist = breezeSquaredDistance(v2.toBreeze, v4.toBreeze)
+        val fastSquaredDist =
+          fastSquaredDistance(v2, norm2, v4, norm4, precision)
+        assert((fastSquaredDist - squaredDist) <= precision * squaredDist, s"failed with
m = $m")
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message