mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlyubi...@apache.org
Subject git commit: Revert "MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes."
Date Wed, 04 Jun 2014 21:28:35 GMT
Repository: mahout
Updated Branches:
  refs/heads/master 656878ea6 -> 26d3db6ba


Revert "MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes."

This reverts commit 656878ea69adf7663015156b55ad5c9ea3ec7e4f. (somehow went partially only).


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/26d3db6b
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/26d3db6b
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/26d3db6b

Branch: refs/heads/master
Commit: 26d3db6ba1f12870af996ef9eaea17dfc4a8b8fb
Parents: 656878e
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Authored: Wed Jun 4 14:27:37 2014 -0700
Committer: Dmitriy Lyubimov <dlyubimov@apache.org>
Committed: Wed Jun 4 14:27:37 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 -
 .../mahout/math/drm/CheckpointedOps.scala       |   3 +-
 .../mahout/math/drm/DistributedEngine.scala     |  11 +-
 .../org/apache/mahout/math/drm/DrmLike.scala    |   2 +-
 .../org/apache/mahout/math/drm/DrmLikeOps.scala |   9 +-
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |   9 +-
 .../mahout/math/drm/decompositions/DQR.scala    |  57 ++++++
 .../mahout/math/drm/decompositions/DSPCA.scala  | 153 ++++++++++++++++
 .../mahout/math/drm/decompositions/DSSVD.scala  |  83 +++++++++
 .../apache/mahout/math/drm/logical/OpAewB.scala |   9 +-
 .../mahout/math/drm/logical/OpMapBlock.scala    |   8 +-
 .../org/apache/mahout/math/drm/package.scala    |  79 +++++----
 .../mahout/math/scalabindings/MatrixOps.scala   |   2 -
 .../math/scalabindings/RLikeMatrixOps.scala     |  12 +-
 .../scalabindings/decompositions/SSVD.scala     | 165 +++++++++++++++++
 .../mahout/math/scalabindings/package.scala     |  21 +++
 .../mahout/math/scalabindings/MathSuite.scala   |  76 ++++++++
 .../mahout/sparkbindings/SparkEngine.scala      |  35 ++--
 .../apache/mahout/sparkbindings/blas/ABt.scala  |  35 ++--
 .../apache/mahout/sparkbindings/blas/AewB.scala | 159 +++++------------
 .../drm/CheckpointedDrmSpark.scala              |   8 +-
 .../mahout/sparkbindings/drm/package.scala      |  28 +--
 .../apache/mahout/sparkbindings/package.scala   |   4 +-
 .../mahout/sparkbindings/blas/ABtSuite.scala    |   6 +-
 .../mahout/sparkbindings/blas/AewBSuite.scala   |  16 +-
 .../decompositions/MathSuite.scala              | 176 +++++++++++++++++++
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    |  37 ----
 .../sparkbindings/test/MahoutLocalContext.scala |   2 +-
 28 files changed, 887 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3b2e61b..bea124a 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,8 +2,6 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
-  MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes (dlyubimov)
-
   MAHOUT-1537: Minor fixes to spark-shell (Anand Avati via dlyubimov)
 
   MAHOUT-1529: Finalize abstraction of distributed logical plans from backend operations (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index edd0cfc..fa1ccfd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
-import org.apache.mahout.math._
+import org.apache.mahout.math.Vector
 
 
 /**
@@ -35,6 +35,5 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
   /** Column Means */
   def colMeans(): Vector = drm.context.colMeans(drm)
 
-  def norm():Double = drm.context.norm(drm)
 }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 5ffee9d..0e76d87 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -19,11 +19,10 @@ package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
 import logical._
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
+import org.apache.mahout.math.{Matrix, Vector}
 import DistributedEngine._
 import org.apache.mahout.math.scalabindings._
