spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject spark git commit: [SPARK-12869] Implemented an improved version of the toIndexedRowMatrix
Date Fri, 15 Apr 2016 00:32:24 GMT
Repository: spark
Updated Branches:
  refs/heads/master 01dd1f5c0 -> c80586d9e


[SPARK-12869] Implemented an improved version of the toIndexedRowMatrix

Hi guys,

I've implemented an improved version of the `toIndexedRowMatrix` function on the `BlockMatrix`.
I needed this for a project, but would like to share it with the rest of the community. In
the case of dense matrices, it can increase performance up to 19 times:
https://github.com/Fokko/BlockMatrixToIndexedRowMatrix

If there are any questions or suggestions, please let me know. Keep up the good work! Cheers.

Author: Fokko Driesprong <f.driesprong@catawiki.nl>
Author: Fokko Driesprong <fokko@driesprongen.nl>

Closes #10839 from Fokko/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c80586d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c80586d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c80586d9

Branch: refs/heads/master
Commit: c80586d9e820d19fc328b3e4c6f1c1439f5583a7
Parents: 01dd1f5
Author: Fokko Driesprong <f.driesprong@catawiki.nl>
Authored: Thu Apr 14 17:32:20 2016 -0700
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Thu Apr 14 17:32:20 2016 -0700

----------------------------------------------------------------------
 .../mllib/linalg/distributed/BlockMatrix.scala  | 34 ++++++++++++++++----
 .../linalg/distributed/BlockMatrixSuite.scala   | 31 ++++++++++++++++--
 2 files changed, 57 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c80586d9/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 89c332a..580d7a9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -19,12 +19,12 @@ package org.apache.spark.mllib.linalg.distributed
 
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM, SparseVector => BSV, Vector => BV}
 
 import org.apache.spark.{Partitioner, SparkException}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix}
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -264,13 +264,35 @@ class BlockMatrix @Since("1.3.0") (
     new CoordinateMatrix(entryRDD, numRows(), numCols())
   }
 
+
   /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
   @Since("1.3.0")
   def toIndexedRowMatrix(): IndexedRowMatrix = {
-    require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " +
-      s"numCols: ${numCols()}")
-    // TODO: This implementation may be optimized
-    toCoordinateMatrix().toIndexedRowMatrix()
+    val cols = numCols().toInt
+
+    require(cols < Int.MaxValue, s"The number of columns should be less than Int.MaxValue ($cols).")
+
+    val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) =>
+      mat.rowIter.zipWithIndex.map {
+        case (vector, rowIdx) =>
+          blockRowIdx * rowsPerBlock + rowIdx -> (blockColIdx, vector.toBreeze)
+      }
+    }.groupByKey().map { case (rowIdx, vectors) =>
+      val numberNonZeroPerRow = vectors.map(_._2.activeSize).sum.toDouble / cols.toDouble
+
+      val wholeVector = if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
+        BSV.zeros[Double](cols)
+      } else {
+        BDV.zeros[Double](cols)
+      }
+
+      vectors.foreach { case (blockColIdx: Int, vec: BV[Double]) =>
+        val offset = colsPerBlock * blockColIdx
+        wholeVector(offset until offset + colsPerBlock) := vec
+      }
+      new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector))
+    }
+    new IndexedRowMatrix(rows)
   }
 
   /** Collect the distributed matrix on the driver as a `DenseMatrix`. */

http://git-wip-us.apache.org/repos/asf/spark/blob/c80586d9/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index f737d2c..f37eaf2 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg.distributed
 
 import java.{util => ju}
 
-import breeze.linalg.{DenseMatrix => BDM}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
 
 import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix}
+import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector, Matrices, Matrix, SparseMatrix, SparseVector, Vectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 
@@ -134,6 +134,33 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(rowMat.numRows() === m)
     assert(rowMat.numCols() === n)
     assert(rowMat.toBreeze() === gridBasedMat.toBreeze())
+
+    val rows = 1
+    val cols = 10
+
+    val matDense = new DenseMatrix(rows, cols,
+      Array(1.0, 1.0, 3.0, 2.0, 5.0, 6.0, 7.0, 1.0, 2.0, 3.0))
+    val matSparse = new SparseMatrix(rows, cols,
+      Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1), Array(0), Array(1.0))
+
+    val vectors: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), matDense),
+      ((1, 0), matSparse))
+
+    val rdd = sc.parallelize(vectors)
+    val B = new BlockMatrix(rdd, rows, cols)
+
+    val C = B.toIndexedRowMatrix.rows.collect
+
+    (C(0).vector.toBreeze, C(1).vector.toBreeze) match {
+      case (denseVector: BDV[Double], sparseVector: BSV[Double]) =>
+        assert(denseVector.length === sparseVector.length)
+
+        assert(matDense.toArray === denseVector.toArray)
+        assert(matSparse.toArray === sparseVector.toArray)
+      case _ =>
+        throw new RuntimeException("IndexedRow returns vectors of unexpected type")
+    }
   }
 
   test("toBreeze and toLocalMatrix") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message