mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlyubi...@apache.org
Subject git commit: MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes. This closes apache/mahout#6.
Date Wed, 04 Jun 2014 21:05:05 GMT
Repository: mahout
Updated Branches:
  refs/heads/master 127bd0134 -> 656878ea6


MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes.
This closes apache/mahout#6.


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/656878ea
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/656878ea
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/656878ea

Branch: refs/heads/master
Commit: 656878ea69adf7663015156b55ad5c9ea3ec7e4f
Parents: 127bd01
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Authored: Wed Jun 4 14:03:43 2014 -0700
Committer: Dmitriy Lyubimov <dlyubimov@apache.org>
Committed: Wed Jun 4 14:03:43 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../mahout/math/drm/CheckpointedOps.scala       |   3 +-
 .../mahout/math/drm/DistributedEngine.scala     |  11 +-
 .../org/apache/mahout/math/drm/DrmLike.scala    |   2 +-
 .../org/apache/mahout/math/drm/DrmLikeOps.scala |   9 +-
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |   9 +-
 .../mahout/math/drm/decompositions/DQR.scala    |  57 ------
 .../mahout/math/drm/decompositions/DSPCA.scala  | 153 ----------------
 .../mahout/math/drm/decompositions/DSSVD.scala  |  83 ---------
 .../apache/mahout/math/drm/logical/OpAewB.scala |   9 +-
 .../mahout/math/drm/logical/OpMapBlock.scala    |   8 +-
 .../org/apache/mahout/math/drm/package.scala    |  79 ++++-----
 .../mahout/math/scalabindings/MatrixOps.scala   |   2 +
 .../math/scalabindings/RLikeMatrixOps.scala     |  12 +-
 .../scalabindings/decompositions/SSVD.scala     | 165 -----------------
 .../mahout/math/scalabindings/package.scala     |  21 ---
 .../mahout/math/scalabindings/MathSuite.scala   |  76 --------
 .../mahout/sparkbindings/SparkEngine.scala      |  35 ++--
 .../apache/mahout/sparkbindings/blas/ABt.scala  |  35 ++--
 .../apache/mahout/sparkbindings/blas/AewB.scala | 159 ++++++++++++-----
 .../drm/CheckpointedDrmSpark.scala              |   8 +-
 .../mahout/sparkbindings/drm/package.scala      |  28 ++-
 .../apache/mahout/sparkbindings/package.scala   |   4 +-
 .../mahout/sparkbindings/blas/ABtSuite.scala    |   6 +-
 .../mahout/sparkbindings/blas/AewBSuite.scala   |  16 +-
 .../decompositions/MathSuite.scala              | 176 -------------------
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    |  37 ++++
 .../sparkbindings/test/MahoutLocalContext.scala |   2 +-
 28 files changed, 320 insertions(+), 887 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bea124a..3b2e61b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1566: (Experimental) Regular ALS factorizer with conversion tests, optimizer enhancements and bug fixes (dlyubimov)
+
   MAHOUT-1537: Minor fixes to spark-shell (Anand Avati via dlyubimov)
 
   MAHOUT-1529: Finalize abstraction of distributed logical plans from backend operations (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index fa1ccfd..edd0cfc 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
-import org.apache.mahout.math.Vector
+import org.apache.mahout.math._
 
 
 /**
@@ -35,5 +35,6 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
   /** Column Means */
   def colMeans(): Vector = drm.context.colMeans(drm)
 
+  def norm():Double = drm.context.norm(drm)
 }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 0e76d87..5ffee9d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -19,10 +19,11 @@ package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
 import logical._
-import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
 import DistributedEngine._
 import org.apache.mahout.math.scalabindings._
