mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlyubi...@apache.org
Subject [3/3] git commit: MAHOUT-1529 closes PR #1
Date Tue, 27 May 2014 19:14:53 GMT
MAHOUT-1529 closes PR #1

Squashed commit of the following:

commit e7b27280333fe12c94f3f5675c876f56e9e60728
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue May 27 11:01:13 2014 -0700

    License, comments

commit bfca581389d91fb9c41db8c49e256ff694335fd1
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Fri May 23 16:07:01 2014 -0700

    Fixing  shell

commit c3b8aa4bf7d8939aa0800ce12b7f63e01f9686b9
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Fri May 23 13:36:20 2014 -0700

    - MahoutContext

commit a6461ac22a46793ab0bcdfcafe827d808f9e1810
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Fri May 23 12:13:40 2014 -0700

    rebasing changes in MAHOUT-1529 branch onto new git master (applied through selective patching; this may, but hopefully doesn't, revert any commits made in between in the spark/ and math-scala/ modules)


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8714a0f7
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8714a0f7
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8714a0f7

Branch: refs/heads/master
Commit: 8714a0f722663ea5cb16c14c5b8a01e57574cd93
Parents: 5a1c2cc
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Authored: Tue May 27 12:13:29 2014 -0700
Committer: Dmitriy Lyubimov <dlyubimov@apache.org>
Committed: Tue May 27 12:13:29 2014 -0700

----------------------------------------------------------------------
 math-scala/pom.xml                              |   1 -
 .../org/apache/mahout/math/drm/BCast.scala      |  23 ++
 .../org/apache/mahout/math/drm/CacheHint.scala  |  19 ++
 .../mahout/math/drm/CheckpointedDrm.scala       |  36 +++
 .../mahout/math/drm/CheckpointedOps.scala       |  39 +++
 .../mahout/math/drm/DistributedContext.scala    |  27 +++
 .../mahout/math/drm/DistributedEngine.scala     | 158 ++++++++++++
 .../org/apache/mahout/math/drm/DrmLike.scala    |  48 ++++
 .../org/apache/mahout/math/drm/DrmLikeOps.scala |  84 +++++++
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |  92 +++++++
 .../mahout/math/drm/decompositions/DQR.scala    |  57 +++++
 .../mahout/math/drm/decompositions/DSPCA.scala  | 153 ++++++++++++
 .../mahout/math/drm/decompositions/DSSVD.scala  |  83 +++++++
 .../math/drm/logical/AbstractBinaryOp.scala     |  37 +++
 .../math/drm/logical/AbstractUnaryOp.scala      |  36 +++
 .../math/drm/logical/CheckpointAction.scala     |  47 ++++
 .../apache/mahout/math/drm/logical/OpAB.scala   |  41 ++++
 .../mahout/math/drm/logical/OpABAnyKey.scala    |  41 ++++
 .../apache/mahout/math/drm/logical/OpABt.scala  |  42 ++++
 .../apache/mahout/math/drm/logical/OpAewB.scala |  39 +++
 .../mahout/math/drm/logical/OpAewScalar.scala   |  38 +++
 .../apache/mahout/math/drm/logical/OpAt.scala   |  33 +++
 .../apache/mahout/math/drm/logical/OpAtA.scala  |  36 +++
 .../mahout/math/drm/logical/OpAtAnyKey.scala    |  34 +++
 .../apache/mahout/math/drm/logical/OpAtB.scala  |  42 ++++
 .../apache/mahout/math/drm/logical/OpAtx.scala  |  41 ++++
 .../apache/mahout/math/drm/logical/OpAx.scala   |  42 ++++
 .../mahout/math/drm/logical/OpMapBlock.scala    |  41 ++++
 .../mahout/math/drm/logical/OpRowRange.scala    |  36 +++
 .../math/drm/logical/OpTimesLeftMatrix.scala    |  43 ++++
 .../math/drm/logical/OpTimesRightMatrix.scala   |  46 ++++
 .../org/apache/mahout/math/drm/package.scala    | 125 ++++++++++
 .../apache/mahout/math/scalabindings/SSVD.scala | 165 -------------
 .../scalabindings/decompositions/SSVD.scala     | 165 +++++++++++++
 .../mahout/math/scalabindings/package.scala     |   1 +
 .../sparkbindings/shell/MahoutSparkILoop.scala  |  33 ++-
 .../mahout/sparkbindings/shell/Main.scala       |  17 ++
 spark-shell/src/test/mahout/simple.mscala       |  29 ++-
 spark/pom.xml                                   |  95 ++++----
 .../sparkbindings/SparkDistributedContext.scala |  30 +++
 .../mahout/sparkbindings/SparkEngine.scala      | 240 +++++++++++++++++++
 .../apache/mahout/sparkbindings/blas/ABt.scala  |   5 +-
 .../apache/mahout/sparkbindings/blas/AewB.scala |   2 +-
 .../mahout/sparkbindings/blas/AinCoreB.scala    |  15 +-
 .../apache/mahout/sparkbindings/blas/At.scala   |   2 +-
 .../apache/mahout/sparkbindings/blas/AtA.scala  |   4 +-
 .../apache/mahout/sparkbindings/blas/AtB.scala  |   3 +-
 .../apache/mahout/sparkbindings/blas/Ax.scala   |  19 +-
 .../mahout/sparkbindings/blas/DrmRddOps.scala   |   2 +-
 .../mahout/sparkbindings/blas/MapBlock.scala    |  43 ++++
 .../mahout/sparkbindings/blas/Slicing.scala     |   2 +-
 .../mahout/sparkbindings/blas/package.scala     |   1 -
 .../sparkbindings/decompositions/DQR.scala      |  56 -----
 .../sparkbindings/decompositions/DSPCA.scala    | 153 ------------
 .../sparkbindings/decompositions/DSSVD.scala    |  83 -------
 .../mahout/sparkbindings/drm/CacheHint.scala    |  20 --
 .../sparkbindings/drm/CheckpointedDrm.scala     |  39 ---
 .../sparkbindings/drm/CheckpointedDrmBase.scala | 161 -------------
 .../drm/CheckpointedDrmSpark.scala              | 164 +++++++++++++
 .../drm/CheckpointedDrmSparkOps.scala           |  16 ++
 .../sparkbindings/drm/CheckpointedOps.scala     |  63 -----
 .../mahout/sparkbindings/drm/DrmLike.scala      |  46 ----
 .../mahout/sparkbindings/drm/DrmLikeOps.scala   |  85 -------
 .../mahout/sparkbindings/drm/DrmRddInput.scala  |   1 +
 .../mahout/sparkbindings/drm/RLikeDrmOps.scala  |  98 --------
 .../mahout/sparkbindings/drm/SparkBCast.scala   |  25 ++
 .../mahout/sparkbindings/drm/package.scala      | 226 ++---------------
 .../drm/plan/AbstractBinaryOp.scala             |  37 ---
 .../drm/plan/AbstractUnaryOp.scala              |  34 ---
 .../drm/plan/CheckpointAction.scala             | 207 ----------------
 .../mahout/sparkbindings/drm/plan/OpAB.scala    |  43 ----
 .../sparkbindings/drm/plan/OpABAnyKey.scala     |  41 ----
 .../mahout/sparkbindings/drm/plan/OpABt.scala   |  43 ----
 .../mahout/sparkbindings/drm/plan/OpAewB.scala  |  39 ---
 .../sparkbindings/drm/plan/OpAewScalar.scala    |  38 ---
 .../mahout/sparkbindings/drm/plan/OpAt.scala    |  34 ---
 .../mahout/sparkbindings/drm/plan/OpAtA.scala   |  37 ---
 .../sparkbindings/drm/plan/OpAtAnyKey.scala     |  34 ---
 .../mahout/sparkbindings/drm/plan/OpAtB.scala   |  42 ----
 .../mahout/sparkbindings/drm/plan/OpAtx.scala   |  42 ----
 .../mahout/sparkbindings/drm/plan/OpAx.scala    |  42 ----
 .../sparkbindings/drm/plan/OpMapBlock.scala     |  58 -----
 .../sparkbindings/drm/plan/OpRowRange.scala     |  37 ---
 .../drm/plan/OpTimesLeftMatrix.scala            |  44 ----
 .../drm/plan/OpTimesRightMatrix.scala           |  46 ----
 .../mahout/sparkbindings/drm/plan/package.scala |  24 --
 .../io/MahoutKryoRegistrator.scala              |   6 +-
 .../apache/mahout/sparkbindings/package.scala   |  61 ++++-
 .../mahout/sparkbindings/blas/ABtSuite.scala    |   7 +-
 .../mahout/sparkbindings/blas/AewBSuite.scala   |  14 +-
 .../mahout/sparkbindings/blas/AtASuite.scala    |   4 +-
 .../mahout/sparkbindings/blas/AtSuite.scala     |  12 +-
 .../decompositions/MathSuite.scala              |   7 +-
 .../sparkbindings/drm/DrmLikeOpsSuite.scala     |   6 +-
 .../mahout/sparkbindings/drm/DrmLikeSuite.scala |   6 +-
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    |  30 +--
 .../sparkbindings/test/MahoutLocalContext.scala |   7 +-
 97 files changed, 2607 insertions(+), 2244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 4604b7d..95fe2c7 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -167,7 +167,6 @@
       <artifactId>mahout-math</artifactId>
     </dependency>
 
-
     <!--  3rd-party -->
     <dependency>
       <groupId>log4j</groupId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
new file mode 100644
index 0000000..850614457
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+/** Broadcast variable abstraction: a read-only handle whose `value` returns the broadcast payload. */
+trait BCast[T] {
+  def value:T
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
new file mode 100644
index 0000000..ac763f9
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
@@ -0,0 +1,19 @@
+package org.apache.mahout.math.drm
+/** Cache hints accepted by DrmLike.checkpoint(); the names appear to mirror Spark's StorageLevel constants. */
+object CacheHint extends Enumeration {
+  // Type alias so the hint type can be written as CacheHint.CacheHint (as in DrmLike.checkpoint()).
+  type CacheHint = Value
+
+  val NONE,
+  DISK_ONLY,
+  DISK_ONLY_2,
+  MEMORY_ONLY,
+  MEMORY_ONLY_2,
+  MEMORY_ONLY_SER,
+  MEMORY_ONLY_SER_2,
+  MEMORY_AND_DISK,
+  MEMORY_AND_DISK_2,
+  MEMORY_AND_DISK_SER,
+  MEMORY_AND_DISK_SER_2 = Value
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
new file mode 100644
index 0000000..0266944
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.math.Matrix
+
+/**
+ * Checkpointed DRM API. This is a matrix that has an optimized physical plan (e.g. RDD lineage in
+ * Spark) behind it and can therefore be collected or saved.
+ * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
+ */
+trait CheckpointedDrm[K] extends DrmLike[K] {
+  /** Collects the whole distributed matrix into an in-core Matrix, so it must fit in memory. */
+  def collect: Matrix
+  /** Saves this matrix at `path` (presumably in the Mahout DRM format read by drmFromHDFS; confirm). */
+  def writeDRM(path: String)
+
+  /** If this checkpoint is already declared cached, uncache. */
+  def uncache()
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
new file mode 100644
index 0000000..fa1ccfd
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Vector
+
+
+/**
+ * Additional experimental operations over a CheckpointedDrm. I will possibly move them up to
+ * the core DRM API once they stabilize. Both operations are computed through drm.context
+ * (presumably by its engine) and return in-core vectors.
+ */
+class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
+
+  // NOTE(review): DistributedContext declares no colSums/colMeans; presumably resolved via an implicit context->engine conversion (confirm).
+  /** Column sums. At this point this runs on checkpoint and collects in-core vector. */
+  def colSums(): Vector = drm.context.colSums(drm)
+
+  /** Column means. Like colSums, runs on the checkpoint and returns an in-core vector. */
+  def colMeans(): Vector = drm.context.colMeans(drm)
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
new file mode 100644
index 0000000..39bab90
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import java.io.Closeable
+
+/** Distributed context (a.k.a. distributed session handle); it is Closeable, so close it when the session is done. */
+trait DistributedContext extends Closeable {
+  /** Engine (optimizer plus distributed backend) associated with this context. */
+  val engine:DistributedEngine
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
new file mode 100644
index 0000000..0e76d87
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import logical._
+import org.apache.mahout.math.{Matrix, Vector}
+import DistributedEngine._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+/** Abstraction of optimizer/distributed engine */
+trait DistributedEngine {
+
+  /**
+   * First optimizer stage: logical rewrites. Applies pass1 (multiplication rewrites), then pass2
+   * (A.t.t removal), then pass3 (ABt/AtB/Atx product rewrites) and returns the rewritten logical
+   * plan, which is then handed to toPhysical(). This rewrite may introduce logical constructs
+   * (including engine-specific ones) that the user DSL cannot produce per se.
+   * <P>
+   * A particular physical engine implementation may choose to either use the default rewrites or
+   * build its own rewriting rules.
+   * <P>
+   */
+  def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
+
+  /** Second optimizer stage. Translate the previously rewritten logical plan into a physical engine plan (a checkpoint). */
+  def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
+
+  /** Engine-specific colSums implementation based on a checkpoint. */
+  def colSums[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+
+  /** Engine-specific colMeans implementation based on a checkpoint. */
+  def colMeans[K:ClassTag](drm:CheckpointedDrm[K]):Vector
+
+  /** Broadcast an in-core vector; the returned handle exposes it via BCast.value. */
+  def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
+
+  /** Broadcast an in-core matrix; the returned handle exposes it via BCast.value. */
+  def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
+
+  /** Load DRM from HDFS (in Mahout DRM format); the key type is not known statically, hence CheckpointedDrm[_]. */
+  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_]
+
+  /** Parallelize an in-core matrix as a distributed matrix, using row ordinal indices as data set keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+  /** Parallelize an in-core matrix as a distributed matrix, using row labels as data set keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[String]
+
+  /** This creates an empty DRM with specified number of partitions and cardinality. */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+  /** Creates an empty DRM with non-trivial height: like drmParallelizeEmpty, but with a Long row count and Long keys. */
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Long]
+}
+
+object DistributedEngine {
+  // NOTE: the pass-through cases reassign uop.A / bop.A / bop.B, so each pass rewrites plan nodes in place.
+  /** Pass 1: mostly multiplication rewrites (A'A detection, left-multiply via transpositions). */
+  private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    // A node not matched below that is neither a checkpoint nor a unary/binary op raises MatchError (all passes).
+    action match {
+      case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a))
+      case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a))
+
+      // For now, rewrite left-multiply via transpositions, i.e.
+      // inCoreA %*% B = (B' %*% inCoreA')'
+      case op@OpTimesLeftMatrix(a, b) =>
+        OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass1(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass1(bop.A)(bop.classTagA)
+        bop.B = pass1(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+  /** Pass 2: removes double transpositions (A.t.t => A), e.g. ones the pass 1 left-multiply rewrite may create. */
+  private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    action match {
+      // A.t.t => A
+      case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass2(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass2(bop.A)(bop.classTagA)
+        bop.B = pass2(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+  /** Pass 3: product rewrites (ABt, AtB, Atx) that rely on pass 2 having removed A.t.t. */
+  private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    action match {
+
+      // matrix products.
+      case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
+
+      // AtB cases that make sense.
+      case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b))
+      case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
+
+      // Need some cost to choose between the following.
+      // NOTE(review): this yields the same OpAtB as the guarded case above, so that partitioningTag guard has no effect yet.
+      case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
+      //      case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
+      case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+      // Rewrite A'x
+      case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass3(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass3(bop.A)(bop.classTagA)
+        bop.B = pass3(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
new file mode 100644
index 0000000..8e0db1e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+
+/**
+ *
+ * Basic distributed row matrix (DRM) trait, independent of any particular backend.
+ *
+ * A DRM stays a logical plan until it is checkpointed; backends (e.g. Spark) plug in through the
+ * [[DistributedContext]] / [[DistributedEngine]] abstractions.
+ *
+ */
+trait DrmLike[K] {
+  /** Partitioning identifier (presumably equal for identically partitioned DRMs); compared by the optimizer. */
+  protected[mahout] def partitioningTag:Long
+  /** Distributed context (session) this DRM belongs to. */
+  protected[mahout] val context:DistributedContext
+
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long
+
+  /** R-like syntax for number of columns */
+  def ncol: Int
+
+  /**
+   * Action operator -- does not necessarily mean a backend action, but does mean running the BLAS
+   * optimizer and writing down the engine's physical plan since the last checkpointed DRM.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
new file mode 100644
index 0000000..35e28af
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm.logical.{OpMapBlock, OpRowRange}
+
+/** Common Drm ops */
+class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
+
+  /**
+   * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block
+   * matrices, or completely new matrices with a new key set. If the block width changes, the
+   * output matrix width must be specified with the <code>ncol</code> parameter.<P>
+   *
+   * New blocks must have the same height as the original blocks.<P>
+   *
+   * @param ncol new matrix width; only needed if the width changes (default -1, presumably "unchanged").
+   * @param bmf block map function: takes a (keys, block) pair and returns the new (keys, block) pair.
+   * @tparam R key type of the resulting matrix.
+   * @return a logical OpMapBlock node; nothing is computed until the result is checkpointed.
+   */
+  def mapBlock[R : ClassTag](ncol: Int = -1)
+      (bmf: BlockMapFunc[K, R]): DrmLike[R] =
+    new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
+
+
+  /**
+   * Slicing the DRM. Should eventually work just like in-core matrices (e.g. A(0 until 5, 5 until 15)).<P>
+   *
+   * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
+   *
+   * A non-all row range is supported only for Int-keyed inputs (DrmLike[Int]); for other key types
+   * it throws IllegalArgumentException. Out-of-bounds ranges fail an assertion; empty ranges are unsupported.
+   *
+   * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
+   * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
+   */
+  def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
+    // RLikeDrmOps brings mapBlock onto DrmLike; RLikeOps brings in-core block(::, range) slicing.
+    import RLikeDrmOps._
+    import RLikeOps._
+
+    val rowSrc: DrmLike[K] = if (rowRange != ::) {
+      // K is erased at runtime, so compare ClassTags to confirm Int keys before the casts below.
+      if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
+
+        assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range")
+        val intKeyed = drm.asInstanceOf[DrmLike[Int]]
+
+        new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
+
+      } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
+
+    } else drm
+
+    if (colRange != ::) {
+
+      assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
+
+      // Use mapBlock operator to do in-core subranging.
+      rowSrc.mapBlock(ncol = colRange.length)({
+        case (keys, block) => keys -> block(::, colRange)
+      })
+
+    } else rowSrc
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
new file mode 100644
index 0000000..f46d15c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Vector, Matrix}
+import org.apache.mahout.math.drm.logical._
+/** R-like operators over a DRM. Each operator only builds a logical plan node; nothing runs until checkpoint(). */
+class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
+  // Passing `this` as an operand relies on the rlikeOps2Drm implicit (imported next) to unwrap drm.
+  import RLikeDrmOps._
+  // Element-wise operators with another DRM (OpAewB).
+  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '+')
+
+  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '-')
+
+  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '*')
+
+  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = '/')
+  // Element-wise operators with a scalar (OpAewScalar).
+  def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
+
+  def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-")
+  // Methods ending in ':' are right-associative: `5.0 -: A` calls A.-:(5.0), so the scalar is the left operand.
+  def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:")
+
+  def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
+
+  def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/")
+
+  def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:")
+  // Matrix products with a DRM (Int keys -> OpAB, any keys -> OpABAnyKey), an in-core matrix, or a vector.
+  def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
+
+  def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that)
+
+  def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
+
+  def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
+
+  def %*%(that: Matrix): DrmLike[K] = this :%*% that
+
+  def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)
+
+  def %*%(that: Vector): DrmLike[K] = :%*%(that)
+  /** Transpose of an arbitrarily-keyed DRM; the result is always Int-keyed. */
+  def t: DrmLike[Int] = OpAtAnyKey(A = drm)
+}
+/** Additional R-like operators available only for Int-keyed DRMs. */
+class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
+  // Int keys allow the plain OpAt transpose instead of OpAtAnyKey.
+  override def t: DrmLike[Int] = OpAt(A = drm)
+  // Right-associative left products: `B %*%: A` calls A.%*%:(B) and builds B %*% A.
+  def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)
+
+  def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
+
+
+}
+/** Implicit conversions enabling the R-like DRM DSL; bring them into scope with `import RLikeDrmOps._`. */
+object RLikeDrmOps {
+  implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
+
+  implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
+
+  implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
+
+  implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
+
+  implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
+
+  /**
+   * Potentially dangerous: it silently calls drm.checkpoint() with the default cache hint, so any
+   * CheckpointedOps call made on a plain DrmLike triggers an implicit checkpoint.
+   */
+  implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
new file mode 100644
index 0000000..34ae345
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DQR.scala
@@ -0,0 +1,57 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+  private val log = Logger.getLogger(DQR.getClass)
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be kept
+   * fairly small (< 5000 or so). <P>
+   *
+   * The decomposition goes through the Cholesky factorization of A'A = LL': R = L' and
+   * Q = A inv(L'). <P>
+   *
+   * It is recommended to checkpoint A since it does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
+   *
+   * @param A the m x n input DRM
+   * @param checkRankDeficiency if true, reject inputs whose A'A is not positive definite
+   * @return (Q, R): Q is a non-checkpointed DRM with A's row keys and column count, R is the
+   *         in-core n x n factor
+   * @throws IllegalArgumentException if `checkRankDeficiency` is set and R is rank-deficient
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+    if (A.ncol > 5000)
+      log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
+
+    implicit val ctx = A.context
+
+    // A'A is collected right away and never used again in the backend. Checkpointing with the
+    // default hint would leave it cached, since nothing here ever uncaches it.
+    val inCoreAtA = (A.t %*% A).checkpoint(CacheHint.NONE).collect
+
+    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+
+    val ch = chol(inCoreAtA)
+    // Clone L and transpose the copy to obtain R = L' (dotted calls instead of postfix operators).
+    val inCoreR = ch.getL.cloned.t
+
+    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+
+    if (checkRankDeficiency && !ch.isPositiveDefinite)
+      throw new IllegalArgumentException("R is rank-deficient.")
+
+    val bcastAtA = drmBroadcast(inCoreAtA)
+
+    // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
+    // decompose A'A in the backend again.
+
+    // Compute Q = A*inv(L') -- we can do it blockwise.
+    val Q = A.mapBlock() {
+      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
+    }
+
+    Q -> inCoreR
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
new file mode 100644
index 0000000..9e33416
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of the MAHOUT-817.
+   *
+   * A is never centered explicitly. Its column mean `xi` is subtracted implicitly, by correcting each
+   * product involving A with small in-core vectors derived from `xi`.
+   *
+   * @param A input matrix A
+   * @param k requested decomposition rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U, V, s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use
+   *         them, e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmA = A.checkpoint()
+    implicit val ctx = A.context
+
+    val m = drmA.nrow
+    val n = drmA.ncol
+    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // Dataset mean
+    val xi = drmA.colMeans
+
+    // xi'xi, shared by the centering corrections below.
+    val xiDotXi = xi dot xi
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+    // This done in front in a single-threaded fashion for now. Even though it doesn't require any
+    // memory beyond that is required to keep xi around, it still might be parallelized to backs
+    // for significantly big n and r. TODO
+    val s_o = omega.t %*% xi
+
+    val bcastS_o = drmBroadcast(s_o)
+    val bcastXi = drmBroadcast(xi)
+
+    // Y = (A - 1 xi') Omega = A Omega - 1 s_o'
+    var drmY = drmA.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        val s_o:Vector = bcastS_o
+        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+        keys -> blockY
+    }
+        // Checkpoint Y
+        .checkpoint()
+
+    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+    var s_q = drmQ.colSums()
+    var bcastVarS_q = drmBroadcast(s_q)
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    var drmBt = (drmA.t %*% drmQ).checkpoint()
+
+    // s_b = B'xi for the (not yet centered) B' = A'Q.
+    var s_b = (drmBt.t %*% xi).collect(::, 0)
+
+    for (i <- 0 until q) {
+
+      // These closures don't seem to live well with outside-scope vars. This doesn't record closure
+      // attributes correctly. So we create additional set of vals for broadcast vars to properly
+      // create readonly closure attributes in this very scope.
+      val bcastS_q = bcastVarS_q
+      val bcastXib = bcastXi
+
+      // Fix Bt as B' -= xi cross s_q
+      drmBt = drmBt.mapBlock() {
+        case (keys, block) =>
+          val s_q: Vector = bcastS_q
+          val xi: Vector = bcastXib
+          keys.zipWithIndex.foreach {
+            case (key, idx) => block(idx, ::) -= s_q * xi(key)
+          }
+          keys -> block
+      }
+
+      drmY.uncache()
+      drmQ.uncache()
+
+      // Y = (A - 1 xi') B'. B' has just been centered above, so the per-row correction is
+      // (B' - xi s_q')' xi = s_b - (xi'xi) s_q, not the stale s_b of the uncentered B'.
+      val bcastSt_b = drmBroadcast(s_b - s_q * xiDotXi)
+
+      drmY = (drmA %*% drmBt)
+          // Fix Y by subtracting st_b from each row of the AB'
+          .mapBlock() {
+        case (keys, block) =>
+          val st_b: Vector = bcastSt_b
+          for (row <- 0 until block.nrow) block(row, ::) -= st_b
+          keys -> block
+      }
+          // Checkpoint Y
+          .checkpoint()
+
+      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+      s_q = drmQ.colSums()
+      bcastVarS_q = drmBroadcast(s_q)
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      drmBt = (drmA.t %*% drmQ).checkpoint()
+
+      s_b = (drmBt.t %*% xi).collect(::, 0)
+    }
+
+    // BB' of the centered B, expanded in terms of the uncentered B' = A'Q:
+    // (B' - xi s_q')'(B' - xi s_q') = B B' - s_q s_b' - s_b s_q' + (xi'xi) s_q s_q'
+    val c = s_q cross s_b
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+        c - c.t + (s_q cross s_q) * xiDotXi
+    val (inCoreUHat, d) = eigen(inCoreBBt)
+    val s = d.sqrt
+
+    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
new file mode 100644
index 0000000..0da9ec7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/decompositions/DSSVD.scala
@@ -0,0 +1,83 @@
+package org.apache.mahout.math.drm.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSSVD {
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U, V, s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use
+   *         them, e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmA = A.checkpoint()
+
+    val m = drmA.nrow
+    val n = drmA.ncol
+    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+
+    // Rank actually decomposed: k plus as much of the oversampling p as the geometry allows.
+    val r = k + safeToNonNegInt((m min n) - k min p)
+
+    // Omega is never shipped to the backend; only its seed is, and each block regenerates it.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+
+    // Orthonormalizes Y into Q and projects A onto it (B' = A'Q). Only the final Q/B' pair is
+    // checkpointed: intermediate ones are each consumed exactly once by the next step.
+    def orthoProject(drmY: DrmLike[K], isFinal: Boolean): (DrmLike[K], DrmLike[Int]) = {
+      val rawQ = dqrThin(drmY.checkpoint())._1
+      val qi: DrmLike[K] = if (isFinal) rawQ.checkpoint() else rawQ
+      val rawBt = drmA.t %*% qi
+      val bti: DrmLike[Int] = if (isFinal) rawBt.checkpoint() else rawBt
+      (qi, bti)
+    }
+
+    // Y = A * Omega, with the Omega view instantiated from its seed inside every block so that the
+    // serialized closure stays compact.
+    val drmY0 = drmA.mapBlock(ncol = r) {
+      case (keys, blockA) => keys -> (blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed))
+    }
+
+    // Initial Q/B' pair, followed by q power iterations Y = A B', each yielding a fresh pair.
+    val (drmQ, drmBt) = (0 until q).foldLeft(orthoProject(drmY0, isFinal = (q == 0))) {
+      case ((_, bt), i) => orthoProject(drmA %*% bt, isFinal = (i == q - 1))
+    }
+
+    // In-core eigen decomposition of the small r x r matrix B B'; singular values are sqrt(d).
+    val (inCoreUHat, d) = eigen((drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect)
+    val s = d.sqrt
+
+    // U = Q UHat and V = B' UHat diag(1/s) stay lazy until the caller actually uses them.
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
new file mode 100644
index 0000000..c2371d1
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
+
+/**
+ * Base class for logical (not yet physically planned) binary DRM operators.
+ *
+ * @tparam A row-key type of the left operand
+ * @tparam B row-key type of the right operand
+ * @tparam K row-key type of the result
+ */
+abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
+    extends CheckpointAction[K] with DrmLike[K] {
+
+  // Operands are mutable vars. NOTE(review): presumably so that optimizer rewrites can replace
+  // operands in place -- confirm against the optimizer implementation.
+  protected[drm] var A: DrmLike[A]
+  protected[drm] var B: DrmLike[B]
+  // The operator runs in the distributed context of its left operand.
+  protected[mahout] lazy val context: DistributedContext = A.context
+
+  // These are explicit evidence export. Sometimes scala falls over to figure that on its own.
+  def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
+
+  def classTagB: ClassTag[B] = implicitly[ClassTag[B]]
+
+  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
new file mode 100644
index 0000000..eb5ef9a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
+
+/**
+ * Base class for logical (not yet physically planned) unary DRM operators.
+ *
+ * @tparam A row-key type of the operand
+ * @tparam K row-key type of the result
+ */
+abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
+    extends CheckpointAction[K] with DrmLike[K] {
+
+  // The single operand; a mutable var like the operands of AbstractBinaryOp.
+  protected[drm] var A: DrmLike[A]
+
+  // The operator runs in the distributed context of its operand.
+  protected[mahout] lazy val context: DistributedContext = A.context
+
+  // Explicit evidence export for callers that cannot summon the context-bound ClassTags themselves.
+  def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
+
+  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
new file mode 100644
index 0000000..aa3a3b9
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import scala.util.Random
+import org.apache.mahout.math.drm._
+
+/** Implementation of distributed expression checkpoint and optimizer. */
+abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
+
+  /**
+   * Random tag standing for the physical partitioning of this expression's result. Two DRMs that
+   * carry the same non-zero tag are treated as identically partitioned.
+   */
+  protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
+
+  /** Memoized physical plan, filled in by the first call to `checkpoint`. */
+  private[mahout] var cp: Option[CheckpointedDrm[K]] = None
+
+  /** True when both sides carry the same, non-zero partitioning tag. */
+  def isIdenticallyPartitioned(other: DrmLike[_]): Boolean =
+    partitioningTag != 0L && partitioningTag == other.partitioningTag
+
+  /**
+   * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
+   * and writing down Spark graph lineage since last checkpointed DRM.
+   *
+   * Only the first call does any work; later calls return the memoized result and ignore
+   * `cacheHint`.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] =
+    cp.getOrElse {
+      val physical = context.toPhysical(context.optimizerRewrite(this), cacheHint)
+      cp = Some(physical)
+      physical
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
new file mode 100644
index 0000000..804a00e
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Logical product AB of a DRM A with any row-key type and an Int-keyed DRM B.
+ * The product keeps A's row-key type.
+ */
+case class OpAB[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[Int])
+    extends AbstractBinaryOp[K, Int, K] {
+
+  // Inner dimensions must agree: columns of A against rows of B.
+  assert(A.ncol == B.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows: as many as A has. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns: as many as B has. */
+  def ncol: Int = B.ncol
+
+  // TODO: for purposes of cost calculation, approximate based on operands
+  /** Non-zero element count; not estimated yet. */
+  def nNonZero: Long = throw new UnsupportedOperationException
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
new file mode 100644
index 0000000..f131f3f
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Logical product AB where B may have any row-key type (unlike [[OpAB]], which requires an
+ * Int-keyed B). The product keeps A's row-key type.
+ */
+case class OpABAnyKey[B: ClassTag, K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[B])
+    extends AbstractBinaryOp[K, B, K] {
+
+  // Inner dimensions must agree: columns of A against rows of B.
+  assert(A.ncol == B.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows: as many as A has. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns: as many as B has. */
+  def ncol: Int = B.ncol
+
+  // TODO: for purposes of cost calculation, approximate based on operands
+  /** Non-zero element count; not estimated yet. */
+  def nNonZero: Long = throw new UnsupportedOperationException
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
new file mode 100644
index 0000000..f6503ed
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+
+/**
+ * Logical product AB' of a DRM A with any row-key type and an Int-keyed DRM B.
+ * The product keeps A's row-key type.
+ */
+case class OpABt[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[Int])
+    extends AbstractBinaryOp[K, Int, K] {
+
+  // A and B must share the column dimension, since B is used transposed.
+  assert(A.ncol == B.ncol, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows: as many as A has. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns: B's row count, narrowed to Int via safeToNonNegInt. */
+  def ncol: Int = safeToNonNegInt(B.nrow)
+
+  // TODO: for purposes of cost calculation, approximate based on operands
+  /** Non-zero element count; not estimated yet. */
+  def nNonZero: Long = throw new UnsupportedOperationException
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
new file mode 100644
index 0000000..d07172a
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Element-wise binary operator between two DRMs of identical geometry and key type.
+ * `op` holds the operator symbol.
+ */
+case class OpAewB[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[K],
+    op: Char
+    ) extends AbstractBinaryOp[K, K, K] {
+
+  // Element-wise operations need the operands to agree in both dimensions.
+  assert(A.ncol == B.ncol, "arguments must have same number of columns")
+  assert(A.nrow == B.nrow, "arguments must have same number of rows")
+
+  /** R-like syntax for number of rows (same as either operand). */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns (same as either operand). */
+  def ncol: Int = A.ncol
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
new file mode 100644
index 0000000..91e0dd4
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Element-wise operator between a DRM and a scalar, denoting expressions like 5.0 - A or A * 5.6.
+ * `op` holds the operator symbol; the result has A's geometry.
+ */
+case class OpAewScalar[K: ClassTag](
+    override var A: DrmLike[K],
+    scalar: Double,
+    op: String
+    ) extends AbstractUnaryOp[K, K] {
+
+  // Reuse A's partitioning tag so the result is recognized as identically partitioned with A.
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** R-like syntax for number of rows (same as A). */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns (same as A). */
+  def ncol: Int = A.ncol
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
new file mode 100644
index 0000000..3239ad2
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm._
+
+/** Logical transpose A' of an Int-keyed DRM. */
+case class OpAt(
+    override var A: DrmLike[Int])
+    extends AbstractUnaryOp[Int, Int] {
+
+  /** R-like syntax for number of rows: A's column count. */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns: A's row count, narrowed to Int via safeToNonNegInt. */
+  def ncol: Int = safeToNonNegInt(A.nrow)
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
new file mode 100644
index 0000000..c7c6046
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/** Logical A'A of a DRM with any row-key type; the square result is Int-keyed. */
+case class OpAtA[K: ClassTag](
+    override var A: DrmLike[K]
+    ) extends AbstractUnaryOp[K, Int] {
+
+  /** R-like syntax for number of rows: A's column count. */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns: A's column count. */
+  def ncol: Int = A.ncol
+
+  /** Non-zero element count; not estimated yet. */
+  def nNonZero: Long = throw new UnsupportedOperationException
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
new file mode 100644
index 0000000..4e1dd5c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
+
+/**
+ * Logical A' (transpose) for any row key type, to support A'A optimizations.
+ *
+ * If A is m x n, the result is n x m. The operator is declared with Int result keys
+ * (AbstractUnaryOp[K, Int]), since the columns of A become the rows of A'.
+ *
+ * @param A the distributed operand to transpose.
+ * @tparam K row key type of the operand.
+ */
+case class OpAtAnyKey[K: ClassTag](
+    override var A: DrmLike[K])
+    extends AbstractUnaryOp[K, Int] {
+
+  /** R-like syntax for number of rows: the column count of A. */
+  def nrow: Long = A.ncol
+
+  /**
+   * R-like syntax for number of columns: the row count of A. Fails an assertion (via
+   * safeToNonNegInt) if A has more than Int.MaxValue rows.
+   */
+  def ncol: Int = safeToNonNegInt(A.nrow)
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
new file mode 100644
index 0000000..ef3ae6b
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Logical A'B: the transpose of A times B.
+ *
+ * Both operands share the row key type K and must have the same number of rows. If A is
+ * m x n and B is m x k, the result is n x k.
+ *
+ * @param A left operand (transposed).
+ * @param B right operand.
+ * @tparam K row key type shared by both operands.
+ */
+case class OpAtB[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[K])
+    extends AbstractBinaryOp[K, K, Int] {
+
+  // A'B is only defined when A and B have the same number of rows.
+  assert(A.nrow == B.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows: the column count of A. */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns: the column count of B. */
+  def ncol: Int = B.ncol
+
+  // TODO: for purposes of cost calculation, approximate based on operands
+  /** Non-zero element count. Not implemented yet; always throws. */
+  def nNonZero: Long = throw new UnsupportedOperationException
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
new file mode 100644
index 0000000..36769c7
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+
+/**
+ * Logical A'x: the transpose of a distributed matrix A times an in-core vector x.
+ *
+ * If A is m x n, x must have m elements and the result is an n x 1 matrix.
+ *
+ * @param A distributed operand with Int row keys.
+ * @param x in-core vector; its length must equal the row count of A.
+ */
+case class OpAtx(
+    override var A: DrmLike[Int],
+    x: Vector
+    ) extends AbstractUnaryOp[Int, Int] {
+
+  /** Reuses the partitioning tag of A. */
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  assert(A.nrow == x.length, "Incompatible operand geometry")
+
+  /** R-like syntax for number of rows: the column count of A (already an Int). */
+  def nrow: Long = A.ncol
+
+  /** R-like syntax for number of columns: the result is a single column. */
+  def ncol: Int = 1
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
new file mode 100644
index 0000000..a726989
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Logical Ax: a distributed matrix A times an in-core vector x.
+ *
+ * If A is m x n, x must have n elements and the result is an m x 1 matrix. The operator
+ * is declared with the same key type K on both sides (AbstractUnaryOp[K, K]).
+ *
+ * @param A distributed operand.
+ * @param x in-core vector; its length must equal the column count of A.
+ * @tparam K row key type of A.
+ */
+case class OpAx[K: ClassTag](
+    override var A: DrmLike[K],
+    x: Vector
+    ) extends AbstractUnaryOp[K, K] {
+
+  /** Reuses the partitioning tag of A. */
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  // x has to conform to the column dimension of A.
+  assert(A.ncol == x.length, "Incompatible operand geometry")
+
+  /** R-like syntax for number of columns: the product is a column vector. */
+  def ncol: Int = 1
+
+  /** R-like syntax for number of rows: one per row of A. */
+  def nrow: Long = A.nrow
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
new file mode 100644
index 0000000..8e4362d
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
+
+/**
+ * Logical block-wise map of A with a user-supplied block function.
+ *
+ * The result geometry defaults to that of A; a non-negative _ncol and/or _nrow declares a
+ * different result geometry (the -1 defaults mean "same as A").
+ *
+ * @param A distributed operand.
+ * @param bmf function mapping a blockified tuple of A to a blockified result tuple.
+ * @param _ncol column count of the result, or a negative value to inherit A.ncol.
+ * @param _nrow row count of the result, or a negative value to inherit A.nrow.
+ * @tparam S row key type of A.
+ * @tparam R row key type of the result.
+ */
+class OpMapBlock[S: ClassTag, R: ClassTag](
+    override var A: DrmLike[S],
+    val bmf: BlockMapFunc[S, R],
+    val _ncol: Int = -1,
+    val _nrow: Long = -1
+    ) extends AbstractUnaryOp[S, R] {
+
+  /** Reuses the partitioning tag of A. */
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** R-like syntax for number of rows: _nrow if non-negative, otherwise A.nrow. */
+  def nrow: Long = if (_nrow < 0) A.nrow else _nrow
+
+  /** R-like syntax for number of columns: _ncol if non-negative, otherwise A.ncol. */
+  def ncol: Int = if (_ncol < 0) A.ncol else _ncol
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
new file mode 100644
index 0000000..697bbd3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Logical row-range slicing of A.
+ *
+ * The result has rowRange.length rows and the same number of columns as A.
+ *
+ * @param A distributed operand with Int row keys.
+ * @param rowRange row ordinals to slice; every element must lie in [0, A.nrow). An empty
+ *                 range is accepted and yields zero rows.
+ */
+case class OpRowRange(
+    override var A: DrmLike[Int],
+    rowRange: Range
+    ) extends AbstractUnaryOp[Int, Int] {
+
+  // Check the smallest and largest ordinals, whichever end of the range they sit at. An
+  // empty range has nothing to check (and head/last would throw NoSuchElementException).
+  assert(rowRange.isEmpty || {
+    val (lo, hi) =
+      if (rowRange.step > 0) (rowRange.head, rowRange.last) else (rowRange.last, rowRange.head)
+    lo >= 0 && hi < A.nrow
+  }, "row range out of range")
+
+  /** R-like syntax for number of rows: the length of the range. */
+  def nrow: Long = rowRange.length
+
+  /** R-like syntax for number of columns: same as A. */
+  def ncol: Int = A.ncol
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
new file mode 100644
index 0000000..1ca79b3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Logical product of an in-core matrix on the left with a distributed matrix A.
+ *
+ * If left is p x m and A is m x n, the result is p x n.
+ *
+ * @param left in-core left operand; its column count must equal the row count of A.
+ * @param A distributed right operand with Int row keys.
+ */
+case class OpTimesLeftMatrix(
+    left: Matrix,
+    override var A: DrmLike[Int]
+    ) extends AbstractUnaryOp[Int, Int] {
+
+  // Inner dimensions of left * A must agree.
+  assert(left.ncol == A.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of columns: taken from A. */
+  def ncol: Int = A.ncol
+
+  /** R-like syntax for number of rows: taken from the in-core left operand. */
+  def nrow: Long = left.nrow
+
+  // TODO: estimate the non-zero count instead of failing.
+  /** Non-zero element count. Not implemented yet; always throws. */
+  def nNonZero: Long = throw new UnsupportedOperationException()
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
new file mode 100644
index 0000000..c55f7f0
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Logical product of a distributed matrix A with an in-core matrix on the right.
+ *
+ * If A is m x n and right is n x k, the result is m x k. The operator is declared with
+ * the same key type K on both sides (AbstractUnaryOp[K, K]).
+ *
+ * @param A distributed left operand.
+ * @param right in-core right operand; its row count must equal the column count of A.
+ * @tparam K row key type of A.
+ */
+case class OpTimesRightMatrix[K: ClassTag](
+    override var A: DrmLike[K],
+    right: Matrix
+    ) extends AbstractUnaryOp[K, K] {
+
+  /** Reuses the partitioning tag of A. */
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  // Inner dimensions of A * right must agree.
+  assert(A.ncol == right.nrow, "Incompatible operand geometry")
+
+  /** R-like syntax for number of columns: taken from the in-core right operand. */
+  def ncol: Int = right.ncol
+
+  /** R-like syntax for number of rows: one per row of A. */
+  def nrow: Long = A.nrow
+
+  // TODO: estimate the non-zero count instead of failing.
+  /** Non-zero element count. Not implemented yet; always throws. */
+  def nNonZero: Long = throw new UnsupportedOperationException()
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
new file mode 100644
index 0000000..768bb1c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.decompositions.{DSPCA, DSSVD, DQR}
+
+package object drm {
+
+  /** Drm row-wise tuple: row key and row vector. */
+  type DrmTuple[K] = (K, Vector)
+
+  /** Drm block-wise tuple: array of row keys and the matrix block. */
+  type BlockifiedDrmTuple[K] = (Array[K], _ <: Matrix)
+
+  /** Block-map function: maps one blockified tuple to another. */
+  type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R]
+
+  // CacheHint type alias, currently disabled:
+  //  type CacheHint = CacheHint.CacheHint
+
+  /**
+   * Converts a Long (such as a row count) to Int, asserting that the value is non-negative
+   * and fits in an Int, i.e. lies in [0, Int.MaxValue].
+   */
+  def safeToNonNegInt(x: Long): Int = {
+    assert(x >= 0L && x <= Int.MaxValue,
+      "transformation from long to Int is losing signficant bits, or is a negative number")
+    x.toInt
+  }
+
+  /** Broadcast support API: broadcasts an in-core matrix via the distributed context. */
+  def drmBroadcast(m: Matrix)(implicit ctx: DistributedContext): BCast[Matrix] = ctx.drmBroadcast(m)
+
+  /** Broadcast support API: broadcasts an in-core vector via the distributed context. */
+  def drmBroadcast(v: Vector)(implicit ctx: DistributedContext): BCast[Vector] = ctx.drmBroadcast(v)
+
+  /** Load DRM from HDFS (as in Mahout DRM format). */
+  def drmFromHDFS(path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmFromHDFS(path)
+
+  /** Shortcut to parallelizing matrices with row ordinal indices as keys; row labels are ignored. */
+  def drmParallelize(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int] = drmParallelizeWithRowIndices(m, numPartitions)(sc)
+
+  /** Parallelize in-core matrix as a distributed matrix, using row ordinal indices as data set keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeWithRowIndices(m, numPartitions)
+
+  /** Parallelize in-core matrix as a distributed matrix, using row labels as data set keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit ctx: DistributedContext): CheckpointedDrm[String] = ctx.drmParallelizeWithRowLabels(m, numPartitions)
+
+  /** Creates an empty DRM with the specified number of partitions and geometry (Int row keys). */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeEmpty(nrow, ncol, numPartitions)
+
+  /** Creates an empty DRM whose height may exceed the Int range (Long row keys). */
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
+
+  /** Implicit broadcast -> value conversion. */
+  implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
+
+  /** Just throw all engine operations into context as well. */
+  implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
+
+  /** Enriches a checkpointed DRM with the operations of CheckpointedOps. */
+  implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
+    new CheckpointedOps[K](drm)
+
+  /** Implicitly checkpoints a logical DRM by calling checkpoint() with default arguments. */
+  implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint()
+
+  // ============== Decompositions ===================
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be
+   * reasonably small (<5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since it does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
+   *
+   * @param A input matrix A
+   * @param checkRankDeficiency passed through to DQR.dqrThin
+   * @return (Q, R): distributed Q and in-core R
+   */
+  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+    DQR.dqrThin(A, checkRankDeficiency)
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param A input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U, V, s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually
+   *         use them, e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of MAHOUT-817.
+   *
+   * @param A input matrix A
+   * @param k requested decomposition rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U, V, s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually
+   *         use them, e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
+
+}


Mime
View raw message