+import RLikeOps._
 
 /** Abstraction of optimizer/distributed engine */
 trait DistributedEngine {
@@ -44,12 +43,10 @@ trait DistributedEngine {
   def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
 
   /** Engine-specific colSums implementation based on a checkpoint. */
-  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+  def colSums[K:ClassTag](drm:CheckpointedDrm[K]):Vector
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
-
-  def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double
+  def colMeans[K:ClassTag](drm:CheckpointedDrm[K]):Vector
 
   /** Broadcast support */
   def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 7fbfc12..8e0db1e 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -28,7 +28,7 @@ package org.apache.mahout.math.drm
  */
 trait DrmLike[K] {
 
-  protected[mahout] def partitioningTag: Long
+  protected[mahout] def partitioningTag:Long
 
   protected[mahout] val context:DistributedContext
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index 328805a..35e28af 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -36,14 +36,9 @@ class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
    * @tparam R
    * @return
    */
-  def mapBlock[R: ClassTag](ncol: Int = -1, identicallyParitioned: Boolean = true)
+  def mapBlock[R : ClassTag](ncol: Int = -1)
       (bmf: BlockMapFunc[K, R]): DrmLike[R] =
-    new OpMapBlock[K, R](
-      A = drm,
-      bmf = bmf,
-      _ncol = ncol,
-      identicallyPartitioned = identicallyParitioned
-    )
+    new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
 
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index 7ac5577..f46d15c 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -25,13 +25,13 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
 
   import RLikeDrmOps._
 
-  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
+  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
 
-  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")
+  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
 
-  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")
+  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
 
-  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
+  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
 
   def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
 
@@ -70,6 +70,7 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
 
   def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
 
+
 }
 
 object RLikeDrmOps {

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
new file mode 100644
index 0000000..34ae345
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
@@ -0,0 +1,57 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+  private val log = Logger.getLogger(DQR.getClass)
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
+   * controlled (<5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since it does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+    if (A.ncol > 5000)
+      log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
+
+    implicit val ctx = A.context
+
+    val AtA = (A.t %*% A).checkpoint()
+    val inCoreAtA = AtA.collect
+
+    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+
+    val ch = chol(inCoreAtA)
+    val inCoreR = (ch.getL cloned) t
+
+    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+
+    if (checkRankDeficiency && !ch.isPositiveDefinite)
+      throw new IllegalArgumentException("R is rank-deficient.")
+
+    val bcastAtA = drmBroadcast(inCoreAtA)
+
+    // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
+    // decompose A'A in the backend again.
+
+    // Compute Q = A*inv(L') -- we can do it blockwise.
+    val Q = A.mapBlock() {
+      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
+    }
+
+    Q -> inCoreR
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
new file mode 100644
index 0000000..9e33416
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of the MAHOUT-817.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+   *         e.g. save them to hdfs in order to trigger their computation.
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmA = A.checkpoint()
+    implicit val ctx = A.context
+
+    val m = drmA.nrow
+    val n = drmA.ncol
+    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // Dataset mean
+    val xi = drmA.colMeans
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+    // This done in front in a single-threaded fashion for now. Even though it doesn't require any
+    // memory beyond that is required to keep xi around, it still might be parallelized to backs
+    // for significantly big n and r. TODO
+    val s_o = omega.t %*% xi
+
+    val bcastS_o = drmBroadcast(s_o)
+    val bcastXi = drmBroadcast(xi)
+
+    var drmY = drmA.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        val s_o:Vector = bcastS_o
+        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+        keys -> blockY
+    }
+        // Checkpoint Y
+        .checkpoint()
+
+    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+    var s_q = drmQ.colSums()
+    var bcastVarS_q = drmBroadcast(s_q)
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    var drmBt = (drmA.t %*% drmQ).checkpoint()
+
+    var s_b = (drmBt.t %*% xi).collect(::, 0)
+    var bcastVarS_b = drmBroadcast(s_b)
+
+    for (i <- 0 until q) {
+
+      // These closures don't seem to live well with outside-scope vars. This doesn't record closure
+      // attributes correctly. So we create additional set of vals for broadcast vars to properly 
+      // create readonly closure attributes in this very scope.
+      val bcastS_q = bcastVarS_q
+      val bcastS_b = bcastVarS_b
+      val bcastXib = bcastXi
+
+      // Fix Bt as B' -= xi cross s_q
+      drmBt = drmBt.mapBlock() {
+        case (keys, block) =>
+          val s_q: Vector = bcastS_q
+          val xi: Vector = bcastXib
+          keys.zipWithIndex.foreach {
+            case (key, idx) => block(idx, ::) -= s_q * xi(key)
+          }
+          keys -> block
+      }
+
+      drmY.uncache()
+      drmQ.uncache()
+
+      drmY = (drmA %*% drmBt)
+          // Fix Y by subtracting s_b from each row of the AB'
+          .mapBlock() {
+        case (keys, block) =>
+          val s_b: Vector = bcastS_b
+          for (row <- 0 until block.nrow) block(row, ::) -= s_b
+          keys -> block
+      }
+          // Checkpoint Y
+          .checkpoint()
+
+      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+      s_q = drmQ.colSums()
+      bcastVarS_q = drmBroadcast(s_q)
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      drmBt = (drmA.t %*% drmQ).checkpoint()
+
+      s_b = (drmBt.t %*% xi).collect(::, 0)
+      bcastVarS_b = drmBroadcast(s_b)
+    }
+
+    val c = s_q cross s_b
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+        c - c.t + (s_q cross s_q) * (xi dot xi)
+    val (inCoreUHat, d) = eigen(inCoreBBt)
+    val s = d.sqrt
+
+    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
new file mode 100644
index 0000000..0da9ec7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
@@ -0,0 +1,83 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSSVD {
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+   *         e.g. save them to hdfs in order to trigger their computation.
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmA = A.checkpoint()
+
+    val m = drmA.nrow
+    val n = drmA.ncol
+    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+
+    // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
+    // instantiate the Omega random matrix view in the backend instead. That way serialized closure
+    // is much more compact.
+    var drmY = drmA.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+        keys -> blockY
+    }
+
+    var drmQ = dqrThin(drmY.checkpoint())._1
+    // Checkpoint Q if last iteration
+    if (q == 0) drmQ = drmQ.checkpoint()
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    var drmBt = drmA.t %*% drmQ
+    // Checkpoint B' if last iteration
+    if (q == 0) drmBt = drmBt.checkpoint()
+
+    for (i <- 0  until q) {
+      drmY = drmA %*% drmBt
+      drmQ = dqrThin(drmY.checkpoint())._1
+      // Checkpoint Q if last iteration
+      if (i == q - 1) drmQ = drmQ.checkpoint()
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      drmBt = drmA.t %*% drmQ
+      // Checkpoint B' if last iteration
+      if (i == q - 1) drmBt = drmBt.checkpoint()
+    }
+
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
+    val (inCoreUHat, d) = eigen(inCoreBBt)
+    val s = d.sqrt
+
+    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
index da7c0d5..d07172a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -19,24 +19,17 @@ package org.apache.mahout.math.drm.logical
 
 import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
-import scala.util.Random
 
 /** DRM elementwise operator */
 case class OpAewB[K: ClassTag](
     override var A: DrmLike[K],
     override var B: DrmLike[K],
-    val op: String
+    val op: Char
     ) extends AbstractBinaryOp[K, K, K] {
 
-
-
   assert(A.ncol == B.ncol, "arguments must have same number of columns")
   assert(A.nrow == B.nrow, "arguments must have same number of rows")
 
-  override protected[mahout] lazy val partitioningTag: Long =
-    if (A.partitioningTag == B.partitioningTag) A.partitioningTag
-    else Random.nextLong()
-
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
index 7299d9e..8e4362d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -21,18 +21,16 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
-import scala.util.Random
 
 class OpMapBlock[S: ClassTag, R: ClassTag](
     override var A: DrmLike[S],
     val bmf: BlockMapFunc[S, R],
     val _ncol: Int = -1,
-    val _nrow: Long = -1,
-    identicallyPartitioned:Boolean
+    val _nrow: Long = -1
     ) extends AbstractUnaryOp[S, R] {
 
-  override protected[mahout] lazy val partitioningTag: Long =
-    if (identicallyPartitioned) A.partitioningTag else Random.nextLong()
+
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
 
   /** R-like syntax for number of rows. */
   def nrow: Long = if (_nrow >= 0) _nrow else A.nrow

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 02e8b7a..768bb1c 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -18,9 +18,7 @@
 package org.apache.mahout.math
 
 import scala.reflect.ClassTag
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
+import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
 
 package object drm {
 
@@ -72,45 +70,56 @@ package object drm {
       (implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
 
   /** Implicit broadcast -> value conversion. */
-  implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
+  implicit def bcast2val[T](bcast:BCast[T]):T = bcast.value
 
   /** Just throw all engine operations into context as well. */
-  implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
+  implicit def ctx2engine(ctx:DistributedContext):DistributedEngine = ctx.engine
 
   implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
     new CheckpointedOps[K](drm)
 
+  implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
+
+  // ============== Decompositions ===================
+
   /**
-   * We assume that whenever computational action is invoked without explicit checkpoint, the user
-   * doesn't imply caching
+   * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
+   * controlled (<5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since it does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
    */
-  implicit def drm2Checkpointed[K: ClassTag](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE)
-
-  /** Implicit conversion to in-core with NONE caching of the result. */
-  implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect
-
-  /** Do vertical concatenation of collection of blockified tuples */
-  def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
-    assert(blocks.nonEmpty, "rbind: 0 blocks passed in")
-    if (blocks.size == 1) {
-      // No coalescing required.
-      blocks.head
-    } else {
-      // compute total number of rows in a new block
-      val m = blocks.view.map(_._2.nrow).sum
-      val n = blocks.head._2.ncol
-      val coalescedBlock = blocks.head._2.like(m, n)
-      val coalescedKeys = new Array[K](m)
-      var row = 0
-      for (elem <- blocks.view) {
-        val block = elem._2
-        val rowEnd = row + block.nrow
-        coalescedBlock(row until rowEnd, ::) := block
-        elem._1.copyToArray(coalescedKeys, row)
-        row = rowEnd
-      }
-      coalescedKeys -> coalescedBlock
-    }
-  }
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+    DQR.dqrThin(A, checkRankDeficiency)
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+   *         e.g. save them to hdfs in order to trigger their computation.
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of the MAHOUT-817.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+   *         e.g. save them to hdfs in order to trigger their computation.
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
+
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index 48c2048..a872240 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -51,8 +51,6 @@ class MatrixOps(val m: Matrix) {
 
   def -(that: Matrix) = cloned -= that
 
-  def -:(that: Matrix) = that - m
-
   // m.plus(that)?
 
   def +(that: Double) = cloned += that

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
index af53c51..23c2ac2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
@@ -20,7 +20,7 @@ import org.apache.mahout.math.{Vector, Matrix}
 import scala.collection.JavaConversions._
 import RLikeOps._
 
-class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
+class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
 
   /**
    * matrix-matrix multiplication
@@ -47,14 +47,12 @@ class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
 
   def *(that: Double) = cloned *= that
 
-  def /(that: Matrix) = cloned /= that
+  def /(that:Matrix) = cloned /= that
 
-  def /:(that: Matrix) = that / m
-
-  def /(that: Double) = cloned /= that
+  def /(that:Double) = cloned /= that
 
   /** 1.0 /: A is eqivalent to R's 1.0/A */
-  def /:(that: Double) = that /=: cloned
+  def /:(that:Double) = that /=: cloned
 
   /**
    * in-place Hadamard product. We probably don't want to use assign
@@ -84,7 +82,7 @@ class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
   }
 
   /** 1.0 /=: A is equivalent to A = 1.0/A in R */
-  def /=:(that: Double) = {
+  def /=:(that:Double) = {
     m.foreach(that /=: _.vector())
     m
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+  private val log = Logger.getLogger(SSVD.getClass)
+
+  /**
+   * In-core SSVD algorithm.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    var y = a %*% omega
+    var yty = y.t %*% y
+    val at = a.t
+    var ch = chol(yty)
+    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+    var bt = ch.solveRight(at %*% y)
+
+    // Power iterations
+    for (i <- 0 until q) {
+      y = a %*% bt
+      yty = y.t %*% y
+      ch = chol(yty)
+      bt = ch.solveRight(at %*% y)
+    }
+
+    val bbt = bt.t %*% bt
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = ch.solveRight(y) %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+  }
+
+  /**
+   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+   * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
+   * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+   *
+   * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
+   * If retaining distances and orignal scaled variances not that important, the normalized PCA space
+   * is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    // Dataset mean
+    val xi = a.colMeans()
+
+    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+    var y = a %*% omega
+
+    // Fixing y
+    val s_o = omega.t %*% xi
+    y := ((r,c,v) => v - s_o(c))
+
+    var yty = y.t %*% y
+    var ch = chol(yty)
+//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+    // This is implicit Q of QR(Y)
+    var qm = ch.solveRight(y)
+    var bt = a.t %*% qm
+    var s_q = qm.colSums()
+    var s_b = bt.t %*% xi
+
+    // Power iterations
+    for (i <- 0 until q) {
+
+      // Fix bt
+      bt -= xi cross s_q
+
+      y = a %*% bt
+
+      // Fix Y again.
+      y := ((r,c,v) => v - s_b(c))
+
+      yty = y.t %*% y
+      ch = chol(yty)
+      qm = ch.solveRight(y)
+      bt = a.t %*% qm
+      s_q = qm.colSums()
+      s_b = bt.t %*% xi
+    }
+
+    val c = s_q cross s_b
+
+    // BB' computation becomes
+    val bbt = bt.t %*% bt -c - c.t +  (s_q cross s_q) * (xi dot xi)
+
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = qm %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index 2b7773b..4599146 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -281,5 +281,26 @@ package object scalabindings {
     x(::, 0)
   }
 
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
+
+  /**
+   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+   * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
+   * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+   *
+   * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
+   * If retaining distances and orignal scaled variances not that important, the normalized PCA space
+   * is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
+    SSVD.spca(a = a, k = k, p = p, q = q)
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
index b10cde3..d1171cc 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
@@ -193,6 +193,82 @@ class MathSuite extends FunSuite with MahoutSuite {
 
   }
 
+  test("ssvd") {
+    val a = dense(
+      (1, 2, 3),
+      (3, 4, 5),
+      (-2, 6, 7),
+      (-3, 8, 9)
+    )
+
+    val rank = 2
+    val (u, v, s) = ssvd(a, k = rank, q = 1)
+
+    val (uControl, vControl, sControl) = svd(a)
+
+    printf("U:\n%s\n", u)
+    printf("U-control:\n%s\n", uControl)
+    printf("V:\n%s\n", v)
+    printf("V-control:\n%s\n", vControl)
+    printf("Sigma:\n%s\n", s)
+    printf("Sigma-control:\n%s\n", sControl)
+
+    (s - sControl(0 until rank)).norm(2) should be < 1E-7
+
+    // Singular vectors may be equivalent down to a sign only.
+    (u.norm - uControl(::, 0 until rank).norm).abs should be < 1E-7
+    (v.norm - vControl(::, 0 until rank).norm).abs should be < 1E-7
+
+  }
+
+  test("spca") {
+
+    import math._
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m =  500
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+        ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+    // PCA Rotation matrix -- should also be orthonormal.
+    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+    val input = (u %*%: diagv(spectrum)) %*% tr.t
+
+    // Calculate just first 10 principal factors and reduce dimensionality.
+    // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
+    // ensure to zero stochastic error and assert only functional correctness of the method's pca-
+    // specific additions.
+    val k = 10
+    var (pca, _, s) = spca(a = input, k = k, p=spectrumLen, q = 1)
+    printf("Svs:%s\n",s)
+    // Un-normalized pca data:
+    pca = pca %*%: diagv(s)
+
+    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
+    // generated input was not centered. So here, we'd just brute-solve pca to verify
+    val xi = input.colMeans()
+    for (r <- 0 until input.nrow) input(r, ::) -= xi
+    var (pcaControl, _, sControl) = svd(m = input)
+
+    printf("Svs-control:%s\n",sControl)
+    pcaControl = (pcaControl %*%: diagv(sControl))(::,0 until k)
+
+    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+
+  }
+
   test("random uniform") {
     val omega1 = Matrices.symmetricUniformView(2, 3, 1234)
     val omega2 = Matrices.symmetricUniformView(2, 3, 1234)

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 3a03e58..0c904ab 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,9 +17,9 @@
 
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.math._
-import scalabindings._
+import org.apache.mahout.math.scalabindings._
 import RLikeOps._
+import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
@@ -30,8 +30,6 @@ import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import scala.Some
 import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
 
 /** Spark-specific non-drm-method operations */
 object SparkEngine extends DistributedEngine {
@@ -53,17 +51,7 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  override def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
-    if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
-
-  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
-    drm.rdd
-        // Compute sum of squares of each vector
-        .map {
-      case (key, v) => v dot v
-    }
-        .reduce(_ + _)
-
+  def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
 
   /**
    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
@@ -135,10 +123,7 @@ object SparkEngine extends DistributedEngine {
 
     {
       implicit def getWritable(x: Any): Writable = val2key()
-      new CheckpointedDrmSpark(
-        rdd = rdd.map(t => (key2val(t._1), t._2)),
-        _cacheStorageLevel = StorageLevel.MEMORY_ONLY
-      )(km.asInstanceOf[ClassTag[Any]])
+      new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
     }
   }
 
@@ -225,8 +210,16 @@ object SparkEngine extends DistributedEngine {
       case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
       case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
       case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
       case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
       case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
       // Custom operators, we just execute them

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 1e3f286..97873bd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -65,22 +65,16 @@ object ABt {
     // Blockify everything.
     val blocksA = srcA.toBlockifiedDrmRdd()
 
-        // Mark row-blocks with group id
+        // mark row-blocks with group id
         .mapPartitionsWithIndex((part, iter) => {
+      val rowBlockId = part
+      val (blockKeys, block) = iter.next()
 
-      if (iter.isEmpty) {
-        Iterator.empty
-      } else {
-
-        val rowBlockId = part
-        val (blockKeys, block) = iter.next()
+      // Each partition must have exactly one block due to implementation guarantees of blockify()
+      iter.ensuring(!_.hasNext)
 
-        // Each partition must have exactly one block due to implementation guarantees of blockify()
-        assert(!iter.hasNext, "Partition #%d is expected to have at most 1 block at AB'.".format(part))
-
-        // the output is (row block id, array of row keys, and the matrix representing the block).
-        Iterator((rowBlockId, blockKeys, block))
-      }
+      // the output is (row block id, array of row keys, and the matrix representing the block).
+      Iterator((rowBlockId, blockKeys, block))
     })
 
     val blocksB = srcB.toBlockifiedDrmRdd()
@@ -102,7 +96,7 @@ object ABt {
 
 
     // The plan.
-    var blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
+    val blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
 
         // Build Cartesian. It may require a bit more memory there at tasks.
         .cartesian(blocksB)
@@ -125,7 +119,7 @@ object ABt {
         // Combine -- this is probably the most efficient
         .combineByKey[(Array[K],Matrix)](
 
-          createCombiner = (t: (Array[K], Array[Int], Matrix)) => t match {
+          createCombiner = (t:(Array[K],Array[Int],Matrix)) => t match {
             case (rowKeys, colKeys, blockProd) =>
 
               // Accumulator is a row-wise block of sparse vectors.
@@ -154,20 +148,11 @@ object ABt {
           mergeCombiners = (c1: (Array[K], Matrix), c2: (Array[K], Matrix)) => {
             c1._2 += c2._2
             c1
-          },
-
-          // Cartesian will tend to produce much more partitions that we actually need for co-grouping,
-          // and as a result, we may see empty partitions than we actually need.
-          numPartitions = numProductPartitions
-        )
+          })
 
         // Combine leaves residual block key -- we don't need that.
         .map(_._2)
 
-    // This may produce more than one block per partition. Most implementation rely on convention of
-    // having at most one block per partition.
-    blockifiedRdd = rbind(blockifiedRdd)
-
     new DrmRddInput(blockifiedSrc = Some(blockifiedRdd))
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 384b986..ec6e99e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -24,91 +24,67 @@ import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.{Matrix, Vector}
 import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
-import org.apache.log4j.Logger
-import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
 
 /** Elementwise drm-drm operators */
 object AewB {
 
-  private val log = Logger.getLogger(AewB.getClass)
+  @inline
+  def a_plus_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
+    a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a += b)
 
-  /**
-   * Set to false to disallow in-place elementwise operations in case side-effects and non-idempotent
-   * computations become a problem.
-   */
-  final val PROPERTY_AEWB_INPLACE = "mahout.math.AewB.inplace"
+  @inline
+  def a_minus_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
+    a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a -= b)
 
-  type ReduceFunc = (Vector, Vector) => Vector
+  @inline
+  def a_hadamard_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
+    a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a *= b)
 
-  type ReduceFuncScalar = (Matrix, Double) => Matrix
+  @inline
+  def a_eldiv_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
+    a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a /= b)
 
-  private[blas] def getEWOps() = {
-    val inplaceProp = System.getProperty(PROPERTY_AEWB_INPLACE, "true").toBoolean
-    if (inplaceProp) InplaceEWOps else CloningEWOps
-  }
-
-  /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
-  def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  @inline
+  def a_plus_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A += s)
 
-    val ewOps = getEWOps()
-    val opId = op.op
+  @inline
+  def a_minus_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A -= s)
 
-    val reduceFunc = opId match {
-      case "+" => ewOps.plus
-      case "-" => ewOps.minus
-      case "*" => ewOps.hadamard
-      case "/" => ewOps.eldiv
-      case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
-    }
+  @inline
+  def scalar_minus_a[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => s -=: A)
 
-    val a = srcA.toDrmRdd()
-    val b = srcB.toDrmRdd()
+  @inline
+  def a_times_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A *= s)
 
-    // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
-    // instead of join, and apply the op map-side. Otherwise, perform join and apply the op
-    // reduce-side.
-    val rdd = if (op.isIdenticallyPartitioned(op.A)) {
+  @inline
+  def a_div_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A /= s)
 
-      log.debug("applying zipped elementwise")
+  @inline
+  def scalar_div_a[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
+    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => s /=: A)
 
-      a
-          .zip(b)
-          .map {
-        case ((keyA, vectorA), (keyB, vectorB)) =>
-          assert(keyA == keyB, "inputs are claimed identically partitioned, but they are not identically keyed")
-          keyA -> reduceFunc(vectorA, vectorB)
-      }
-    } else {
-
-      log.debug("applying elementwise as join")
-
-      a
-          .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
-          .map({
-        case (key, (vectorSeqA, vectorSeqB)) =>
-          key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc))
-      })
-    }
+  /** Parallel way of this operation (this assumes different partitioning of the sources */
+  private[blas] def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K],
+      reduceFunc: (Vector, Vector) => Vector): DrmRddInput[K] = {
+    val a = srcA.toDrmRdd()
+    val b = srcB.toDrmRdd()
+    val rdd = a
+        .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
+        .map({
+      case (key, (vectorSeqA, vectorSeqB)) =>
+        key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc))
+    })
 
     new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd))
   }
 