-import RLikeOps._
 
 /** Abstraction of optimizer/distributed engine */
 trait DistributedEngine {
@@ -43,10 +44,12 @@ trait DistributedEngine {
   def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
 
   /** Engine-specific colSums implementation based on a checkpoint. */
-  def colSums[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  def colMeans[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+  def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double
 
   /** Broadcast support */
   def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 8e0db1e..7fbfc12 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -28,7 +28,7 @@ package org.apache.mahout.math.drm
  */
 trait DrmLike[K] {
 
-  protected[mahout] def partitioningTag:Long
+  protected[mahout] def partitioningTag: Long
 
   protected[mahout] val context:DistributedContext
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index 35e28af..328805a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -36,9 +36,14 @@ class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
    * @tparam R
    * @return
    */
-  def mapBlock[R : ClassTag](ncol: Int = -1)
+  def mapBlock[R: ClassTag](ncol: Int = -1, identicallyPartitioned: Boolean = true)
       (bmf: BlockMapFunc[K, R]): DrmLike[R] =
-    new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
+    new OpMapBlock[K, R](
+      A = drm,
+      bmf = bmf,
+      _ncol = ncol,
+      identicallyPartitioned = identicallyPartitioned
+    )
 
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index f46d15c..7ac5577 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -25,13 +25,13 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
 
   import RLikeDrmOps._
 
-  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
+  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
 
-  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
+  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")
 
-  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
+  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")
 
-  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
+  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
 
   def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
 
@@ -70,7 +70,6 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
 
   def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
 
-
 }
 
 object RLikeDrmOps {

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
deleted file mode 100644
index 34ae345..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.log4j.Logger
-
-object DQR {
-
-  private val log = Logger.getLogger(DQR.getClass)
-
-  /**
-   * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
-   * controlled (<5000 or so). <P>
-   *
-   * It is recommended to checkpoint A since it does two passes over it. <P>
-   *
-   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
-   * their RDD should be able to zip successfully.
-   */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
-
-    if (A.ncol > 5000)
-      log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
-
-    implicit val ctx = A.context
-
-    val AtA = (A.t %*% A).checkpoint()
-    val inCoreAtA = AtA.collect
-
-    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
-
-    val ch = chol(inCoreAtA)
-    val inCoreR = (ch.getL cloned) t
-
-    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
-
-    if (checkRankDeficiency && !ch.isPositiveDefinite)
-      throw new IllegalArgumentException("R is rank-deficient.")
-
-    val bcastAtA = drmBroadcast(inCoreAtA)
-
-    // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
-    // decompose A'A in the backend again.
-
-    // Compute Q = A*inv(L') -- we can do it blockwise.
-    val Q = A.mapBlock() {
-      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
-    }
-
-    Q -> inCoreR
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
deleted file mode 100644
index 9e33416..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSPCA {
-
-  /**
-   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
-   * document of the MAHOUT-817.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations (hint: use either 0 or 1)
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs in order to trigger their computation.
-   */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = {
-
-    val drmA = A.checkpoint()
-    implicit val ctx = A.context
-
-    val m = drmA.nrow
-    val n = drmA.ncol
-    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
-    val pfxed = safeToNonNegInt((m min n) - k min p)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    // Dataset mean
-    val xi = drmA.colMeans
-
-    // We represent Omega by its seed.
-    val omegaSeed = RandomUtils.getRandom().nextInt()
-    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
-
-    // This done in front in a single-threaded fashion for now. Even though it doesn't require any
-    // memory beyond that is required to keep xi around, it still might be parallelized to backs
-    // for significantly big n and r. TODO
-    val s_o = omega.t %*% xi
-
-    val bcastS_o = drmBroadcast(s_o)
-    val bcastXi = drmBroadcast(xi)
-
-    var drmY = drmA.mapBlock(ncol = r) {
-      case (keys, blockA) =>
-        val s_o:Vector = bcastS_o
-        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
-        keys -> blockY
-    }
-        // Checkpoint Y
-        .checkpoint()
-
-    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
-    var s_q = drmQ.colSums()
-    var bcastVarS_q = drmBroadcast(s_q)
-
-    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
-    // still be identically partitioned.
-    var drmBt = (drmA.t %*% drmQ).checkpoint()
-
-    var s_b = (drmBt.t %*% xi).collect(::, 0)
-    var bcastVarS_b = drmBroadcast(s_b)
-
-    for (i <- 0 until q) {
-
-      // These closures don't seem to live well with outside-scope vars. This doesn't record closure
-      // attributes correctly. So we create additional set of vals for broadcast vars to properly 
-      // create readonly closure attributes in this very scope.
-      val bcastS_q = bcastVarS_q
-      val bcastS_b = bcastVarS_b
-      val bcastXib = bcastXi
-
-      // Fix Bt as B' -= xi cross s_q
-      drmBt = drmBt.mapBlock() {
-        case (keys, block) =>
-          val s_q: Vector = bcastS_q
-          val xi: Vector = bcastXib
-          keys.zipWithIndex.foreach {
-            case (key, idx) => block(idx, ::) -= s_q * xi(key)
-          }
-          keys -> block
-      }
-
-      drmY.uncache()
-      drmQ.uncache()
-
-      drmY = (drmA %*% drmBt)
-          // Fix Y by subtracting s_b from each row of the AB'
-          .mapBlock() {
-        case (keys, block) =>
-          val s_b: Vector = bcastS_b
-          for (row <- 0 until block.nrow) block(row, ::) -= s_b
-          keys -> block
-      }
-          // Checkpoint Y
-          .checkpoint()
-
-      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
-      s_q = drmQ.colSums()
-      bcastVarS_q = drmBroadcast(s_q)
-
-      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
-      // identically partitioned anymore.
-      drmBt = (drmA.t %*% drmQ).checkpoint()
-
-      s_b = (drmBt.t %*% xi).collect(::, 0)
-      bcastVarS_b = drmBroadcast(s_b)
-    }
-
-    val c = s_q cross s_b
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
-        c - c.t + (s_q cross s_q) * (xi dot xi)
-    val (inCoreUHat, d) = eigen(inCoreBBt)
-    val s = d.sqrt
-
-    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
-    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
-    val drmU = drmQ %*% inCoreUHat
-    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
-    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
deleted file mode 100644
index 0da9ec7..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.mahout.math.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSSVD {
-
-  /**
-   * Distributed Stochastic Singular Value decomposition algorithm.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs in order to trigger their computation.
-   */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = {
-
-    val drmA = A.checkpoint()
-
-    val m = drmA.nrow
-    val n = drmA.ncol
-    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
-    val pfxed = safeToNonNegInt((m min n) - k min p)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    // We represent Omega by its seed.
-    val omegaSeed = RandomUtils.getRandom().nextInt()
-
-    // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
-    // instantiate the Omega random matrix view in the backend instead. That way serialized closure
-    // is much more compact.
-    var drmY = drmA.mapBlock(ncol = r) {
-      case (keys, blockA) =>
-        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
-        keys -> blockY
-    }
-
-    var drmQ = dqrThin(drmY.checkpoint())._1
-    // Checkpoint Q if last iteration
-    if (q == 0) drmQ = drmQ.checkpoint()
-
-    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
-    // still be identically partitioned.
-    var drmBt = drmA.t %*% drmQ
-    // Checkpoint B' if last iteration
-    if (q == 0) drmBt = drmBt.checkpoint()
-
-    for (i <- 0  until q) {
-      drmY = drmA %*% drmBt
-      drmQ = dqrThin(drmY.checkpoint())._1
-      // Checkpoint Q if last iteration
-      if (i == q - 1) drmQ = drmQ.checkpoint()
-
-      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
-      // identically partitioned anymore.
-      drmBt = drmA.t %*% drmQ
-      // Checkpoint B' if last iteration
-      if (i == q - 1) drmBt = drmBt.checkpoint()
-    }
-
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
-    val (inCoreUHat, d) = eigen(inCoreBBt)
-    val s = d.sqrt
-
-    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
-    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
-    val drmU = drmQ %*% inCoreUHat
-    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
-    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
index d07172a..da7c0d5 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -19,17 +19,24 @@ package org.apache.mahout.math.drm.logical
 
 import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
 
 /** DRM elementwise operator */
 case class OpAewB[K: ClassTag](
     override var A: DrmLike[K],
     override var B: DrmLike[K],
-    val op: Char
+    val op: String
     ) extends AbstractBinaryOp[K, K, K] {
 
+
+
   assert(A.ncol == B.ncol, "arguments must have same number of columns")
   assert(A.nrow == B.nrow, "arguments must have same number of rows")
 
+  override protected[mahout] lazy val partitioningTag: Long =
+    if (A.partitioningTag == B.partitioningTag) A.partitioningTag
+    else Random.nextLong()
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
index 8e4362d..7299d9e 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -21,16 +21,18 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
+import scala.util.Random
 
 class OpMapBlock[S: ClassTag, R: ClassTag](
     override var A: DrmLike[S],
     val bmf: BlockMapFunc[S, R],
     val _ncol: Int = -1,
-    val _nrow: Long = -1
+    val _nrow: Long = -1,
+    identicallyPartitioned:Boolean
     ) extends AbstractUnaryOp[S, R] {
 
-
-  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+  override protected[mahout] lazy val partitioningTag: Long =
+    if (identicallyPartitioned) A.partitioningTag else Random.nextLong()
 
   /** R-like syntax for number of rows. */
   def nrow: Long = if (_nrow >= 0) _nrow else A.nrow

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 768bb1c..02e8b7a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -18,7 +18,9 @@
 package org.apache.mahout.math
 
 import scala.reflect.ClassTag
-import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
 
 package object drm {
 
@@ -70,56 +72,45 @@ package object drm {
       (implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
 
   /** Implicit broadcast -> value conversion. */
-  implicit def bcast2val[T](bcast:BCast[T]):T = bcast.value
+  implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
 
   /** Just throw all engine operations into context as well. */
-  implicit def ctx2engine(ctx:DistributedContext):DistributedEngine = ctx.engine
+  implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
 
   implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
     new CheckpointedOps[K](drm)
 
-  implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
-
-  // ============== Decompositions ===================
-
-  /**
-   * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
-   * controlled (<5000 or so). <P>
-   *
-   * It is recommended to checkpoint A since it does two passes over it. <P>
-   *
-   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
-   * their RDD should be able to zip successfully.
-   */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
-    DQR.dqrThin(A, checkRankDeficiency)
-
   /**
-   * Distributed Stochastic Singular Value decomposition algorithm.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs in order to trigger their computation.
+   * We assume that whenever computational action is invoked without explicit checkpoint, the user
+   * doesn't imply caching
    */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
-
-  /**
-   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
-   * document of the MAHOUT-817.
-   *
-   * @param A input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations (hint: use either 0 or 1)
-   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
-   *         e.g. save them to hdfs in order to trigger their computation.
-   */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
-
+  implicit def drm2Checkpointed[K: ClassTag](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE)
+
+  /** Implicit conversion to in-core with NONE caching of the result. */
+  implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect
+
+  /** Do vertical concatenation of collection of blockified tuples */
+  def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
+    assert(blocks.nonEmpty, "rbind: 0 blocks passed in")
+    if (blocks.size == 1) {
+      // No coalescing required.
+      blocks.head
+    } else {
+      // compute total number of rows in a new block
+      val m = blocks.view.map(_._2.nrow).sum
+      val n = blocks.head._2.ncol
+      val coalescedBlock = blocks.head._2.like(m, n)
+      val coalescedKeys = new Array[K](m)
+      var row = 0
+      for (elem <- blocks.view) {
+        val block = elem._2
+        val rowEnd = row + block.nrow
+        coalescedBlock(row until rowEnd, ::) := block
+        elem._1.copyToArray(coalescedKeys, row)
+        row = rowEnd
+      }
+      coalescedKeys -> coalescedBlock
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index a872240..48c2048 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -51,6 +51,8 @@ class MatrixOps(val m: Matrix) {
 
   def -(that: Matrix) = cloned -= that
 
+  def -:(that: Matrix) = that - m
+
   // m.plus(that)?
 
   def +(that: Double) = cloned += that

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
index 23c2ac2..af53c51 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeMatrixOps.scala
@@ -20,7 +20,7 @@ import org.apache.mahout.math.{Vector, Matrix}
 import scala.collection.JavaConversions._
 import RLikeOps._
 
-class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
+class RLikeMatrixOps(m: Matrix) extends MatrixOps(m) {
 
   /**
    * matrix-matrix multiplication
@@ -47,12 +47,14 @@ class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
 
   def *(that: Double) = cloned *= that
 
-  def /(that:Matrix) = cloned /= that
+  def /(that: Matrix) = cloned /= that
 
-  def /(that:Double) = cloned /= that
+  def /:(that: Matrix) = that / m
+
+  def /(that: Double) = cloned /= that
 
   /** 1.0 /: A is eqivalent to R's 1.0/A */
-  def /:(that:Double) = that /=: cloned
+  def /:(that: Double) = that /=: cloned
 
   /**
    * in-place Hadamard product. We probably don't want to use assign
@@ -82,7 +84,7 @@ class RLikeMatrixOps(_m: Matrix) extends MatrixOps(_m) {
   }
 
   /** 1.0 /=: A is equivalent to A = 1.0/A in R */
-  def /=:(that:Double) = {
+  def /=:(that: Double) = {
     m.foreach(that /=: _.vector())
     m
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
deleted file mode 100644
index 80385a3..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import scala.math._
-import org.apache.mahout.math.{Matrices, Matrix}
-import org.apache.mahout.common.RandomUtils
-import org.apache.log4j.Logger
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-private[math] object SSVD {
-
-  private val log = Logger.getLogger(SSVD.getClass)
-
-  /**
-   * In-core SSVD algorithm.
-   *
-   * @param a input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
-    val m = a.nrow
-    val n = a.ncol
-    if (k > min(m, n))
-      throw new IllegalArgumentException(
-        "k cannot be greater than smaller of m,n")
-    val pfxed = min(p, min(m, n) - k)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    val rnd = RandomUtils.getRandom
-    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
-    var y = a %*% omega
-    var yty = y.t %*% y
-    val at = a.t
-    var ch = chol(yty)
-    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-    var bt = ch.solveRight(at %*% y)
-
-    // Power iterations
-    for (i <- 0 until q) {
-      y = a %*% bt
-      yty = y.t %*% y
-      ch = chol(yty)
-      bt = ch.solveRight(at %*% y)
-    }
-
-    val bbt = bt.t %*% bt
-    val (uhat, d) = eigen(bbt)
-
-    val s = d.sqrt
-    val u = ch.solveRight(y) %*% uhat
-    val v = bt %*% (uhat %*%: diagv(1 /: s))
-
-    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-  }
-
-  /**
-   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
-   * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
-   * to save some memory for sparse inputs by removing direct mean subtraction.<P>
-   *
-   * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
-   * If retaining distances and orignal scaled variances not that important, the normalized PCA space
-   * is just U.
-   *
-   * Important: data points are considered to be rows.
-   *
-   * @param a input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
-    val m = a.nrow
-    val n = a.ncol
-    if (k > min(m, n))
-      throw new IllegalArgumentException(
-        "k cannot be greater than smaller of m,n")
-    val pfxed = min(p, min(m, n) - k)
-
-    // Actual decomposition rank
-    val r = k + pfxed
-
-    val rnd = RandomUtils.getRandom
-    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
-    // Dataset mean
-    val xi = a.colMeans()
-
-    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
-
-    var y = a %*% omega
-
-    // Fixing y
-    val s_o = omega.t %*% xi
-    y := ((r,c,v) => v - s_o(c))
-
-    var yty = y.t %*% y
-    var ch = chol(yty)
-//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-
-    // This is implicit Q of QR(Y)
-    var qm = ch.solveRight(y)
-    var bt = a.t %*% qm
-    var s_q = qm.colSums()
-    var s_b = bt.t %*% xi
-
-    // Power iterations
-    for (i <- 0 until q) {
-
-      // Fix bt
-      bt -= xi cross s_q
-
-      y = a %*% bt
-
-      // Fix Y again.
-      y := ((r,c,v) => v - s_b(c))
-
-      yty = y.t %*% y
-      ch = chol(yty)
-      qm = ch.solveRight(y)
-      bt = a.t %*% qm
-      s_q = qm.colSums()
-      s_b = bt.t %*% xi
-    }
-
-    val c = s_q cross s_b
-
-    // BB' computation becomes
-    val bbt = bt.t %*% bt -c - c.t +  (s_q cross s_q) * (xi dot xi)
-
-    val (uhat, d) = eigen(bbt)
-
-    val s = d.sqrt
-    val u = qm %*% uhat
-    val v = bt %*% (uhat %*%: diagv(1 /: s))
-
-    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index 4599146..2b7773b 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -281,26 +281,5 @@ package object scalabindings {
     x(::, 0)
   }
 
-  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
-
-  /**
-   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
-   * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
-   * to save some memory for sparse inputs by removing direct mean subtraction.<P>
-   *
-   * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
-   * If retaining distances and orignal scaled variances not that important, the normalized PCA space
-   * is just U.
-   *
-   * Important: data points are considered to be rows.
-   *
-   * @param a input matrix A
-   * @param k request SSVD rank
-   * @param p oversampling parameter
-   * @param q number of power iterations
-   * @return (U,V,s)
-   */
-  def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
-    SSVD.spca(a = a, k = k, p = p, q = q)
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
index d1171cc..b10cde3 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
@@ -193,82 +193,6 @@ class MathSuite extends FunSuite with MahoutSuite {
 
   }
 
-  test("ssvd") {
-    val a = dense(
-      (1, 2, 3),
-      (3, 4, 5),
-      (-2, 6, 7),
-      (-3, 8, 9)
-    )
-
-    val rank = 2
-    val (u, v, s) = ssvd(a, k = rank, q = 1)
-
-    val (uControl, vControl, sControl) = svd(a)
-
-    printf("U:\n%s\n", u)
-    printf("U-control:\n%s\n", uControl)
-    printf("V:\n%s\n", v)
-    printf("V-control:\n%s\n", vControl)
-    printf("Sigma:\n%s\n", s)
-    printf("Sigma-control:\n%s\n", sControl)
-
-    (s - sControl(0 until rank)).norm(2) should be < 1E-7
-
-    // Singular vectors may be equivalent down to a sign only.
-    (u.norm - uControl(::, 0 until rank).norm).abs should be < 1E-7
-    (v.norm - vControl(::, 0 until rank).norm).abs should be < 1E-7
-
-  }
-
-  test("spca") {
-
-    import math._
-
-    val rnd = RandomUtils.getRandom
-
-    // Number of points
-    val m =  500
-    // Length of actual spectrum
-    val spectrumLen = 40
-
-    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
-    printf("spectrum:%s\n", spectrum)
-
-    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
-        ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
-    // PCA Rotation matrix -- should also be orthonormal.
-    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
-    val input = (u %*%: diagv(spectrum)) %*% tr.t
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
-    // ensure to zero stochastic error and assert only functional correctness of the method's pca-
-    // specific additions.
-    val k = 10
-    var (pca, _, s) = spca(a = input, k = k, p=spectrumLen, q = 1)
-    printf("Svs:%s\n",s)
-    // Un-normalized pca data:
-    pca = pca %*%: diagv(s)
-
-    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
-    // generated input was not centered. So here, we'd just brute-solve pca to verify
-    val xi = input.colMeans()
-    for (r <- 0 until input.nrow) input(r, ::) -= xi
-    var (pcaControl, _, sControl) = svd(m = input)
-
-    printf("Svs-control:%s\n",sControl)
-    pcaControl = (pcaControl %*%: diagv(sControl))(::,0 until k)
-
-    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
-    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
-    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
-  }
-
   test("random uniform") {
     val omega1 = Matrices.symmetricUniformView(2, 3, 1234)
     val omega2 = Matrices.symmetricUniformView(2, 3, 1234)

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 0c904ab..3a03e58 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,9 +17,9 @@
 
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
 import RLikeOps._
-import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
@@ -30,6 +30,8 @@ import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import scala.Some
 import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.RLikeDrmOps._
 
 /** Spark-specific non-drm-method operations */
 object SparkEngine extends DistributedEngine {
@@ -51,7 +53,17 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+  override def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
+    if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+
+  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
+    drm.rdd
+        // Compute sum of squares of each vector
+        .map {
+      case (key, v) => v dot v
+    }
+        .reduce(_ + _)
+
 
   /**
    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
@@ -123,7 +135,10 @@ object SparkEngine extends DistributedEngine {
 
     {
       implicit def getWritable(x: Any): Writable = val2key()
-      new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
+      new CheckpointedDrmSpark(
+        rdd = rdd.map(t => (key2val(t._1), t._2)),
+        _cacheStorageLevel = StorageLevel.MEMORY_ONLY
+      )(km.asInstanceOf[ClassTag[Any]])
     }
   }
 
@@ -210,16 +225,8 @@ object SparkEngine extends DistributedEngine {
       case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
       case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
       case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
+      case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
       case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
       case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
       // Custom operators, we just execute them

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 97873bd..1e3f286 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -65,16 +65,22 @@ object ABt {
     // Blockify everything.
     val blocksA = srcA.toBlockifiedDrmRdd()
 
-        // mark row-blocks with group id
+        // Mark row-blocks with group id
         .mapPartitionsWithIndex((part, iter) => {
-      val rowBlockId = part
-      val (blockKeys, block) = iter.next()
 
-      // Each partition must have exactly one block due to implementation guarantees of blockify()
-      iter.ensuring(!_.hasNext)
+      if (iter.isEmpty) {
+        Iterator.empty
+      } else {
+
+        val rowBlockId = part
+        val (blockKeys, block) = iter.next()
 
-      // the output is (row block id, array of row keys, and the matrix representing the block).
-      Iterator((rowBlockId, blockKeys, block))
+        // Each partition must have exactly one block due to implementation guarantees of blockify()
+        assert(!iter.hasNext, "Partition #%d is expected to have at most 1 block at AB'.".format(part))
+
+        // the output is (row block id, array of row keys, and the matrix representing the block).
+        Iterator((rowBlockId, blockKeys, block))
+      }
     })
 
     val blocksB = srcB.toBlockifiedDrmRdd()
@@ -96,7 +102,7 @@ object ABt {
 
 
     // The plan.
-    val blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
+    var blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
 
         // Build Cartesian. It may require a bit more memory there at tasks.
         .cartesian(blocksB)
@@ -119,7 +125,7 @@ object ABt {
         // Combine -- this is probably the most efficient
         .combineByKey[(Array[K],Matrix)](
 
-          createCombiner = (t:(Array[K],Array[Int],Matrix)) => t match {
+          createCombiner = (t: (Array[K], Array[Int], Matrix)) => t match {
             case (rowKeys, colKeys, blockProd) =>
 
               // Accumulator is a row-wise block of sparse vectors.
@@ -148,11 +154,20 @@ object ABt {
           mergeCombiners = (c1: (Array[K], Matrix), c2: (Array[K], Matrix)) => {
             c1._2 += c2._2
             c1
-          })
+          },
+
+          // Cartesian will tend to produce many more partitions than we actually need for co-grouping,
+          // and as a result, we may see more empty partitions than we actually need.
+          numPartitions = numProductPartitions
+        )
 
         // Combine leaves residual block key -- we don't need that.
         .map(_._2)
 
+    // This may produce more than one block per partition. Most implementations rely on the convention
+    // of having at most one block per partition.
+    blockifiedRdd = rbind(blockifiedRdd)
+
     new DrmRddInput(blockifiedSrc = Some(blockifiedRdd))
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index ec6e99e..384b986 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -24,67 +24,91 @@ import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.{Matrix, Vector}
 import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
+import org.apache.log4j.Logger
+import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
 
 /** Elementwise drm-drm operators */
 object AewB {
 
-  @inline
-  def a_plus_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
-    a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a += b)
+  private val log = Logger.getLogger(AewB.getClass)
 
-  @inline
-  def a_minus_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
-    a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a -= b)
+  /**
+   * Set to false to disallow in-place elementwise operations in case side-effects and non-idempotent
+   * computations become a problem.
+   */
+  final val PROPERTY_AEWB_INPLACE = "mahout.math.AewB.inplace"
 
-  @inline
-  def a_hadamard_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
-    a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a *= b)
+  type ReduceFunc = (Vector, Vector) => Vector
 
-  @inline
-  def a_eldiv_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] =
-    a_ew_b(op, srcA, srcB, reduceFunc = (a, b) => a /= b)
+  type ReduceFuncScalar = (Matrix, Double) => Matrix
 
-  @inline
-  def a_plus_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
-    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A += s)
-
-  @inline
-  def a_minus_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
-    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A -= s)
-
-  @inline
-  def scalar_minus_a[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
-    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => s -=: A)
+  private[blas] def getEWOps() = {
+    val inplaceProp = System.getProperty(PROPERTY_AEWB_INPLACE, "true").toBoolean
+    if (inplaceProp) InplaceEWOps else CloningEWOps
+  }
 
-  @inline
-  def a_times_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
-    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A *= s)
+  /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
+  def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
-  @inline
-  def a_div_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
-    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => A /= s)
+    val ewOps = getEWOps()
+    val opId = op.op
 
-  @inline
-  def scalar_div_a[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] =
-    a_ew_scalar(op, srcA, scalar, reduceFunc = (A, s) => s /=: A)
+    val reduceFunc = opId match {
+      case "+" => ewOps.plus
+      case "-" => ewOps.minus
+      case "*" => ewOps.hadamard
+      case "/" => ewOps.eldiv
+      case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
+    }
 
-  /** Parallel way of this operation (this assumes different partitioning of the sources */
-  private[blas] def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K],
-      reduceFunc: (Vector, Vector) => Vector): DrmRddInput[K] = {
     val a = srcA.toDrmRdd()
     val b = srcB.toDrmRdd()
-    val rdd = a
-        .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
-        .map({
-      case (key, (vectorSeqA, vectorSeqB)) =>
-        key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc))
-    })
+
+    // Check if A and B are identically partitioned AND keyed. If they are, then just perform zip
+    // instead of join, and apply the op map-side. Otherwise, perform join and apply the op
+    // reduce-side.
+    val rdd = if (op.isIdenticallyPartitioned(op.A)) {
+
+      log.debug("applying zipped elementwise")
+
+      a
+          .zip(b)
+          .map {
+        case ((keyA, vectorA), (keyB, vectorB)) =>
+          assert(keyA == keyB, "inputs are claimed identically partitioned, but they are not identically keyed")
+          keyA -> reduceFunc(vectorA, vectorB)
+      }
+    } else {
+
+      log.debug("applying elementwise as join")
+
+      a
+          .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
+          .map({
+        case (key, (vectorSeqA, vectorSeqB)) =>
+          key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc))
+      })
+    }
 
     new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd))
   }
 
-  private[blas] def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double,
-      reduceFunc: (Matrix, Double) => Matrix): DrmRddInput[K] = {
+  /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
+  def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
+  DrmRddInput[K] = {
+
+    val ewOps = getEWOps()
+    val opId = op.op
+
+    val reduceFunc = opId match {
+      case "+" => ewOps.plusScalar
+      case "-" => ewOps.minusScalar
+      case "*" => ewOps.timesScalar
+      case "/" => ewOps.divScalar
+      case "-:" => ewOps.scalarMinus
+      case "/:" => ewOps.scalarDiv
+      case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
+    }
     val a = srcA.toBlockifiedDrmRdd()
     val rdd = a
         .map({
@@ -92,6 +116,55 @@ object AewB {
     })
     new DrmRddInput[K](blockifiedSrc = Some(rdd))
   }
+}
+
+trait EWOps {
+
+  val plus: ReduceFunc
 
+  val minus: ReduceFunc
 
+  val hadamard: ReduceFunc
+
+  val eldiv: ReduceFunc
+
+  val plusScalar: ReduceFuncScalar
+
+  val minusScalar: ReduceFuncScalar
+
+  val scalarMinus: ReduceFuncScalar
+
+  val timesScalar: ReduceFuncScalar
+
+  val divScalar: ReduceFuncScalar
+
+  val scalarDiv: ReduceFuncScalar
+
+}
+
+object InplaceEWOps extends EWOps {
+  val plus: ReduceFunc = (a, b) => a += b
+  val minus: ReduceFunc = (a, b) => a -= b
+  val hadamard: ReduceFunc = (a, b) => a *= b
+  val eldiv: ReduceFunc = (a, b) => a /= b
+  val plusScalar: ReduceFuncScalar = (A, s) => A += s
+  val minusScalar: ReduceFuncScalar = (A, s) => A -= s
+  val scalarMinus: ReduceFuncScalar = (A, s) => s -=: A
+  val timesScalar: ReduceFuncScalar = (A, s) => A *= s
+  val divScalar: ReduceFuncScalar = (A, s) => A /= s
+  val scalarDiv: ReduceFuncScalar = (A, s) => s /=: A
 }
+
+object CloningEWOps extends EWOps {
+  val plus: ReduceFunc = (a, b) => a + b
+  val minus: ReduceFunc = (a, b) => a - b
+  val hadamard: ReduceFunc = (a, b) => a * b
+  val eldiv: ReduceFunc = (a, b) => a / b
+  val plusScalar: ReduceFuncScalar = (A, s) => A + s
+  val minusScalar: ReduceFuncScalar = (A, s) => A - s
+  val scalarMinus: ReduceFuncScalar = (A, s) => s -: A
+  val timesScalar: ReduceFuncScalar = (A, s) => A * s
+  val divScalar: ReduceFuncScalar = (A, s) => A / s
+  val scalarDiv: ReduceFuncScalar = (A, s) => s /: A
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 2d80fe3..1e2028d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -19,8 +19,9 @@ package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math._
 import math._
-import org.apache.mahout.math.scalabindings._
+import scalabindings._
 import RLikeOps._
+import drm._
 import scala.collection.JavaConversions._
 import org.apache.spark.storage.StorageLevel
 import reflect._
@@ -95,8 +96,9 @@ class CheckpointedDrmSpark[K: ClassTag](
 
     val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
 
-    val cols = rdd.map(_._2.length).fold(0)(max(_, _))
-    val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
+    val cols = ncol
+    val rows = safeToNonNegInt(nrow)
+
 
     // since currently spark #collect() requires Serializeable support,
     // we serialize DRM vectors into byte arrays on backend and restore Vector

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index 322d67c..37a9ac2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -23,13 +23,14 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.log4j.Logger
 import java.lang.Math
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{FilteredRDD, RDD}
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.mahout.math.drm._
 import SparkContext._
+import org.apache.mahout.math
 
 
 package object drm {
@@ -56,8 +57,9 @@ package object drm {
 
     rdd.mapPartitions(iter => {
 
-      if (!iter.hasNext) Iterator.empty
-      else {
+      if (iter.isEmpty) {
+        Iterator.empty
+      } else {
 
         val data = iter.toIterable
         val keys = data.map(t => t._1).toArray[K]
@@ -70,24 +72,34 @@ package object drm {
     })
   }
 
+  /** Performs rbind() on all blocks inside same partition to ensure there's only one block here. */
+  private[sparkbindings] def rbind[K: ClassTag](rdd: BlockifiedDrmRdd[K]): BlockifiedDrmRdd[K] =
+    rdd.mapPartitions(iter => {
+      if (iter.isEmpty) {
+        Iterator.empty
+      } else {
+        Iterator(math.drm.rbind(iter.toIterable))
+      }
+    })
+
   private[sparkbindings] def deblockify[K: ClassTag](rdd: BlockifiedDrmRdd[K]): DrmRdd[K] =
 
   // Just flat-map rows, connect with the keys
-    rdd.flatMap({
+    rdd.flatMap {
       case (blockKeys: Array[K], block: Matrix) =>
 
         blockKeys.ensuring(blockKeys.size == block.nrow)
-        blockKeys.view.zipWithIndex.map({
+        blockKeys.view.zipWithIndex.map {
           case (key, idx) =>
-            var v = block(idx, ::)
+            var v = block(idx, ::) // This is just a view!
 
             // If a view rather than a concrete vector, clone into a concrete vector in order not to
             // attempt to serialize outer matrix when we save it (Although maybe most often this
             // copying is excessive?)
             // if (v.isInstanceOf[MatrixVectorView]) v = v.cloned
             key -> v
-        })
+        }
+    }
 
-    })
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 34d81cf..c4ef0d3 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.mahout.math.{VectorWritable, Vector, MatrixWritable, Matrix}
 import org.apache.hadoop.io.Writable
+import org.apache.spark.storage.StorageLevel
 
 /** Public api for Spark-specific operators */
 package object sparkbindings {
@@ -179,7 +180,8 @@ package object sparkbindings {
     new CheckpointedDrmSpark[K](
       rdd = rdd,
       _nrow = nrow,
-      _ncol = ncol
+      _ncol = ncol,
+      _cacheStorageLevel = StorageLevel.NONE
     )
 
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
index 0834145..52e2b35 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
@@ -33,13 +33,15 @@ class ABtSuite extends FunSuite with MahoutLocalContext {
   test("ABt") {
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
     val inCoreB = dense((3, 4, 5), (5, 6, 7))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB)
+    val A = drmParallelize(m = inCoreA, numPartitions = 3)
+    val B = drmParallelize(m = inCoreB, numPartitions = 2)
 
     val op = new OpABt(A, B)
 
     val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
+    printf("AB' num partitions = %d.\n", drm.rdd.partitions.size)
+
     val inCoreMControl = inCoreA %*% inCoreB.t
     val inCoreM = drm.collect
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
index 2efa5ff..389ef65 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
@@ -36,9 +36,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = drmParallelize(m = inCoreB)
 
-    val op = new OpAewB(A, B, '*')
+    val op = new OpAewB(A, B, "*")
 
-    val M = new CheckpointedDrmSpark(AewB.a_hadamard_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA * inCoreB
@@ -53,9 +53,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = drmParallelize(m = inCoreB)
 
-    val op = new OpAewB(A, B, '+')
+    val op = new OpAewB(A, B, "+")
 
-    val M = new CheckpointedDrmSpark(AewB.a_plus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA + inCoreB
@@ -70,9 +70,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = drmParallelize(m = inCoreB)
 
-    val op = new OpAewB(A, B, '-')
+    val op = new OpAewB(A, B, "-")
 
-    val M = new CheckpointedDrmSpark(AewB.a_minus_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA - inCoreB
@@ -87,9 +87,9 @@ class AewBSuite extends FunSuite with MahoutLocalContext {
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
     val B = drmParallelize(m = inCoreB)
 
-    val op = new OpAewB(A, B, '/')
+    val op = new OpAewB(A, B, "/")
 
-    val M = new CheckpointedDrmSpark(AewB.a_eldiv_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
+    val M = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = A, srcB = B), op.nrow, op.ncol)
 
     val inCoreM = M.collect
     val inCoreMControl = inCoreA / inCoreB

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
deleted file mode 100644
index 2369441..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import org.scalatest.{Matchers, FunSuite}
-import org.apache.mahout.sparkbindings.test.MahoutLocalContext
-import org.apache.mahout.math._
-import drm._
-import scalabindings._
-import RLikeOps._
-import RLikeDrmOps._
-import org.apache.mahout.sparkbindings._
-import org.apache.mahout.common.RandomUtils
-
-class MathSuite extends FunSuite with Matchers with MahoutLocalContext {
-
-  test("thin distributed qr") {
-
-    val inCoreA = dense(
-      (1, 2, 3, 4),
-      (2, 3, 4, 5),
-      (3, -4, 5, 6),
-      (4, 5, 6, 7),
-      (8, 6, 7, 8)
-    )
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-    val (drmQ, inCoreR) = dqrThin(A, checkRankDeficiency = false)
-
-    // Assert optimizer still knows Q and A are identically partitioned
-    drmQ.partitioningTag should equal (A.partitioningTag)
-
-    drmQ.rdd.partitions.size should be(A.rdd.partitions.size)
-
-    // Should also be zippable
-    drmQ.rdd.zip(other = A.rdd)
-
-    val inCoreQ = drmQ.collect
-
-    printf("A=\n%s\n", inCoreA)
-    printf("Q=\n%s\n", inCoreQ)
-    printf("R=\n%s\n", inCoreR)
-
-    val (qControl, rControl) = qr(inCoreA)
-    printf("qControl=\n%s\n", qControl)
-    printf("rControl=\n%s\n", rControl)
-
-    // Validate with Cholesky
-    val ch = chol(inCoreA.t %*% inCoreA)
-    printf("A'A=\n%s\n", inCoreA.t %*% inCoreA)
-    printf("L:\n%s\n", ch.getL)
-
-    val rControl2 = (ch.getL cloned).t
-    val qControl2 = ch.solveRight(inCoreA)
-    printf("qControl2=\n%s\n", qControl2)
-    printf("rControl2=\n%s\n", rControl2)
-
-    // The Householder approach seems to be a little bit more stable
-    (rControl - inCoreR).norm should be < 1E-5
-    (qControl - inCoreQ).norm should be < 1E-5
-
-    // Assert identity with the in-core Cholesky-based result -- this should be tighter.
-    (rControl2 - inCoreR).norm should be < 1E-10
-    (qControl2 - inCoreQ).norm should be < 1E-10
-
-    // Assert orthogonality:
-    // (a) Q[,j] dot Q[,j] == 1.0 for all j
-    // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
-    for (col <- 0 until inCoreQ.ncol)
-      ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10
-    for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol)
-      (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10
-
-
-  }
-
-  test("dssvd - the naive-est - q=0") {
-    dssvdNaive(q = 0)
-  }
-
-  test("ddsvd - naive - q=1") {
-    dssvdNaive(q = 1)
-  }
-
-  test("ddsvd - naive - q=2") {
-    dssvdNaive(q = 2)
-  }
-
-
-  def dssvdNaive(q: Int) {
-    val inCoreA = dense(
-      (1, 2, 3, 4),
-      (2, 3, 4, 5),
-      (3, -4, 5, 6),
-      (4, 5, 6, 7),
-      (8, 6, 7, 8)
-    )
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q)
-    val (inCoreU, inCoreV) = (drmU.collect, drmV.collect)
-
-    printf("U:\n%s\n", inCoreU)
-    printf("V:\n%s\n", inCoreV)
-    printf("Sigma:\n%s\n", s)
-
-    (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5
-  }
-
-  test("dspca") {
-
-    import math._
-
-    val rnd = RandomUtils.getRandom
-
-    // Number of points
-    val m =  500
-    // Length of actual spectrum
-    val spectrumLen = 40
-
-    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
-    printf("spectrum:%s\n", spectrum)
-
-    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
-        ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
-    // PCA Rotation matrix -- should also be orthonormal.
-    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
-    val input = (u %*%: diagv(spectrum)) %*% tr.t
-    val drmInput = drmParallelize(m = input, numPartitions = 2)
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    // Since we assert only the validity of the s-pca, not its stochastic error, we bump the p
-    // parameter to drive the stochastic error to zero and assert only the functional correctness
-    // of the method's pca-specific additions.
-    val k = 10
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    var (drmPCA, _, s) = dspca(A = drmInput, k = 10, p = spectrumLen, q = 1)
-    // Un-normalized pca data:
-    drmPCA = drmPCA %*% diagv(s)
-
-    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
-
-    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
-    // generated input was not centered. So here, we'd just brute-solve pca to verify
-    val xi = input.colMeans()
-    for (r <- 0 until input.nrow) input(r, ::) -= xi
-    var (pcaControl, _, sControl) = svd(m = input)
-    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
-
-    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
-    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
-    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index ac46a9e..6152426 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.mahout.sparkbindings.drm
 
 import org.scalatest.{Matchers, FunSuite}
 import org.apache.mahout.math._
+import decompositions._
 import scalabindings._
 import drm._
 import RLikeOps._
@@ -30,6 +31,7 @@ import org.apache.mahout.math.Matrices
 import org.apache.mahout.sparkbindings.{SparkEngine, blas}
 import org.apache.spark.storage.StorageLevel
 import org.apache.mahout.math.drm.logical.{OpAtx, OpAtB, OpAtA}
+import scala.util.Random
 
 /** R-like DRM DSL operation tests */
 class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
@@ -311,6 +313,41 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     (inCoreC - inCoreCControl).norm should be < 1E-10
   }
 
+  test("C = A + B, identically partitioned") {
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+
+    printf("A.nrow=%d.\n",A.rdd.count())
+
+    // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
+    val B = A.mapBlock() {
+      case (keys, block) =>
+        val bBlock = block.like() := ((r,c,v) => util.Random.nextDouble())
+        keys -> bBlock
+    }
        // Prevent non-determinism caused by repeated computation
+        .checkpoint()
+
+    val inCoreB = B.collect
+
+    printf("A=\n%s\n", inCoreA)
+    printf("B=\n%s\n", inCoreB)
+
+    val C = A + B
+
+    val inCoreC = C.collect
+
+    printf("C=\n%s\n", inCoreC)
+
+    // Actual
+    val inCoreCControl = inCoreA + inCoreB
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+  }
+
+
   test("C = A + B side test 1") {
 
     val inCoreA = dense((1, 2), (3, 4))

http://git-wip-us.apache.org/repos/asf/mahout/blob/656878ea/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
index 2d9ed76..d9e89bc 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
@@ -14,7 +14,7 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
   override protected def beforeEach() {
     super.beforeEach()
 
-    mahoutCtx = mahoutSparkContext(masterUrl = "local[3]",
+    mahoutCtx = mahoutSparkContext(masterUrl = "local[2]",
       appName = "MahoutLocalContext",
       // Do not run MAHOUT_HOME jars in unit tests.
       addMahoutJars = false,


Mime
View raw message