mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ssla...@apache.org
Subject [4/9] mahout git commit: MAHOUT-1681: Renamed mahout-math-scala to mahout-samsara
Date Tue, 14 Apr 2015 06:28:49 GMT
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/samsara/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..32515f1
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Extended by a platform specific version of this class to create a Mahout CLI driver. */
+abstract class MahoutDriver {
+
+  // Both start out as null (`_`); start() is documented below as the place where mc gets set up.
+  implicit protected var mc: DistributedContext = _
+  implicit protected var parser: MahoutOptionParser = _
+
+  var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
+
+  /** Must be overridden to set up the DistributedContext mc */
+  protected def start() : Unit
+
+  /**
+   * Override (optionally) for special cleanup. The default implementation closes `mc`, unless the
+   * context is a reused one (`_useExistingContext`) or no context has been assigned yet.
+   */
+  protected def stop(): Unit = {
+    // mc is still null when stop() runs before start() has assigned it; closing it would throw
+    if (!_useExistingContext && mc != null) mc.close()
+  }
+
+  /** This is where you do the work, call start first, then before exiting call stop */
+  protected def process(): Unit
+
+  /** Parse command line and call process */
+  def main(args: Array[String]): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/samsara/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..3b5affd
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+import scala.collection.immutable
+
+/**
+ * Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standardized.
+ * @param programName Name displayed in help message, the name by which the driver is invoked.
+ * @note options are engine neutral by convention. See the engine specific extending class
+ *       to add Spark or other engine options.
+ */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
+
+  // build options from some standard CLI param groups
+  // Note: always put the driver specific options last so they can override any previous options!
+  var opts = Map.empty[String, Any]
+
+  override def showUsageOnError = true
+
+  /**
+   * Merges the file I/O defaults into `opts` and declares the `--input`, `--input2` and `--output`
+   * options.
+   * @param numInputs `--input2` is only declared when this is exactly 2
+   */
+  def parseIOOptions(numInputs: Int = 1) = {
+    opts ++= MahoutOptionParser.FileIOOptions
+    note("Input, output options")
+
+    opt[String]('i', "input")
+      .required()
+      .action { (path, config) => config + ("input" -> path) }
+      .text("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs" +
+        " (required)")
+
+    if (numInputs == 2) {
+      opt[String]("input2")
+        .abbr("i2")
+        .action { (path, config) => config + ("input2" -> path) }
+        .text("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" " +
+          "(optional). Default: empty.")
+    }
+
+    // The stored output path always ends with a slash
+    opt[String]('o', "output")
+      .required()
+      .action { (path, config) =>
+        val dir = if (path.endsWith("/")) path else path + "/"
+        config + ("output" -> dir)
+      }
+      .text("Path for output directory, any HDFS supported URI (required)")
+
+  }
+
+  /** Merges the generic defaults into `opts` and declares `--randomSeed` and the hidden `--writeAllDatasets`. */
+  def parseGenericOptions() = {
+    opts ++= MahoutOptionParser.GenericOptions
+
+    opt[Int]("randomSeed")
+      .abbr("rs")
+      .action { (seed, config) => config + ("randomSeed" -> seed) }
+      .validate { seed =>
+        if (seed <= 0) failure("Option --randomSeed must be > 0") else success
+      }
+
+    // Hidden option, though a user might want this: output both input IndexedDatasets
+    opt[Unit]("writeAllDatasets")
+      .hidden()
+      .action { (_, config) => config + ("writeAllDatasets" -> true) }
+  }
+
+  /**
+   * Merges the text-delimited element defaults into `opts` and declares the options that describe the
+   * layout of element-wise text input: delimiter, the two filters and the row / item / filter column
+   * positions. Also registers two whole-configuration checks: the three column positions must differ
+   * from each other, and two filters that are both set must not be equal.
+   */
+  def parseElementInputSchemaOptions() = {
+    //Input text file schema--not driver specific but input data specific, elements input,
+    // not rows of IndexedDatasets
+    opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
+    note("\nInput text file schema options:")
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action {
+      (x, options) =>
+        options + ("inDelim" -> x)
+    }
+
+    opt[String]("filter1") abbr ("f1") action { (x, options) =>
+      options + ("filter1" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). " +
+      "Default: no filter, all data is used")
+
+    opt[String]("filter2") abbr ("f2") action { (x, options) =>
+      options + ("filter2" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). " +
+      "If not present no secondary dataset is collected")
+
+    // The failure messages below name the option exactly as it is declared, so the user can find it
+    opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
+      options + ("rowIDColumn" -> x)
+    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate {
+      x =>
+        if (x >= 0) success else failure("Option --rowIDColumn must be >= 0")
+    }
+
+    opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
+      options + ("itemIDColumn" -> x)
+    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate {
+      x =>
+        if (x >= 0) success else failure("Option --itemIDColumn must be >= 0")
+    }
+
+    opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
+      options + ("filterColumn" -> x)
+    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no " +
+      "filter") validate { x =>
+      if (x >= -1) success else failure("Option --filterColumn must be >= -1")
+    }
+
+    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or" +
+      " \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+    //check for column consistency
+    checkConfig { options: Map[String, Any] =>
+      val rowIDColumn = options("rowIDColumn").asInstanceOf[Int]
+      val itemIDColumn = options("itemIDColumn").asInstanceOf[Int]
+      val filterColumn = options("filterColumn").asInstanceOf[Int]
+      if (filterColumn == itemIDColumn || filterColumn == rowIDColumn || rowIDColumn == itemIDColumn)
+        failure("The row, item, and filter positions must be unique.") else success
+    }
+
+    //check for filter consistency; an unset filter is stored as null
+    checkConfig { options: Map[String, Any] =>
+      val filter1 = options("filter1").asInstanceOf[String]
+      val filter2 = options("filter2").asInstanceOf[String]
+      if (filter1 != null && filter2 != null && filter1 == filter2)
+        failure ("If using filters they must be unique.") else success
+    }
+
+  }
+
+  /** Merges the file discovery defaults into `opts` and declares `--recursive` and `--filenamePattern`. */
+  def parseFileDiscoveryOptions() = {
+    // File finding strategy--not driver specific
+    opts ++= MahoutOptionParser.FileDiscoveryOptions
+    note("\nFile discovery options:")
+
+    opt[Unit]('r', "recursive")
+      .action { (_, config) => config + ("recursive" -> true) }
+      .text("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+    opt[String]("filenamePattern")
+      .abbr("fp")
+      .action { (pattern, config) => config + ("filenamePattern" -> pattern) }
+      .text("Regex to match in determining input files (optional). Default: filename in the --input option " +
+        "or \"^part-.*\" if --input is a directory")
+
+  }
+
+  /**
+   * Merges the text-delimited IndexedDataset defaults into `opts` and declares the output schema
+   * options: the three delimiters and `--omitStrength`.
+   */
+  def parseIndexedDatasetFormatOptions() = {
+    opts ++= MahoutOptionParser.TextDelimitedIndexedDatasetOptions
+    note("\nOutput text file schema options:")
+
+    opt[String]("rowKeyDelim")
+      .abbr("rd")
+      .action { (delim, config) => config + ("rowKeyDelim" -> delim) }
+      .text("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+    opt[String]("columnIdStrengthDelim")
+      .abbr("cd")
+      .action { (delim, config) => config + ("columnIdStrengthDelim" -> delim) }
+      .text("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+    opt[String]("elementDelim")
+      .abbr("td")
+      .action { (delim, config) => config + ("elementDelim" -> delim) }
+      .text("Separates vector element values in the values list (optional). Default: \" \"")
+
+    opt[Unit]("omitStrength")
+      .abbr("os")
+      .action { (_, config) => config + ("omitStrength" -> true) }
+      .text("Do not write the strength to the output files (optional), Default: false.")
+    note("This option is used to output indexable data for creating a search engine recommender.")
+
+    note("\nDefault delimiters will produce output of the form: " +
+      "\"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+  }
+
+}
+
+/**
+ * Companion object defines default option groups for reference in any driver that needs them.
+ * @note not all options are platform neutral so other platforms can add default options here if desired
+ */
+object MahoutOptionParser {
+
+  /** Builds one default option group as an immutable hash map of option name to default value. */
+  private def optionGroup(defaults: (String, Any)*) = immutable.HashMap[String, Any](defaults: _*)
+
+  // Generic driver defaults; the seed default is taken from the clock when this object is initialized
+  final val GenericOptions = optionGroup(
+    "randomSeed" -> System.currentTimeMillis().toInt,
+    "writeAllDatasets" -> false)
+
+  // Spark specific defaults
+  final val SparkOptions = optionGroup(
+    "master" -> "local",
+    "sparkExecutorMem" -> "",
+    "appName" -> "Generic Spark App, Change this.")
+
+  // Input / output paths, null until supplied on the command line
+  final val FileIOOptions = optionGroup(
+    "input" -> (null: String),
+    "input2" -> (null: String),
+    "output" -> (null: String))
+
+  // How input files are located
+  final val FileDiscoveryOptions = optionGroup(
+    "recursive" -> false,
+    "filenamePattern" -> "^part-.*")
+
+  // Schema of element-wise text input; a filterColumn of -1 and null filters mean "no filter"
+  final val TextDelimitedElementsOptions = optionGroup(
+    "rowIDColumn" -> 0,
+    "itemIDColumn" -> 1,
+    "filterColumn" -> -1,
+    "filter1" -> (null: String),
+    "filter2" -> (null: String),
+    "inDelim" -> "[,\t ]")
+
+  // Schema of text-delimited IndexedDataset files
+  final val TextDelimitedIndexedDatasetOptions = optionGroup(
+    "rowKeyDelim" -> "\t",
+    "columnIdStrengthDelim" -> ":",
+    "elementDelim" -> " ",
+    "omitStrength" -> false)
+}
+
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/samsara/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
new file mode 100644
index 0000000..6557ab0
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.cf
+
+import org.apache.mahout.math._
+import org.apache.mahout.math.indexeddataset.IndexedDataset
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+import scala.util.Random
+
+
+/**
+ * Based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object SimilarityAnalysis extends Serializable {
+
+  /** Orders (Int,Double) pairs by the second value (the score), higher scores first */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
+  /**
+   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+   * and returns a list of similarity and cross-similarity matrices
+   * @param drmARaw Primary interaction matrix
+   * @param randomSeed when kept to a constant will make repeatable downsampling
+   * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
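+   * @param maxNumInteractions max number of interactions after downsampling, default: 500
+   * @param drmBs secondary interaction matrices, each used for a cross-cooccurrence A'B, default: none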
+   * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and
+   *         cross-cooccurrence
+   */
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Downsampled, binarized primary interaction matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // Row count of A, passed to the similarity computation as the total number of interactions
+    val numUsers = drmA.nrow.toInt
+
+    // Per-column non-zero counts of A, shared by the A'A and every A'B computation
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+    // Self similarity: LLR-scored and sparsified A'A, without the item-with-itself entries
+    val cooccurrenceAtA = drmA.t %*% drmA
+    val similarityAtA = computeSimilarities(cooccurrenceAtA, numUsers, maxInterestingItemsPerThing,
+      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    // One cross-similarity matrix per secondary matrix, in the order the matrices were given
+    val crossSimilarities = drmBs.toList.map { drmBRaw =>
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+      val cooccurrenceAtB = drmA.t %*% drmB
+      val similarityAtB = computeSimilarities(cooccurrenceAtB, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerItemA, bcastInteractionsPerThingB)
+
+      drmB.uncache()
+      similarityAtB
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // A'A first, followed by the cross-similarity matrices
+    similarityAtA :: crossSimilarities
+  }
+
+  /**
+   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
+   * a list of similarity and cross-similarity matrices. Somewhat easier to use method, which handles the ID
+   * dictionaries correctly
+   * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
+   * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+   * @param maxInterestingItemsPerThing max similarities per items
+   * @param maxNumInteractions max number of input items per item
+   * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
+   *         IndexedDatasets for cooccurrence and cross-cooccurrence
+   */
+  def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
+      randomSeed: Int = 0xdeadbeef,
+      maxInterestingItemsPerThing: Int = 50,
+      maxNumInteractions: Int = 500):
+    List[IndexedDataset] = {
+    // Fail with a clear message instead of an index error on the element lookups below
+    require(indexedDatasets.nonEmpty, "indexedDatasets must contain at least the primary dataset")
+
+    val primary = indexedDatasets.head
+    val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]])
+    val coocMatrices = cooccurrences(drms.head, randomSeed, maxInterestingItemsPerThing,
+      maxNumInteractions, drms.tail)
+
+    // Matrix i was computed from the primary dataset and dataset i (i == 0 is the primary with itself),
+    // so it gets the primary's column IDs as row IDs and dataset i's column IDs as column IDs
+    coocMatrices.zipWithIndex.map {
+      case (drm, i) =>
+        primary.create(drm, primary.columnIDs, indexedDatasets(i).columnIDs)
+    }
+  }
+
+  /**
+   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows
+   * @param drmARaw Primary interaction matrix
+   * @param randomSeed when kept to a constant will make repeatable downsampling
+   * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
+   * @param maxNumInteractions max number of interactions after downsampling, default: 500
+   */
+  def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
+                    maxNumInteractions: Int = 500): DrmLike[Int] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Downsampled, binarized input
+    val downsampled = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // Column count, passed to the similarity computation as the total number of interactions
+    val columnCount = downsampled.ncol
+
+    // Per-row non-zero counts, used for both sides of the comparison
+    val bcastInteractionsPerRow = drmBroadcast(downsampled.numNonZeroElementsPerRow)
+
+    // Row cooccurrence matrix AA'
+    val rowCooccurrences = downsampled %*% downsampled.t
+
+    // LLR-scored and sparsified AA', without the row-with-itself entries
+    computeSimilarities(rowCooccurrences, columnCount, maxInterestingSimilaritiesPerRow,
+      bcastInteractionsPerRow, bcastInteractionsPerRow, crossCooccurrence = false)
+  }
+
+  /**
+   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
+   * Uses IndexedDatasets, which handle external ID dictionaries properly
+   * @param indexedDataset compare each row to every other
+   * @param randomSeed  use default to make repeatable, otherwise pass in system time or some randomizing seed
+   * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+   * @param maxObservationsPerRow max number of input elements to use
+   */
+  def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
+      maxInterestingSimilaritiesPerRow: Int = 50,
+      maxObservationsPerRow: Int = 500):
+    IndexedDataset = {
+    val similarities = rowSimilarity(
+      indexedDataset.matrix,
+      randomSeed,
+      maxInterestingSimilaritiesPerRow,
+      maxObservationsPerRow)
+
+    // Rows are compared with rows, so the row IDs label both dimensions of the result
+    val rowIDs = indexedDataset.rowIDs
+    indexedDataset.create(similarities, rowIDs, rowIDs)
+  }
+
+  /** Computes the log-likelihood ratio; see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */
+  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+    numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    // The four cells of the 2x2 contingency table built from the counts
+    val both = numInteractionsWithAandB
+    val onlyA = numInteractionsWithA - both
+    val onlyB = numInteractionsWithB - both
+    val neither = numInteractions - numInteractionsWithA - numInteractionsWithB + both
+
+    LogLikelihood.logLikelihoodRatio(both, onlyA, onlyB, neither)
+
+  }
+
+  /**
+   * Replaces every row of a (cross-)cooccurrence matrix by at most `maxInterestingItemsPerThing`
+   * log-likelihood-ratio scores, keeping the highest scoring columns of that row.
+   * @param drm cooccurrence counts
+   * @param numUsers passed to [[logLikelihoodRatio]] as the total number of interactions
+   * @param maxInterestingItemsPerThing max number of scores kept per row
+   * @param bcastNumInteractionsB interaction counts, looked up by row key
+   * @param bcastNumInteractionsA interaction counts, looked up by column index
+   * @param crossCooccurrence when false, entries whose row key equals the column index are skipped
+   */
+  def computeSimilarities(drm: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                        crossCooccurrence: Boolean = true) = {
+    drm.mapBlock() {
+      case (keys, block) =>
+
+        val scoredBlock = block.like()
+        val interactionsByRowKey: Vector = bcastNumInteractionsB
+        val interactionsByColumn: Vector = bcastNumInteractionsA
+
+        (0 until keys.size).foreach { row =>
+
+          val thingB = keys(row)
+
+          // Under orderByScore the queue head is the lowest score currently kept
+          val best = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(row, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val count = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // (legacy hadoop code mapped the score to the range (0..1) via 1.0 - (1.0 / (1.0 + llr)))
+              val score = logLikelihoodRatio(interactionsByRowKey(thingB).toLong,
+                interactionsByColumn(thingA).toLong, count.toLong, numUsers)
+              val candidate = thingA -> score
+
+              if (best.size < maxInterestingItemsPerThing) {
+                best.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, best.head)) {
+                // candidate beats the weakest kept entry, so it takes its place
+                best.dequeue()
+                best.enqueue(candidate)
+              }
+            }
+          }
+
+          // Write the kept scores into the output row
+          best.dequeueAll.foreach {
+            case (otherThing, llrScore) =>
+              scoredBlock(row, otherThing) = llrScore
+          }
+        }
+
+        keys -> scoredBlock
+    }
+  }
+
+  /**
+   * Selectively downsample rows and items with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
+   * @param drmM matrix to downsample
+   * @param seed random number generator seed, keep to a constant if repeatability is neccessary
+   * @param maxNumInteractions number of elements in a row of the returned matrix
+   * @return the downsampled DRM
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of
+        // failures
+        val random = new Random(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each row
+        for (rowIndex <- 0 until keys.size) {
+
+          val interactionsInRow = block(rowIndex, ::)
+
+          val numInteractionsPerRow = interactionsInRow.getNumNonZeroElements()
+
+          // Force floating-point division. With integral operands the quotient truncates to 0 for every
+          // row holding more than maxNumInteractions elements, which would drop the whole row instead of
+          // sampling it, and an empty row would fail with a division by zero.
+          val perRowSampleRate =
+            math.min(maxNumInteractions, numInteractionsPerRow).toDouble / numInteractionsPerRow
+
+          interactionsInRow.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+            // Keep the element with the smaller of the two rates
+            if (random.nextDouble() <= math.min(perRowSampleRate, perThingSampleRate)) {
+              // We ignore the original interaction value and create a binary 0-1 matrix
+              // as we only consider whether interactions happened or did not happen
+              downsampledBlock(rowIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
new file mode 100644
index 0000000..4e2f45c
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
+import RLikeDrmOps._
+import RLikeOps._
+import scala.util.Random
+import org.apache.log4j.Logger
+import math._
+import org.apache.mahout.common.RandomUtils
+
+/** Simple ALS factorization algorithm. To solve, use the dals() method. */
+private[math] object ALS {
+
+  private val log = Logger.getLogger(ALS.getClass)
+
+  /**
+   * ALS training result. <P>
+   *
+   * <code>drmU %*% drmV.t</code> is supposed to approximate the input.
+   *
+   * @param drmU U matrix
+   * @param drmV V matrix
+   * @param iterationsRMSE RMSE values after each iteration performed
+   */
+  class Result[K: ClassTag](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) {
+    /** The three members as one triple, so that a caller can destructure the result in a single `val` */
+    def toTuple = Tuple3(drmU, drmV, iterationsRMSE)
+  }
+
+  /** Result class for in-core results */
+  class InCoreResult(val inCoreU: Matrix, val inCoreV: Matrix, val iterationsRMSE: Iterable[Double]) {
+    // inCoreV is a `val` like its siblings, so all three parts are readable as members as well
+    /** The three members as one triple, so that a caller can destructure the result in a single `val` */
+    def toTuple = (inCoreU, inCoreV, iterationsRMSE)
+  }
+
+  /**
+   * Run Distributed ALS.
+   * <P>
+   *
+   * Example:
+   *
+   * <pre>
+   * val (u,v,errors) = dals(input, k).toTuple
+   * </pre>
+   *
+   * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations,
+   * whichever comes first.
+   * <P>
+   *
+   * @param drmA The input matrix
+   * @param k required rank of decomposition (number of cols in U and V results), default: 50
+   * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
+   *                             value. If <=0 then RMSE is not computed and no convergence test is run.
+   * @param lambda regularization rate, default: 0.0
+   * @param maxIterations maximum iterations to run regardless of convergence (100 is probably more than enough)
+   * @tparam K row key type of the input
+   * @return [[org.apache.mahout.math.decompositions.ALS.Result]]
+   */
+  def dals[K: ClassTag](
+      drmA: DrmLike[K],
+      k: Int = 50,
+      lambda: Double = 0.0,
+      maxIterations: Int = 10,
+      convergenceThreshold: Double = 0.10
+      ): Result[K] = {
+
+    assert(convergenceThreshold < 1.0, "convergenceThreshold")
+    assert(maxIterations >= 1, "maxIterations")
+
+    val drmAt = drmA.t
+
+    // Initialize U block by block from A's blocks (same keys, k columns); V is computed in the first iteration
+    var drmU = drmA.mapBlock(ncol = k) {
+      case (keys, block) =>
+        val rnd = RandomUtils.getRandom()
+        val uBlock = Matrices.symmetricUniformView(block.nrow, k, rnd.nextInt()) * 0.01
+        keys -> uBlock
+    }
+
+    var drmV: DrmLike[Int] = null
+    var rmseIterations: List[Double] = Nil
+
+    // ALS iteration loop
+    var stop = false
+    var i = 0
+    while (!stop && i < maxIterations) {
+
+      // Alternate: compute V from the current U, then U from the new V. This is really what ALS is.
+      if (drmV != null) drmV.uncache()
+      drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint()
+
+      drmU.uncache()
+      drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint()
+
+      // Check if we are requested to do a convergence test; and do it if yes.
+      if (convergenceThreshold > 0) {
+
+        val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow)
+
+        if (i > 0) {
+          val rmsePrev = rmseIterations.last
+          val convergence = (rmsePrev - rmse) / rmsePrev
+
+          if (convergence < 0) {
+            log.warn("Rmse increase of %f. Should not happen.".format(convergence))
+            // I guess error growth can happen in ideal data case?
+            stop = true
+          } else if (convergence < convergenceThreshold) {
+            stop = true
+          }
+        }
+        rmseIterations :+= rmse
+      }
+
+      i += 1
+    }
+
+    new Result(drmU, drmV, rmseIterations)
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
new file mode 100644
index 0000000..7caa3dd
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+  private val log = Logger.getLogger(DQR.getClass)
+
+  /**
+   * Distributed _thin_ QR, obtained from the Cholesky decomposition of the Gram matrix A'A. <P>
+   *
+   * A'A is collected in-core and then broadcast to the backend, so it must fit in memory: if A is
+   * m x n, then n should be pretty controlled (<5000 or so; a warning is logged above that). <P>
+   *
+   * It is recommended to checkpoint A since it does two passes over it (one to form A'A, one to
+   * form Q). <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
+   *
+   * @param drmA input matrix A
+   * @param checkRankDeficiency if true, fail when the Cholesky decomposition of A'A reports that
+   *                            it is not positive definite
+   * @return (Q, R), where Q is a distributed matrix produced by a block-wise map over A and R is
+   *         an in-core matrix
+   * @throws IllegalArgumentException if `checkRankDeficiency` is set and A'A is not positive definite
+   */
+  def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+    if (drmA.ncol > 5000) {
+      log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
+    }
+
+    implicit val ctx = drmA.context
+
+    // First pass over A: form the Gram matrix and bring it in-core.
+    val gram = (drmA.t %*% drmA).checkpoint().collect
+
+    if (log.isDebugEnabled) {
+      log.debug("A'A=\n%s\n".format(gram))
+    }
+
+    // A'A = L L', hence R = L'.
+    val cholesky = chol(gram)
+    val rFactor = cholesky.getL.cloned.t
+
+    if (log.isDebugEnabled) {
+      log.debug("R=\n%s\n".format(rFactor))
+    }
+
+    val rankDeficient = !cholesky.isPositiveDefinite
+    if (checkRankDeficiency && rankDeficient) {
+      throw new IllegalArgumentException("R is rank-deficient.")
+    }
+
+    // The decomposition object itself is not shipped to the backend; A'A is broadcast instead and
+    // decomposed again inside the closure.
+    val bcastGram = drmBroadcast(gram)
+
+    // Second pass over A: Q = A*inv(L'), computed independently for every row block.
+    val drmQ = drmA.mapBlock() {
+      case (rowKeys, blockA) =>
+        val blockQ = chol(bcastGram).solveRight(blockA)
+        rowKeys -> blockQ
+    }
+
+    (drmQ, rFactor)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
new file mode 100644
index 0000000..de7402d
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of the MAHOUT-817.
+   *
+   * The column means xi are never subtracted from A explicitly. Instead, every product that
+   * involves the mean-centered input (A - 1 xi') is computed on A itself and then corrected with
+   * small in-core vectors (s_o, s_q, s_b) that are broadcast to the backend.
+   *
+   * @param drmA input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
+   *         e.g. save them to hdfs in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmAcp = drmA.checkpoint()
+    implicit val ctx = drmAcp.context
+
+    val m = drmAcp.nrow
+    val n = drmAcp.ncol
+    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // Dataset mean
+    val xi = drmAcp.colMeans
+
+    // xi'xi; needed by the power-iteration correction and by the final BB' correction.
+    val xiNormSq = xi dot xi
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+    // This done in front in a single-threaded fashion for now. Even though it doesn't require any
+    // memory beyond that is required to keep xi around, it still might be parallelized to backs
+    // for significantly big n and r. TODO
+    val s_o = omega.t %*% xi
+
+    val bcastS_o = drmBroadcast(s_o)
+    val bcastXi = drmBroadcast(xi)
+
+    // Y = (A - 1 xi') Omega = A Omega - 1 s_o'
+    var drmY = drmAcp.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        val s_o:Vector = bcastS_o
+        val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
+        for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
+        keys -> blockY
+    }
+        // Checkpoint Y
+        .checkpoint()
+
+    var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+    var s_q = drmQ.colSums()
+    var bcastVarS_q = drmBroadcast(s_q)
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    var drmBt = (drmAcp.t %*% drmQ).checkpoint()
+
+    // s_b = B xi, computed from the uncorrected B' = A'Q.
+    var s_b = (drmBt.t %*% xi).collect(::, 0)
+
+    for (i <- 0 until q) {
+
+      // These closures don't seem to live well with outside-scope vars. This doesn't record closure
+      // attributes correctly. So we create additional set of vals for broadcast vars to properly
+      // create readonly closure attributes in this very scope.
+      val bcastS_q = bcastVarS_q
+      val bcastXib = bcastXi
+
+      // Row correction for the next Y. With the corrected B' = A'Q - xi s_q' we need
+      //   Y = (A - 1 xi')(A'Q - xi s_q') = A (A'Q - xi s_q') - 1 (s_b - (xi'xi) s_q)'
+      // so the vector to subtract from every row of A*B' is s_b - (xi'xi) s_q, not s_b alone
+      // (s_b was computed from the uncorrected B'). It must be built before s_q and s_b are
+      // reassigned at the end of this iteration.
+      val bcastSt_b = drmBroadcast(s_b - s_q * xiNormSq)
+
+      // Fix Bt as B' -= xi cross s_q
+      drmBt = drmBt.mapBlock() {
+        case (keys, block) =>
+          val s_q: Vector = bcastS_q
+          val xi: Vector = bcastXib
+          keys.zipWithIndex.foreach {
+            case (key, idx) => block(idx, ::) -= s_q * xi(key)
+          }
+          keys -> block
+      }
+
+      drmY.uncache()
+      drmQ.uncache()
+
+      drmY = (drmAcp %*% drmBt)
+          // Fix Y by subtracting (s_b - (xi'xi) s_q) from each row of the AB'
+          .mapBlock() {
+        case (keys, block) =>
+          val st_b: Vector = bcastSt_b
+          for (row <- 0 until block.nrow) block(row, ::) -= st_b
+          keys -> block
+      }
+          // Checkpoint Y
+          .checkpoint()
+
+      drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
+
+      s_q = drmQ.colSums()
+      bcastVarS_q = drmBroadcast(s_q)
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      drmBt = (drmAcp.t %*% drmQ).checkpoint()
+
+      s_b = (drmBt.t %*% xi).collect(::, 0)
+    }
+
+    // BB' of the mean-centered problem, expressed through the uncorrected B' of the last pass:
+    // BB' = (B')'B' - s_q s_b' - s_b s_q' + (xi'xi) s_q s_q'
+    val c = s_q cross s_b
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
+        c - c.t + (s_q cross s_q) * xiNormSq
+    val (inCoreUHat, d) = eigen(inCoreBBt)
+    val s = d.sqrt
+
+    // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
+    // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
new file mode 100644
index 0000000..1abfb87
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
@@ -0,0 +1,82 @@
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrix, Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSSVD {
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * Only the results of the final pass (Q and B') are checkpointed, since they are the ones reused
+   * to assemble the outputs; every intermediate Y is checkpointed before it is handed to the
+   * thin QR step.
+   *
+   * @param drmA input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+   *         e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmAcp = drmA.checkpoint()
+
+    val m = drmAcp.nrow
+    val n = drmAcp.ncol
+    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
+
+    // Effective oversampling, then the actual decomposition rank.
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+    val r = k + pfxed
+
+    // Omega is represented by its seed only.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+
+    // Y = A*Omega. Only the seed travels inside the serialized closure; the random matrix view is
+    // re-created in the backend, which keeps the closure compact.
+    val drmY0 = drmAcp.mapBlock(ncol = r) {
+      case (keys, blockA) =>
+        keys -> (blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed))
+    }
+
+    // Without power iterations the very first pass is also the last one.
+    val firstPassIsLast = q == 0
+
+    val q0 = dqrThin(drmY0.checkpoint())._1
+    var drmQ: DrmLike[K] = if (firstPassIsLast) q0.checkpoint() else q0
+
+    // This actually should be optimized as identically partitioned map-side A'B since A and Q should
+    // still be identically partitioned.
+    val bt0 = drmAcp.t %*% drmQ
+    var drmBt: DrmLike[Int] = if (firstPassIsLast) bt0.checkpoint() else bt0
+
+    // Power iterations
+    for (iter <- 0 until q) {
+      val lastPass = iter == q - 1
+
+      val drmYi = (drmAcp %*% drmBt).checkpoint()
+      val qi = dqrThin(drmYi)._1
+      drmQ = if (lastPass) qi.checkpoint() else qi
+
+      // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
+      // identically partitioned anymore.
+      val bti = drmAcp.t %*% drmQ
+      drmBt = if (lastPass) bti.checkpoint() else bti
+    }
+
+    val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt)
+    val s = d.sqrt
+
+    // U and V stay lazy: they are computed only when (and if) the caller actually uses them, so no
+    // "compute U" / "compute V" flags are needed.
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+  private val log = Logger.getLogger(SSVD.getClass)
+
+  /**
+   * In-core SSVD algorithm.
+   *
+   * The oversampling actually used is capped at min(m, n) - k, so the decomposition rank never
+   * exceeds min(m, n). The initial Y'Y is asserted to be positive definite.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   * @throws IllegalArgumentException if k is greater than min(m, n)
+   */
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    var y = a %*% omega
+    var yty = y.t %*% y
+    val at = a.t
+    var ch = chol(yty)
+    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+    var bt = ch.solveRight(at %*% y)
+
+    // Power iterations
+    for (i <- 0 until q) {
+      y = a %*% bt
+      yty = y.t %*% y
+      ch = chol(yty)
+      bt = ch.solveRight(at %*% y)
+    }
+
+    val bbt = bt.t %*% bt
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    // ch.solveRight(y) is the implicit Q of QR(Y) for the last Y.
+    val u = ch.solveRight(y) %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+  }
+
+  /**
+   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+   * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
+   * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+   *
+   * Hint: Usually one wants to use AV which is approximately USigma, i.e.<code>u %*%: diagv(s)</code>.
+   * If retaining distances and original scaled variances is not that important, the normalized PCA
+   * space is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   * @throws IllegalArgumentException if k is greater than min(m, n)
+   */
+  def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    // Dataset mean
+    val xi = a.colMeans()
+
+    // xi'xi; needed by the power-iteration correction and by the final BB' correction.
+    val xiNormSq = xi dot xi
+
+    if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+    var y = a %*% omega
+
+    // Fixing y: Y = (A - 1 xi') Omega = A Omega - 1 s_o'
+    val s_o = omega.t %*% xi
+    y := ((r,c,v) => v - s_o(c))
+
+    var yty = y.t %*% y
+    var ch = chol(yty)
+//    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+    // This is implicit Q of QR(Y)
+    var qm = ch.solveRight(y)
+    var bt = a.t %*% qm
+    var s_q = qm.colSums()
+    // s_b = B xi, computed from the uncorrected B' = A'Q.
+    var s_b = bt.t %*% xi
+
+    // Power iterations
+    for (i <- 0 until q) {
+
+      // Fix bt
+      bt -= xi cross s_q
+
+      y = a %*% bt
+
+      // Fix Y again. With the corrected B' = A'Q - xi s_q' we need
+      //   Y = (A - 1 xi')(A'Q - xi s_q') = A (A'Q - xi s_q') - 1 (s_b - (xi'xi) s_q)'
+      // so the row correction is s_b - (xi'xi) s_q, not s_b alone (s_b was computed from the
+      // uncorrected B').
+      val st_b = s_b - s_q * xiNormSq
+      y := ((r,c,v) => v - st_b(c))
+
+      yty = y.t %*% y
+      ch = chol(yty)
+      qm = ch.solveRight(y)
+      bt = a.t %*% qm
+      s_q = qm.colSums()
+      s_b = bt.t %*% xi
+    }
+
+    val c = s_q cross s_b
+
+    // BB' computation becomes (B')'B' - s_q s_b' - s_b s_q' + (xi'xi) s_q s_q'
+    val bbt = bt.t %*% bt -c - c.t +  (s_q cross s_q) * xiNormSq
+
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = qm %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/package.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/package.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/package.scala
new file mode 100644
index 0000000..a7b829f
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/package.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+ * Entry points for the decomposition and factorization methods: everything that has been made
+ * independent of a particular distributed engine so far. Each method here delegates to the
+ * object that holds the actual implementation.
+ */
+package object decompositions {
+
+  // ================ In-core decompositions ===================
+
+  /**
+   * In-core SSVD algorithm.
+   *
+   * @param a input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
+    SSVD.ssvd(a = a, k = k, p = p, q = q)
+
+  /**
+   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+   * follows the solution outlined in MAHOUT-817. The in-core version is, for the most part, supposed
+   * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+   *
+   * Hint: Usually one wants to use AV which is approximately USigma, i.e.<code>u %*%: diagv(s)</code>.
+   * If retaining distances and original scaled variances is not that important, the normalized PCA
+   * space is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.spca(a, k, p, q)
+
+  // ============== Distributed decompositions ===================
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+   * controlled (<5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since it does two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
+   * their RDD should be able to zip successfully.
+   *
+   * @param drmA input matrix A
+   * @param checkRankDeficiency whether to fail on a rank-deficient input
+   * @return (Q, R) with distributed Q and in-core R
+   */
+  def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+    DQR.dqrThin(drmA = drmA, checkRankDeficiency = checkRankDeficiency)
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param drmA input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+   *         e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(drmA = drmA, k = k, p = p, q = q)
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of the MAHOUT-817.
+   *
+   * @param drmA input matrix A
+   * @param k requested SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
+   *         e.g. save them to hdfs, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(drmA = drmA, k = k, p = p, q = q)
+
+  /** Result for distributed ALS-type two-component factorization algorithms */
+  type FactorizationResult[K] = ALS.Result[K]
+
+  /** Result for distributed ALS-type two-component factorization algorithms, in-core matrices */
+  type FactorizationResultInCore = ALS.InCoreResult
+
+  /**
+   * Run ALS.
+   * <P>
+   *
+   * Example:
+   *
+   * <pre>
+   * val (u,v,errors) = als(input, k).toTuple
+   * </pre>
+   *
+   * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations,
+   * whichever comes earlier.
+   * <P>
+   *
+   * @param drmA The input matrix
+   * @param k required rank of decomposition (number of cols in U and V results)
+   * @param lambda regularization rate
+   * @param maxIterations maximum iterations to run regardless of convergence (100 is probably more
+   *                      than enough)
+   * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
+   *                             value. If <=0 then RMSE is not computed and the convergence test
+   *                             is not used.
+   * @tparam K row key type of the input
+   * @return [[org.apache.mahout.math.decompositions.ALS.Result]]
+   */
+  def dals[K: ClassTag](
+      drmA: DrmLike[K],
+      k: Int = 50,
+      lambda: Double = 0.0,
+      maxIterations: Int = 10,
+      convergenceThreshold: Double = 0.10
+      ): FactorizationResult[K] = {
+    ALS.dals(drmA, k, lambda, maxIterations, convergenceThreshold)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/BCast.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/BCast.scala
new file mode 100644
index 0000000..850614457
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/BCast.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+/**
+ * Broadcast variable abstraction.
+ *
+ * @tparam T type of the value being broadcast
+ */
+trait BCast[T] {
+
+  /** The value held by this broadcast variable. */
+  def value: T
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
new file mode 100644
index 0000000..ac763f9
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
@@ -0,0 +1,19 @@
+package org.apache.mahout.math.drm
+
+/**
+ * Cache hints that can be passed to `checkpoint()`. `NONE` is the first value, so it has id 0;
+ * the declaration order below fixes the ids of the rest.
+ */
+object CacheHint extends Enumeration {
+
+  type CacheHint = Value
+
+  val NONE,
+  DISK_ONLY, DISK_ONLY_2,
+  MEMORY_ONLY, MEMORY_ONLY_2,
+  MEMORY_ONLY_SER, MEMORY_ONLY_SER_2,
+  MEMORY_AND_DISK, MEMORY_AND_DISK_2,
+  MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2 = Value
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
new file mode 100644
index 0000000..7f97481
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.math.Matrix
+import scala.reflect.ClassTag
+
+/**
+ * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can
+ * therefore be collected or saved.
+ *
+ * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
+ */
+trait CheckpointedDrm[K] extends DrmLike[K] {
+
+  /** Returns this matrix as an in-core [[org.apache.mahout.math.Matrix]]. */
+  def collect: Matrix
+
+  /** Saves this matrix under the given path. */
+  def dfsWrite(path: String): Unit
+
+  /** If this checkpoint is already declared cached, uncache. Returns this same instance. */
+  def uncache(): this.type
+
+  /**
+   * Explicit access to the class tag of the key type: a trait cannot declare a context bound on
+   * its type parameter, but the actual implementation knows the tag.
+   */
+  def keyClassTag: ClassTag[K]
+
+  /** Changes the number of rows without touching the underlying data. */
+  def newRowCardinality(n: Int): CheckpointedDrm[K]
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
new file mode 100644
index 0000000..8c3911f
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math._
+
+
+/**
+ * Additional experimental operations over a CheckpointedDrm implementation. They may be moved up
+ * to the DRM base once they stabilize. Every operation is delegated to the context of the wrapped
+ * matrix.
+ *
+ * @param drm the checkpointed matrix the operations run on
+ * @tparam K row key type of the matrix
+ */
+class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
+
+  /** Column sums. At this point this runs on checkpoint and collects an in-core vector. */
+  def colSums(): Vector = {
+    drm.context.colSums(drm)
+  }
+
+  /**
+   * Column counts: the number of non-zero values in every column. At this point this runs on
+   * checkpoint and collects an in-core vector.
+   */
+  def numNonZeroElementsPerColumn(): Vector = {
+    drm.context.numNonZeroElementsPerColumn(drm)
+  }
+
+  /** Column means, as an in-core vector. */
+  def colMeans(): Vector = {
+    drm.context.colMeans(drm)
+  }
+
+  /** Norm of the matrix, as computed by the context. */
+  def norm(): Double = {
+    drm.context.norm(drm)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
new file mode 100644
index 0000000..39bab90
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import java.io.Closeable
+
+/**
+ * Distributed context (a.k.a. distributed session handle). It is `Closeable`, so a context is
+ * expected to be closed once it is no longer needed.
+ */
+trait DistributedContext extends Closeable {
+
+  /** The distributed engine associated with this context. */
+  val engine: DistributedEngine
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
new file mode 100644
index 0000000..dd5b101
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset}
+
+import scala.reflect.ClassTag
+import logical._
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+import DistributedEngine._
+import org.apache.mahout.math.scalabindings._
+import org.apache.log4j.Logger
+
+/** Abstraction of optimizer/distributed engine */
+trait DistributedEngine {
+
+  /**
+   * First optimization pass. Returns the rewritten logical plan, which toPhysical() then
+   * translates into a physical plan. This rewrite may introduce logical constructs (including
+   * engine-specific ones) that the user DSL cannot even produce per se.
+   * <P>
+   *
+   * A particular physical engine implementation may choose to either use the default rewrites or
+   * build its own rewriting rules.
+   * <P>
+   */
+  def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
+
+  /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+  def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
+
+  /** Engine-specific colSums implementation based on a checkpoint. */
+  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
+  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  /** Engine-specific colMeans implementation based on a checkpoint. */
+  def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double
+
+  /** Broadcast support */
+  def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
+
+  /** Broadcast support */
+  def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
+
+  /**
+   * Load DRM from hdfs (as in Mahout DRM format).
+   * <P/>
+   * @param path The DFS path to load from
+   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+   */
+  def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
+
+  /** Parallelize an in-core matrix as a distributed matrix, using row ordinal indices as data set keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+  /** Parallelize an in-core matrix as a distributed matrix, using row labels as data set keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[String]
+
+  /** This creates an empty DRM with specified number of partitions and cardinality. */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+  /** Creates empty DRM with non-trivial height */
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Long]
+  /**
+   * Load IndexedDataset from text delimited format.
+   * @param src comma delimited URIs to read from
+   * @param schema defines format of file(s)
+   */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDataset
+
+  /**
+   * Load IndexedDataset from text delimited format, one element per line
+   * @param src comma delimited URIs to read from
+   * @param schema defines format of file(s)
+   */
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDataset
+
+}
+
+object DistributedEngine {
+
+  private val log = Logger.getLogger(DistributedEngine.getClass)
+
+  /** This is mostly multiplication operations rewrites */
+  private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+
+    action match {
+      case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a))
+      case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a))
+
+      // For now, rewrite left-multiply via transpositions, i.e.
+      // inCoreA %*% B = (B' %*% inCoreA')'
+      case op@OpTimesLeftMatrix(a, b) =>
+        OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+
+      // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments
+      case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) =>
+
+        // Make sure closure sees only local vals, not attributes. We need to do these ugly casts
+        // around because compiler could not infer that K is the same as Int, based on if() above.
+        val ma = safeToNonNegInt(a.nrow)
+        val bAdjusted = new OpMapBlock[Int, Int](
+          A = pass1(b.asInstanceOf[DrmLike[Int]]),
+          bmf = {
+            case (keys, block) => keys.map(_ + ma) -> block
+          },
+          identicallyPartitioned = false
+        )
+        val aAdjusted = a.asInstanceOf[DrmLike[Int]]
+        OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]]
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass1(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass1(bop.A)(bop.classTagA)
+        bop.B = pass1(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+  /** This would remove stuff like A.t.t that previous step may have created */
+  private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    action match {
+      // A.t.t => A
+      case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass2(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass2(bop.A)(bop.classTagA)
+        bop.B = pass2(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+  /** Some further rewrites that are conditioned on A.t.t removal */
+  private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    action match {
+
+      // matrix products.
+      case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
+
+      // AtB cases that make sense.
+      case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b))
+      case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
+
+      // Need some cost to choose between the following.
+
+      case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
+      //      case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
+      case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+
+      // Rewrite A'x
+      case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else we just pass-thru the operator arguments to optimizer
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass3(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass3(bop.A)(bop.classTagA)
+        bop.B = pass3(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
new file mode 100644
index 0000000..e5cf563
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import RLikeDrmOps._
+import scala.reflect.ClassTag
+
+class DrmDoubleScalarOps(val x:Double) extends AnyVal{
+
+  def +[K:ClassTag](that:DrmLike[K]) = that + x
+
+  def *[K:ClassTag](that:DrmLike[K]) = that * x
+
+  def -[K:ClassTag](that:DrmLike[K]) = x -: that
+
+  def /[K:ClassTag](that:DrmLike[K]) = x /: that
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
new file mode 100644
index 0000000..b9c50b0
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+
+
+/**
+ *
+ * Basic DRM (distributed row matrix) trait.
+ *
+ * Note: this trait is declared in package org.apache.mahout.math.drm, not "sparkbindings"; the
+ * backing engine is reached through [[DistributedContext]] rather than implied by the package.
+ *
+ */
+trait DrmLike[K] {
+
+  protected[mahout] def partitioningTag: Long
+
+  protected[mahout] def canHaveMissingRows: Boolean
+
+  /**
+   * Distributed context, can be implicitly converted to operations on
+   * [[org.apache.mahout.math.drm.DistributedEngine]].
+   */
+  val context:DistributedContext
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long
+
+  /** R-like syntax for number of columns */
+  def ncol: Int
+
+  /**
+   * Action operator -- does not necessarily mean a Spark action; but does mean running the BLAS
+   * optimizer and writing down Spark graph lineage since the last checkpointed DRM.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
new file mode 100644
index 0000000..bc937d6
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm.logical.{OpPar, OpMapBlock, OpRowRange}
+
+/** Common Drm ops */
+class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) {
+
+  /**
+   * Parallelism adjustments. <P/>
+   *
+   * Change only one of parameters from default value to choose new parallelism adjustment strategy.
+   * <P/>
+   *
+   * E.g. use
+   * <pre>
+   *   drmA.par(auto = true)
+   * </pre>
+   * to use automatic parallelism adjustment.
+   * <P/>
+   *
+   * Parallelism in this API is a fairly abstract concept, and interpreting the actual value is
+   * left to the particular backend strategy. However, it is usually equivalent to the number of
+   * map tasks or data splits.
+   * <P/>
+   *
+   * @param min If changed from default, ensures the product has at least that much parallelism.
+   * @param exact if changed from default, ensures the pipeline product has exactly that much
+   *              parallelism.
+   * @param auto If changed from default, engine-specific automatic parallelism adjustment strategy
+   *             is applied.
+   */
+  def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = {
+    assert(min >= 0 || exact >= 0 || auto, "Invalid argument")
+    OpPar(drm, minSplits = min, exactSplits = exact)
+  }
+
+  /**
+   * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block
+   * matrices; or they could be completely new matrices with new keyset. In the latter case, output
+   * matrix width must be specified with <code>ncol</code> parameter.<P>
+   *
+   * New block heights must be of the same height as the original geometry.<P>
+   *
+   * @param ncol new matrix' width (only needed if width changes).
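+   * @param bmf block map function, applied to each (keys, block) pair of the matrix.
+   * @tparam R row key type of the resulting matrix.
+   * @return the mapBlock logical operator (OpMapBlock) as a DrmLike[R].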
+   */
+  def mapBlock[R: ClassTag](ncol: Int = -1, identicallyParitioned: Boolean = true)
+      (bmf: BlockMapFunc[K, R]): DrmLike[R] =
+    new OpMapBlock[K, R](
+      A = drm,
+      bmf = bmf,
+      _ncol = ncol,
+      identicallyPartitioned = identicallyParitioned
+    )
+
+
+  /**
+   * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 until 5, 5 until 15)).<P>
+   *
+   * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
+   *
+   * Row range is currently unsupported except for the all-range. Once it is fully supported,
+   * the input must be Int-keyed, i.e. of DrmLike[Int] type for non-all-range specifications.
+   *
+   * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
+   * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
+   */
+  def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
+
+    import RLikeDrmOps._
+    import RLikeOps._
+
+    val rowSrc: DrmLike[K] = if (rowRange != ::) {
+
+      if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
+
+        assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range")
+        val intKeyed = drm.asInstanceOf[DrmLike[Int]]
+
+        new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
+
+      } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
+
+    } else drm
+
+    if (colRange != ::) {
+
+      assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
+
+      // Use mapBlock operator to do in-core subranging.
+      rowSrc.mapBlock(ncol = colRange.length)({
+        case (keys, block) => keys -> block(::, colRange)
+      })
+
+    } else rowSrc
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
new file mode 100644
index 0000000..380f4eb
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Vector, Matrix}
+import org.apache.mahout.math.drm.logical._
+
+class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
+
+  import RLikeDrmOps._
+
+  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
+
+  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")
+
+  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")
+
+  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
+
+  def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
+
+  def +:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
+
+  def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-")
+
+  def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:")
+
+  def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
+
+  def *:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
+
+  def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/")
+
+  def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:")
+
+  def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
+
+  def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that)
+
+  def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
+
+  def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
+
+  def %*%(that: Matrix): DrmLike[K] = this :%*% that
+
+  def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)
+
+  def %*%(that: Vector): DrmLike[K] = :%*%(that)
+
+  def t: DrmLike[Int] = OpAtAnyKey(A = drm)
+
+  def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that)
+
+  def rbind(that: DrmLike[K]) = OpRbind(A = this.drm, B = that)
+}
+
+class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
+
+  import org.apache.mahout.math._
+  import scalabindings._
+  import RLikeOps._
+  import RLikeDrmOps._
+  import scala.collection.JavaConversions._
+
+  override def t: DrmLike[Int] = OpAt(A = drm)
+
+  def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)
+
+  def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
+
+  /** Row sums. This is of course applicable to Int-keyed distributed matrices only. */
+  def rowSums(): Vector = {
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      // Collect block-wise rowsums and output them as one-column matrix.
+      keys -> dense(block.rowSums).t
+    }
+      .collect(::, 0)
+  }
+
+  /** Counts the non-zero elements in each row, returning a vector of the counts. */
+  def numNonZeroElementsPerRow(): Vector = {
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      // Collect block-wise row non-zero counts and output them as a one-column matrix.
+      keys -> dense(block.numNonZeroElementsPerRow).t
+    }
+      .collect(::, 0)
+  }
+
+  /** Row means */
+  def rowMeans(): Vector = {
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      // Collect block-wise row means and output them as one-column matrix.
+      keys -> dense(block.rowMeans).t
+    }
+        .collect(::, 0)
+  }
+
+  /** Return diagonal vector */
+  def diagv: Vector = {
+    require(drm.ncol == drm.nrow, "Must be square to extract diagonal")
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      keys -> dense(for (r <- block.view) yield r(keys(r.index))).t
+    }
+        .collect(::, 0)
+  }
+
+}
+
+object RLikeDrmOps {
+
+  implicit def double2ScalarOps(x:Double) = new DrmDoubleScalarOps(x)
+
+  implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
+
+  implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
+
+  implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
+
+  implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
+
+  // Removed in move to 1.2.1 PR #74 https://github.com/apache/mahout/pull/74/files
+  // Not sure why.
+  // implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
+
+  /**
+   * This is probably dangerous since it triggers implicit checkpointing with default storage level
+   * setting.
+   */
+  implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
+}


Mime
View raw message