-  /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
-  def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
-  DrmRddInput[K] = {
-
-    val ewOps = getEWOps()
-    val opId = op.op
-
-    val reduceFunc = opId match {
-      case "+" => ewOps.plusScalar
-      case "-" => ewOps.minusScalar
-      case "*" => ewOps.timesScalar
-      case "/" => ewOps.divScalar
-      case "-:" => ewOps.scalarMinus
-      case "/:" => ewOps.scalarDiv
-      case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
-    }
+  private[blas] def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double,
+      reduceFunc: (Matrix, Double) => Matrix): DrmRddInput[K] = {
     val a = srcA.toBlockifiedDrmRdd()
     val rdd = a
         .map({
@@ -116,55 +92,6 @@ object AewB {
     })
     new DrmRddInput[K](blockifiedSrc = Some(rdd))
   }
-}
-
-trait EWOps {
-
-  val plus: ReduceFunc
 
-  val minus: ReduceFunc
 
-  val hadamard: ReduceFunc
-
-  val eldiv: ReduceFunc
-
-  val plusScalar: ReduceFuncScalar
-
-  val minusScalar: ReduceFuncScalar
-
-  val scalarMinus: ReduceFuncScalar
-
-  val timesScalar: ReduceFuncScalar
-
-  val divScalar: ReduceFuncScalar
-
-  val scalarDiv: ReduceFuncScalar
-
-}
-
-object InplaceEWOps extends EWOps {
-  val plus: ReduceFunc = (a, b) => a += b
-  val minus: ReduceFunc = (a, b) => a -= b
-  val hadamard: ReduceFunc = (a, b) => a *= b
-  val eldiv: ReduceFunc = (a, b) => a /= b
-  val plusScalar: ReduceFuncScalar = (A, s) => A += s
-  val minusScalar: ReduceFuncScalar = (A, s) => A -= s
-  val scalarMinus: ReduceFuncScalar = (A, s) => s -=: A
-  val timesScalar: ReduceFuncScalar = (A, s) => A *= s
-  val divScalar: ReduceFuncScalar = (A, s) => A /= s
-  val scalarDiv: ReduceFuncScalar = (A, s) => s /=: A
 }
-
-object CloningEWOps extends EWOps {
-  val plus: ReduceFunc = (a, b) => a + b
-  val minus: ReduceFunc = (a, b) => a - b
-  val hadamard: ReduceFunc = (a, b) => a * b
-  val eldiv: ReduceFunc = (a, b) => a / b
-  val plusScalar: ReduceFuncScalar = (A, s) => A + s
-  val minusScalar: ReduceFuncScalar = (A, s) => A - s
-  val scalarMinus: ReduceFuncScalar = (A, s) => s -: A
-  val timesScalar: ReduceFuncScalar = (A, s) => A * s
-  val divScalar: ReduceFuncScalar = (A, s) => A / s
-  val scalarDiv: ReduceFuncScalar = (A, s) => s /: A
-}
-

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 1e2028d..2d80fe3 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -19,9 +19,8 @@ package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math._
 import math._
-import scalabindings._
+import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import drm._
 import scala.collection.JavaConversions._
 import org.apache.spark.storage.StorageLevel
 import reflect._
@@ -96,9 +95,8 @@ class CheckpointedDrmSpark[K: ClassTag](
 
     val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
 
-    val cols = ncol
-    val rows = safeToNonNegInt(nrow)
-
+    val cols = rdd.map(_._2.length).fold(0)(max(_, _))
+    val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
 
     // since currently spark #collect() requires Serializeable support,
     // we serialize DRM vectors into byte arrays on backend and restore Vector

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index 37a9ac2..322d67c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -23,14 +23,13 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.log4j.Logger
 import java.lang.Math
-import org.apache.spark.rdd.{FilteredRDD, RDD}
+import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.mahout.math.drm._
 import SparkContext._
-import org.apache.mahout.math
 
 
 package object drm {
@@ -57,9 +56,8 @@ package object drm {
 
     rdd.mapPartitions(iter => {
 
-      if (iter.isEmpty) {
-        Iterator.empty
-      } else {
+      if (!iter.hasNext) Iterator.empty
+      else {
 
         val data = iter.toIterable
         val keys = data.map(t => t._1).toArray[K]
@@ -72,34 +70,24 @@ package object drm {
     })
   }
 
-  /** Performs rbind() on all blocks inside same partition to ensure there's only one block here. */
-  private[sparkbindings] def rbind[K: ClassTag](rdd: BlockifiedDrmRdd[K]): BlockifiedDrmRdd[K] =
-    rdd.mapPartitions(iter => {
-      if (iter.isEmpty) {
-        Iterator.empty
-      } else {
-        Iterator(math.drm.rbind(iter.toIterable))
-      }
-    })
-
   private[sparkbindings] def deblockify[K: ClassTag](rdd: BlockifiedDrmRdd[K]): DrmRdd[K] =
 
   // Just flat-map rows, connect with the keys
-    rdd.flatMap {
+    rdd.flatMap({
       case (blockKeys: Array[K], block: Matrix) =>
 
         blockKeys.ensuring(blockKeys.size == block.nrow)
-        blockKeys.view.zipWithIndex.map {
+        blockKeys.view.zipWithIndex.map({
           case (key, idx) =>
-            var v = block(idx, ::) // This is just a view!
+            var v = block(idx, ::)
 
             // If a view rather than a concrete vector, clone into a concrete vector in order not to
             // attempt to serialize outer matrix when we save it (Although maybe most often this
             // copying is excessive?)
             // if (v.isInstanceOf[MatrixVectorView]) v = v.cloned
             key -> v
-        }
-    }
+        })
 
+    })
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index c4ef0d3..34d81cf 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -29,7 +29,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.mahout.math.{VectorWritable, Vector, MatrixWritable, Matrix}
 import org.apache.hadoop.io.Writable
-import org.apache.spark.storage.StorageLevel
 
 /** Public api for Spark-specific operators */
 package object sparkbindings {
@@ -180,8 +179,7 @@ package object sparkbindings {
     new CheckpointedDrmSpark[K](
       rdd = rdd,
       _nrow = nrow,
-      _ncol = ncol,
-      _cacheStorageLevel = StorageLevel.NONE
+      _ncol = ncol
     )
 
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
index 52e2b35..0834145 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
@@ -33,15 +33,13 @@ class ABtSuite extends FunSuite with MahoutLocalContext {
   test("ABt") {
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
     val inCoreB = dense((3, 4, 5), (5, 6, 7))
-    val A = drmParallelize(m = inCoreA, numPartitions = 3)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    val B = drmParallelize(m = inCoreB)
 
     val op = new OpABt(A, B)
 
     val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
-    printf("AB' num partitions = %d.\n", drm.rdd.partitions.size)
-
     val inCoreMControl = inCoreA %*% inCoreB.t
     val inCoreM = drm.collect
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
index 389ef65..2efa5ff 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
@@ -36,9 +36,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = drmParallelize(m = inCoreB)
 
-    val op = new OpAewB(A, B, "*")
+    val op = new OpAewB(A, B, '*')
 
-    val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_hadamard_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA * inCoreB
@@ -53,9 +53,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = drmParallelize(m = inCoreB)
 
-    val op = new OpAewB(A, B, "+")
+    val op = new OpAewB(A, B, '+')
 
-    val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_plus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA + inCoreB
@@ -70,9 +70,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = drmParallelize(m = inCoreB)
 
-    val op = new OpAewB(A, B, "-")
+    val op = new OpAewB(A, B, '-')
 
-    val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_minus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA - inCoreB
@@ -87,9 +87,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = drmParallelize(m = inCoreB)
 
-    val op = new OpAewB(A, B, "/")
+    val op = new OpAewB(A, B, '/')
 
-    val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_eldiv_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA / inCoreB

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
new file mode 100644
index 0000000..2369441
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.drm.decompositions
+
+import org.scalatest.{Matchers, FunSuite}
+import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.common.RandomUtils
+
+class MathSuite extends FunSuite with Matchers with MahoutLocalContext {
+
+  test("thin distributed qr") {
+
+    val inCoreA = dense(
+      (1, 2, 3, 4),
+      (2, 3, 4, 5),
+      (3, -4, 5, 6),
+      (4, 5, 6, 7),
+      (8, 6, 7, 8)
+    )
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+    val (drmQ, inCoreR) = dqrThin(A, checkRankDeficiency = false)
+
+    // Assert optimizer still knows Q and A are identically partitioned
+    drmQ.partitioningTag should equal (A.partitioningTag)
+
+    drmQ.rdd.partitions.size should be(A.rdd.partitions.size)
+
+    // Should also be zippable
+    drmQ.rdd.zip(other = A.rdd)
+
+    val inCoreQ = drmQ.collect
+
+    printf("A=\n%s\n", inCoreA)
+    printf("Q=\n%s\n", inCoreQ)
+    printf("R=\n%s\n", inCoreR)
+
+    val (qControl, rControl) = qr(inCoreA)
+    printf("qControl=\n%s\n", qControl)
+    printf("rControl=\n%s\n", rControl)
+
+    // Validate with Cholesky
+    val ch = chol(inCoreA.t %*% inCoreA)
+    printf("A'A=\n%s\n", inCoreA.t %*% inCoreA)
+    printf("L:\n%s\n", ch.getL)
+
+    val rControl2 = (ch.getL cloned).t
+    val qControl2 = ch.solveRight(inCoreA)
+    printf("qControl2=\n%s\n", qControl2)
+    printf("rControl2=\n%s\n", rControl2)
+
+    // Housholder approach seems to be a little bit more stable
+    (rControl - inCoreR).norm should be < 1E-5
+    (qControl - inCoreQ).norm should be < 1E-5
+
+    // Assert identicity with in-core Cholesky-based -- this should be tighter.
+    (rControl2 - inCoreR).norm should be < 1E-10
+    (qControl2 - inCoreQ).norm should be < 1E-10
+
+    // Assert orhtogonality:
+    // (a) Q[,j] dot Q[,j] == 1.0 for all j
+    // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
+    for (col <- 0 until inCoreQ.ncol)
+      ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10
+    for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol)
+      (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10
+
+
+  }
+
+  test("dssvd - the naive-est - q=0") {
+    dssvdNaive(q = 0)
+  }
+
+  test("ddsvd - naive - q=1") {
+    dssvdNaive(q = 1)
+  }
+
+  test("ddsvd - naive - q=2") {
+    dssvdNaive(q = 2)
+  }
+
+
+  def dssvdNaive(q: Int) {
+    val inCoreA = dense(
+      (1, 2, 3, 4),
+      (2, 3, 4, 5),
+      (3, -4, 5, 6),
+      (4, 5, 6, 7),
+      (8, 6, 7, 8)
+    )
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q)
+    val (inCoreU, inCoreV) = (drmU.collect, drmV.collect)
+
+    printf("U:\n%s\n", inCoreU)
+    printf("V:\n%s\n", inCoreV)
+    printf("Sigma:\n%s\n", s)
+
+    (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5
+  }
+
+  test("dspca") {
+
+    import math._
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m =  500
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+        ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+    // PCA Rotation matrix -- should also be orthonormal.
+    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+    val input = (u %*%: diagv(spectrum)) %*% tr.t
+    val drmInput = drmParallelize(m = input, numPartitions = 2)
+
+    // Calculate just first 10 principal factors and reduce dimensionality.
+    // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
+    // ensure to zero stochastic error and assert only functional correctness of the method's pca-
+    // specific additions.
+    val k = 10
+
+    // Calculate just first 10 principal factors and reduce dimensionality.
+    var (drmPCA, _, s) = dspca(A = drmInput, k = 10, p = spectrumLen, q = 1)
+    // Un-normalized pca data:
+    drmPCA = drmPCA %*% diagv(s)
+
+    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
+
+    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
+    // generated input was not centered. So here, we'd just brute-solve pca to verify
+    val xi = input.colMeans()
+    for (r <- 0 until input.nrow) input(r, ::) -= xi
+    var (pcaControl, _, sControl) = svd(m = input)
+    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+
+    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 6152426..ac46a9e 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.mahout.sparkbindings.drm
 
 import org.scalatest.{Matchers, FunSuite}
 import org.apache.mahout.math._
-import decompositions._
 import scalabindings._
 import drm._
 import RLikeOps._
@@ -31,7 +30,6 @@ import org.apache.mahout.math.Matrices
 import org.apache.mahout.sparkbindings.{SparkEngine, blas}
 import org.apache.spark.storage.StorageLevel
 import org.apache.mahout.math.drm.logical.{OpAtx, OpAtB, OpAtA}
-import scala.util.Random
 
 /** R-like DRM DSL operation tests */
 class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
@@ -313,41 +311,6 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     (inCoreC - inCoreCControl).norm should be < 1E-10
   }
 
-  test("C = A + B, identically partitioned") {
-
-    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-
-    printf("A.nrow=%d.\n",A.rdd.count())
-
-    // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
-    val B = A.mapBlock() {
-      case (keys, block) =>
-        val bBlock = block.like() := ((r,c,v) => util.Random.nextDouble())
-        keys -> bBlock
-    }
-        // Prevent repeated computation non-determinism
-        .checkpoint()
-
-    val inCoreB = B.collect
-
-    printf("A=\n%s\n", inCoreA)
-    printf("B=\n%s\n", inCoreB)
-
-    val C = A + B
-
-    val inCoreC = C.collect
-
-    printf("C=\n%s\n", inCoreC)
-
-    // Actual
-    val inCoreCControl = inCoreA + inCoreB
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-  }
-
-
   test("C = A + B side test 1") {
 
     val inCoreA = dense((1, 2), (3, 4))

http://git-wip-us.apache.org/repos/asf/mahout/blob/26d3db6b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
index d9e89bc..2d9ed76 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
@@ -14,7 +14,7 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
   override protected def beforeEach() {
     super.beforeEach()
 
-    mahoutCtx = mahoutSparkContext(masterUrl = "local[2]",
+    mahoutCtx = mahoutSparkContext(masterUrl = "local[3]",
       appName = "MahoutLocalContext",
       // Do not run MAHOUT_HOME jars in unit tests.
       addMahoutJars = false,


Mime
View raw message