mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smar...@apache.org
Subject [15/37] mahout git commit: MAHOUT-1660 MAHOUT-1713 MAHOUT-1714 MAHOUT-1715 MAHOUT-1716 MAHOUT-1717 MAHOUT-1718 MAHOUT-1719 MAHOUT-1720 MAHOUT-1721 MAHOUT-1722 MAHOUT-1723 MAHOUT-1724 MAHOUT-1725 MAHOUT-1726 MAHOUT-1727 MAHOUT-1728 MAHOUT-1729 MAHOUT-1730
Date Sat, 01 Aug 2015 03:27:02 GMT
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 595cd66..41e966b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -24,51 +24,59 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
+import scala.Predef
 import scala.reflect.ClassTag
+import scala.reflect.classTag
 import org.apache.spark.storage.StorageLevel
 import org.apache.mahout.sparkbindings.blas._
 import org.apache.hadoop.io._
-import scala.Some
-import scala.collection.JavaConversions._
+import collection._
+import JavaConversions._
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.RLikeDrmOps._
 import org.apache.spark.rdd.RDD
 import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
 
+
 /** Spark-specific non-drm-method operations */
 object SparkEngine extends DistributedEngine {
 
   // By default, use Hadoop 1 utils
   var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
 
-  def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
     drm.rdd
+
       // Throw away keys
       .map(_._2)
+
       // Fold() doesn't work with kryo still. So work around it.
-      .mapPartitions(iter => {
-      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
+      .mapPartitions(iter ⇒ {
+      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) ⇒  acc += v)
       Iterator(acc)
     })
+
       // Since we preallocated new accumulator vector per partition, this must not cause any side
       // effects now.
       .reduce(_ += _)
   }
 
-  def numNonZeroElementsPerColumn[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
     drm.rdd
+
       // Throw away keys
       .map(_._2)
+
       // Fold() doesn't work with kryo still. So work around it.
-      .mapPartitions(iter => {
-      val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) =>
-        v.nonZeroes().foreach { elem => acc(elem.index) += 1 }
+      .mapPartitions(iter ⇒ {
+      val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) ⇒
+        v.nonZeroes().foreach { elem ⇒  acc(elem.index) += 1}
         acc
       }
       Iterator(acc)
@@ -79,17 +87,25 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  override def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
+  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector =
     if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
 
   override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
     drm.rdd
-        // Compute sum of squares of each vector
-        .map {
-      case (key, v) => v dot v
+      // Compute sum of squares of each vector
+      .map {
+      case (key, v) ⇒ v dot v
     }
-        .reduce(_ + _)
+      .reduce(_ + _)
+
 
+  /** Optional engine-specific all reduce tensor operation. */
+  override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
+  BlockReduceFunc): Matrix = {
+
+    import drm._
+    drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf)
+  }
 
   /**
    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
@@ -104,10 +120,10 @@ object SparkEngine extends DistributedEngine {
   def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
 
     // Spark-specific Physical Plan translation.
-    val rdd = tr2phys(plan)
+    val rddInput = tr2phys(plan)
 
     val newcp = new CheckpointedDrmSpark(
-      rdd = rdd,
+      rddInput = rddInput,
       _nrow = plan.nrow,
       _ncol = plan.ncol,
       _cacheStorageLevel = cacheHint2Spark(ch),
@@ -131,7 +147,13 @@ object SparkEngine extends DistributedEngine {
    *
    * @return DRM[Any] where Any is automatically translated to value type
    */
-  def drmDfsRead (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+  def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+
+    // Require that context is actually Spark context.
+    require(sc.isInstanceOf[SparkDistributedContext], "Supplied context must be for the Spark backend.")
+
+    // Extract spark context -- we need it for some operations.
+    implicit val ssc = sc.asInstanceOf[SparkDistributedContext].sc
 
     val drmMetadata = hdfsUtils.readDrmHeader(path)
     val k2vFunc = drmMetadata.keyW2ValFunc
@@ -140,8 +162,8 @@ object SparkEngine extends DistributedEngine {
     // Hadoop we must do it right after read operation).
     val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
 
-        // Immediately convert keys and value writables into value types.
-        .map { case (wKey, wVec) => k2vFunc(wKey) -> wVec.get()}
+      // Immediately convert keys and value writables into value types.
+      .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get()}
 
     // Wrap into a DRM type with correct matrix row key class tag evident.
     drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
@@ -149,67 +171,141 @@ object SparkEngine extends DistributedEngine {
 
   /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
   def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: DistributedContext)
+                                  (implicit sc: DistributedContext)
   : CheckpointedDrm[Int] = {
-    new CheckpointedDrmSpark(rdd = parallelizeInCore(m, numPartitions))
+    new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol)
   }
 
   private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: DistributedContext): DrmRdd[Int] = {
+                                              (implicit sc: DistributedContext): DrmRdd[Int] = {
 
-    val p = (0 until m.nrow).map(i => i -> m(i, ::))
+    val p = (0 until m.nrow).map(i => i → m(i, ::))
     sc.parallelize(p, numPartitions)
 
   }
 
   /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
   def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: DistributedContext)
+                                 (implicit sc: DistributedContext)
   : CheckpointedDrm[String] = {
 
     val rb = m.getRowLabelBindings
-    val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::)
+    val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::)
 
-    new CheckpointedDrmSpark(rdd = sc.parallelize(p, numPartitions))
+    new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol)
   }
 
   /** This creates an empty DRM with specified number of partitions and cardinality. */
   def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
-      (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
-    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+                         (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part ⇒ {
       val partNRow = (nrow - 1) / numPartitions + 1
       val partStart = partNRow * part
       val partEnd = Math.min(partStart + partNRow, nrow)
 
-      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+      for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     })
     new CheckpointedDrmSpark[Int](rdd, nrow, ncol)
   }
 
   def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
-      (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
-    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+                             (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
+    val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part ⇒ {
       val partNRow = (nrow - 1) / numPartitions + 1
       val partStart = partNRow * part
       val partEnd = Math.min(partStart + partNRow, nrow)
 
-      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+      for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     })
     new CheckpointedDrmSpark[Long](rdd, nrow, ncol)
   }
 
+  /**
+   * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
+   * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
+   */
+  override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
+    if (classTag[K] == ClassTag.Int) {
+      drmX.asInstanceOf[DrmLike[Int]] → None
+    } else {
+
+      val drmXcp = drmX.checkpoint(CacheHint.MEMORY_ONLY)
+      val ncol = drmXcp.asInstanceOf[CheckpointedDrmSpark[K]]._ncol
+      val nrow = drmXcp.asInstanceOf[CheckpointedDrmSpark[K]]._nrow
+
+      // Compute sequential int key numbering.
+      val (intRdd, keyMap) = blas.rekeySeqInts(rdd = drmXcp.rdd, computeMap = computeMap)
+
+      // Convert computed key mapping to a matrix.
+      val mxKeyMap = keyMap.map { rdd =>
+        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal):Vector)}, ncol = 1, nrow = nrow)
+      }
+
+
+      drmWrap(rdd = intRdd, ncol = ncol) → mxKeyMap
+  }
+
+  }
+
+
+  /**
+   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+   * @param drmX
+   * @param fraction
+   * @param replacement
+   * @tparam K
+   * @return
+   */
+  override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
+
+    // We do want to take ncol if already computed, if not, then we don't want to trigger computation
+    // here.
+    val ncol = drmX match {
+      case cp: CheckpointedDrmSpark[K] ⇒ cp._ncol
+      case _ ⇒ -1
+    }
+    val sample = drmX.rdd.sample(withReplacement = replacement, fraction = fraction)
+    if (classTag[K] != ClassTag.Int) return drmWrap(sample, ncol = ncol)
+
+    // K == Int: Int-keyed sample. rebase int counts.
+    drmWrap(rdd = blas.rekeySeqInts(rdd = sample, computeMap = false)._1, ncol = ncol).asInstanceOf[DrmLike[K]]
+  }
+
+
+  override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
+
+    val ncol = drmX match {
+      case cp: CheckpointedDrmSpark[K] ⇒ cp._ncol
+      case _ ⇒ -1
+    }
+
+    // I think as of the time of this writing, takeSample() in Spark is biased. It is not a true
+    // hypergeometric sampler. But it is faster than a true hypergeometric/categorical samplers
+    // would be.
+    val sample = drmX.rdd.takeSample(withReplacement = replacement, num = numSamples)
+    val isSparse = sample.exists { case (_, vec) ⇒ !vec.isDense }
+
+    val vectors = sample.map(_._2)
+    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx:Integer) }.toMap
+
+    val mx:Matrix = if (isSparse) sparse(vectors:_*) else dense(vectors)
+    mx.setRowLabelBindings(labels)
+
+    mx
+  }
+
   private[mahout] def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
-    case CacheHint.NONE => StorageLevel.NONE
-    case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
-    case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
-    case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
-    case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
-    case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
-    case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
-    case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
-    case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
-    case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
-    case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
+    case CacheHint.NONE ⇒ StorageLevel.NONE
+    case CacheHint.DISK_ONLY ⇒ StorageLevel.DISK_ONLY
+    case CacheHint.DISK_ONLY_2 ⇒ StorageLevel.DISK_ONLY_2
+    case CacheHint.MEMORY_ONLY ⇒ StorageLevel.MEMORY_ONLY
+    case CacheHint.MEMORY_ONLY_2 ⇒ StorageLevel.MEMORY_ONLY_2
+    case CacheHint.MEMORY_ONLY_SER ⇒ StorageLevel.MEMORY_ONLY_SER
+      case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
+    case CacheHint.MEMORY_AND_DISK ⇒ StorageLevel.MEMORY_AND_DISK
+    case CacheHint.MEMORY_AND_DISK_2 ⇒ StorageLevel.MEMORY_AND_DISK_2
+    case CacheHint.MEMORY_AND_DISK_SER ⇒ StorageLevel.MEMORY_AND_DISK_SER
+    case CacheHint.MEMORY_AND_DISK_SER_2 ⇒ StorageLevel.MEMORY_AND_DISK_SER_2
   }
 
   /** Translate previously optimized physical plan */
@@ -221,31 +317,32 @@ object SparkEngine extends DistributedEngine {
       // If there are any such cases, they must go away in pass1. If they were not, then it wasn't
       // the A'A case but actual transposition intent which should be removed from consideration
       // (we cannot do actual flip for non-int-keyed arguments)
-      case OpAtAnyKey(_) =>
+      case OpAtAnyKey(_) ⇒
         throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
-      case op@OpAt(a) => At.at(op, tr2phys(a)(op.classTagA))
-      case op@OpABt(a, b) => ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtB(a, b) => AtB.atb_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB),
-        zippable = a.partitioningTag == b.partitioningTag)
-      case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
-      case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpCbind(a, b) => CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpRbind(a, b) => RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
-      case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+      case op@OpAt(a) ⇒ At.at(op, tr2phys(a)(op.classTagA))
+      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAtA(a) ⇒ AtA.at_a(op, tr2phys(a)(op.classTagA))
+      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
+      case op@OpAtx(a, x) ⇒ Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
+      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
+      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
+      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a)(op.classTagA))
+      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpRowRange(a, _) ⇒ Slicing.rowRange(op, tr2phys(a)(op.classTagA))
+      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
       // Custom operators, we just execute them
-      case blockOp: OpMapBlock[K, _] => MapBlock.exec(
+      case blockOp: OpMapBlock[K, _] ⇒ MapBlock.exec(
         src = tr2phys(blockOp.A)(blockOp.classTagA),
-        ncol = blockOp.ncol,
-        bmf = blockOp.bmf
+        operator = blockOp
       )
-      case op@OpPar(a,_,_) => Par.exec(op,tr2phys(a)(op.classTagA))
-      case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
-      case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
-          .format(oper))
+      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a)(op.classTagA))
+      case cp: CheckpointedDrm[K] ⇒ cp.rdd: DrmRddInput[K]
+      case _ ⇒ throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
+        .format(oper))
 
     }
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 1e3f286..11e2bad 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -19,16 +19,23 @@ package org.apache.mahout.sparkbindings.blas
 
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
+import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 import org.apache.mahout.sparkbindings._
-import drm._
-import org.apache.mahout.math.{Matrix, SparseRowMatrix}
+import org.apache.mahout.math.drm.BlockifiedDrmTuple
+import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.math.{SparseMatrix, Matrix, SparseRowMatrix}
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.drm.logical.OpABt
+import org.apache.mahout.logging._
+
+import scala.tools.nsc.io.Pickler.TildeDecorator
 
 /** Contains RDD plans for ABt operator */
 object ABt {
 
+  private final implicit val log = getLog(ABt.getClass)
+
   /**
    * General entry point for AB' operator.
    *
@@ -40,8 +47,11 @@ object ABt {
   def abt[K: ClassTag](
       operator: OpABt[K],
       srcA: DrmRddInput[K],
-      srcB: DrmRddInput[Int]): DrmRddInput[K] =
+      srcB: DrmRddInput[Int]): DrmRddInput[K] = {
+
+    debug("operator AB'(Spark)")
     abt_nograph(operator, srcA, srcB)
+  }
 
   /**
    * Computes AB' without GraphX.
@@ -63,7 +73,146 @@ object ABt {
       srcB: DrmRddInput[Int]): DrmRddInput[K] = {
 
     // Blockify everything.
-    val blocksA = srcA.toBlockifiedDrmRdd()
+    val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+
+    val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+
+    val prodNCol = operator.ncol
+    val prodNRow = operator.nrow
+    // We are actually computing AB' here. 
+    val numProductPartitions = estimateProductPartitions(anrow = prodNRow, ancol = operator.A.ncol,
+      bncol = prodNCol, aparts = blocksA.partitions.size, bparts = blocksB.partitions.size)
+
+    debug(
+      s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.size}, B #parts=${blocksB.partitions.size}."+
+      s"A=${operator.A.nrow}x${operator.A.ncol}, B=${operator.B.nrow}x${operator.B.ncol},AB'=${prodNRow}x$prodNCol."
+    )
+
+    // blockwise multimplication function
+    def mmulFunc(tupleA: BlockifiedDrmTuple[K], tupleB: BlockifiedDrmTuple[Int]): (Array[K], Array[Int], Matrix) = {
+      val (keysA, blockA) = tupleA
+      val (keysB, blockB) = tupleB
+
+      var ms = traceDo(System.currentTimeMillis())
+
+      // We need to send keysB to the aggregator in order to know which columns are being updated.
+      val result = (keysA, keysB, (blockA %*% blockB.t))
+
+      ms = traceDo(System.currentTimeMillis() - ms.get)
+      trace(
+        s"block multiplication of(${blockA.nrow}x${blockA.ncol} x ${blockB.ncol}x${blockB.nrow} is completed in $ms " +
+          "ms.")
+      trace(s"block multiplication types: blockA: ${blockA.getClass.getName}(${blockA.t.getClass.getName}); " +
+        s"blockB: ${blockB.getClass.getName}.")
+
+      result
+    }
+
+    val blockwiseMmulRdd =
+
+    // Combine blocks pairwise.
+      pairwiseApply(blocksA, blocksB, mmulFunc _)
+
+        // Now reduce proper product blocks.
+        .combineByKey(
+
+          // Empty combiner += value
+          createCombiner = (t: (Array[K], Array[Int], Matrix)) =>  {
+            val (rowKeys, colKeys, block) = t
+            val comb = new SparseMatrix(prodNCol, block.nrow).t
+
+            for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
+            rowKeys -> comb
+          },
+
+          // Combiner += value
+          mergeValue = (comb: (Array[K], Matrix), value: (Array[K], Array[Int], Matrix)) => {
+            val (rowKeys, c) = comb
+            val (_, colKeys, block) = value
+            for ((col, i) <- colKeys.zipWithIndex) c(::, col) := block(::, i)
+            comb
+          },
+
+          // Combiner + Combiner
+          mergeCombiners = (comb1: (Array[K], Matrix), comb2: (Array[K], Matrix)) => {
+            comb1._2 += comb2._2
+            comb1
+          },
+
+          numPartitions = blocksA.partitions.size max blocksB.partitions.size
+        )
+
+
+    // Created BlockifiedRDD-compatible structure.
+    val blockifiedRdd = blockwiseMmulRdd
+
+      // throw away A-partition #
+      .map{case (_,tuple) => tuple}
+
+    val numPartsResult = blockifiedRdd.partitions.size
+
+    // See if we need to rebalance away from A granularity.
+    if (numPartsResult * 2 < numProductPartitions || numPartsResult / 2 > numProductPartitions) {
+
+      debug(s"Will re-coalesce from ${numPartsResult} to ${numProductPartitions}")
+
+      val rowRdd = deblockify(blockifiedRdd).coalesce(numPartitions = numProductPartitions)
+
+      rowRdd
+
+    } else {
+
+      // We don't have a terribly different partition
+      blockifiedRdd
+    }
+
+  }
+
+  /**
+   * This function tries to use join instead of cartesian to group blocks together without bloating
+   * the number of partitions. Hope is that we can apply pairwise reduction of block pair right away
+   * so if the data to one of the join parts is streaming, the result is still fitting to memory,
+   * since result size is much smaller than the operands.
+   *
+   * @param blocksA blockified RDD for A
+   * @param blocksB blockified RDD for B
+   * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but perhaps may be
+   *                  switched to another scheme based on which of the sides, A or B, is bigger.
+   */
+  private def pairwiseApply[K1, K2, T](blocksA: BlockifiedDrmRdd[K1], blocksB: BlockifiedDrmRdd[K2], blockFunc:
+  (BlockifiedDrmTuple[K1], BlockifiedDrmTuple[K2]) => T): RDD[(Int, T)] = {
+
+    // We will be joining blocks in B to blocks in A using A-partition as a key.
+
+    // Prepare A side.
+    val blocksAKeyed = blocksA.mapPartitionsWithIndex { (part, blockIter) =>
+
+      val r = if (blockIter.hasNext) Some(part -> blockIter.next) else Option.empty[(Int, BlockifiedDrmTuple[K1])]
+
+      require(blockIter.hasNext == false, s"more than 1 (${blockIter.size + 1}) blocks per partition and A of AB'")
+
+      r.toIterator
+    }
+
+    // Prepare B-side.
+    val aParts = blocksA.partitions.size
+    val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple )
+
+    // Perform the inner join. Let's try to do a simple thing now.
+    blocksAKeyed.join(blocksBKeyed, numPartitions = aParts)
+
+    // Apply product function which should produce smaller products. Hopefully, this streams blockB's in
+    .map{case (partKey,(blockA, blockB)) => partKey -> blockFunc(blockA, blockB)}
+
+  }
+
+  private[blas] def abt_nograph_cart[K: ClassTag](
+      operator: OpABt[K],
+      srcA: DrmRddInput[K],
+      srcB: DrmRddInput[Int]): DrmRddInput[K] = {
+
+    // Blockify everything.
+    val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
 
         // Mark row-blocks with group id
         .mapPartitionsWithIndex((part, iter) => {
@@ -83,28 +232,35 @@ object ABt {
       }
     })
 
-    val blocksB = srcB.toBlockifiedDrmRdd()
+    val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
 
     // Final product's geometry. We want to extract that into local variables since we want to use
     // them as closure attributes.
     val prodNCol = operator.ncol
     val prodNRow = operator.nrow
-    
-    // Approximate number of final partitions.
-    val numProductPartitions =
-      if (blocksA.partitions.size > blocksB.partitions.size) {
-        ((prodNCol.toDouble / operator.A.ncol) * blocksA.partitions.size).ceil.toInt
-      } else {
-        ((prodNRow.toDouble / operator.B.ncol) * blocksB.partitions.size).ceil.toInt
-      }
+    val aNCol = operator.A.ncol
+
+    // Approximate number of final partitions. We take bigger partitions as our guide to number of
+    // elements per partition. TODO: do it better.
 
-    //srcA.partitions.size.max(that = srcB.partitions.size)
+    // Elements per partition, bigger of two operands.
+    val epp = aNCol.toDouble * prodNRow / blocksA.partitions.size max aNCol.toDouble * prodNCol /
+      blocksB.partitions.size
 
+    // Number of partitions we want to converge to in the product. For now we simply extrapolate that
+    // assuming product density and operand densities being about the same; and using the same element
+    // per partition number in the product as the bigger of two operands.
+    val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt
+
+    debug(
+      s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.size}, B #parts=${blocksB.partitions.size}.")
 
     // The plan.
-    var blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
+    var blockifiedRdd: BlockifiedDrmRdd[K] = blocksA
 
-        // Build Cartesian. It may require a bit more memory there at tasks.
+        // Build Cartesian. It generates a LOT of tasks. TODO: figure how to fix performance of AB'
+        // operator. The thing is that product after map is really small one (partition fraction x
+        // partition fraction) so they can be combined into much bigger chunks.
         .cartesian(blocksB)
 
         // Multiply blocks
@@ -126,10 +282,14 @@ object ABt {
         .combineByKey[(Array[K],Matrix)](
 
           createCombiner = (t: (Array[K], Array[Int], Matrix)) => t match {
+
+            // Create combiner structure out of two products. Our combiner is sparse row matrix
+            // initialized to final product partition block dimensions.
             case (rowKeys, colKeys, blockProd) =>
 
-              // Accumulator is a row-wise block of sparse vectors.
-              val acc:Matrix = new SparseRowMatrix(rowKeys.size, prodNCol)
+              // Accumulator is a row-wise block of sparse vectors. Since we assign to columns,
+              // the most efficient is perhaps to create column-oriented block here.
+              val acc:Matrix = new SparseRowMatrix(prodNCol, rowKeys.size).t
 
               // Update accumulator using colKeys as column index indirection
               colKeys.view.zipWithIndex.foreach({
@@ -168,6 +328,8 @@ object ABt {
     // having at most one block per partition.
     blockifiedRdd = rbind(blockifiedRdd)
 
-    new DrmRddInput(blockifiedSrc = Some(blockifiedRdd))
+    blockifiedRdd
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 3cdb797..8a90398 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -20,19 +20,22 @@ package org.apache.mahout.sparkbindings.blas
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import scala.reflect.ClassTag
 import org.apache.spark.SparkContext._
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector}
-import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
-import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, TEwFunc, OpAewScalar, OpAewB}
 import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
 import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
 import org.apache.mahout.math.drm._
+import org.apache.mahout.logging._
+import collection._
+import JavaConversions._
 
 /** Elementwise drm-drm operators */
 object AewB {
 
-  private val log = Logger.getLogger(AewB.getClass)
+  private final implicit val log = getLog(AewB.getClass)
 
   /**
    * Set to false to disallow in-place elementwise operations in case side-effects and non-idempotent
@@ -44,10 +47,10 @@ object AewB {
 
   type ReduceFuncScalar = (Matrix, Double) => Matrix
 
-  private[blas] def getEWOps() = {
-    val inplaceProp = System.getProperty(PROPERTY_AEWB_INPLACE, "true").toBoolean
-    if (inplaceProp) InplaceEWOps else CloningEWOps
-  }
+  private[blas] def ewInplace(): Boolean = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean
+
+  private[blas] def getEWOps() = if (ewInplace()) InplaceEWOps else CloningEWOps
+
 
   /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
   def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
@@ -67,12 +70,14 @@ object AewB {
     val a = srcA.toDrmRdd()
     val b = srcB.toDrmRdd()
 
+    debug(s"A${op.op}B: #partsA=${a.partitions.size},#partsB=${b.partitions.size}.")
+
     // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
     // instead of join, and apply the op map-side. Otherwise, perform join and apply the op
     // reduce-side.
     val rdd = if (op.isIdenticallyPartitioned(op.A)) {
 
-      log.debug("applying zipped elementwise")
+      debug(s"A${op.op}B:applying zipped elementwise")
 
       a
           .zip(b)
@@ -83,7 +88,7 @@ object AewB {
       }
     } else {
 
-      log.debug("applying elementwise as join")
+      debug("A${op.op}B:applying elementwise as join")
 
       a
           // Full outer-join operands row-wise
@@ -103,13 +108,51 @@ object AewB {
       })
     }
 
-    new DrmRddInput(rowWiseSrc = Some(ncol -> rdd))
+    rdd
+  }
+
+  def a_ew_func[K:ClassTag](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
+
+    val evalZeros = op.evalZeros
+    val inplace = ewInplace()
+    val f = op.f
+
+    // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
+    // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
+    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
+    } else {
+      srcA.toBlockifiedDrmRdd(op.A.ncol)
+    }
+
+    val rdd = aBlockRdd.map {case (keys, block) =>
+
+      // Do inplace or allocate a new copy?
+      val newBlock = if (inplace) block else block cloned
+
+      // Operation cares about zeros?
+      if (evalZeros) {
+
+        // Yes, we evaluate all:
+        newBlock := ((_,_,x)=>f(x))
+      } else {
+
+        // No, evaluate non-zeros only row-wise
+        for (row <- newBlock; el <- row.nonZeroes) el := f(el.get)
+      }
+
+      keys -> newBlock
+    }
+
+    rdd
   }
 
   /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
   def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
   DrmRddInput[K] = {
 
+
     val ewOps = getEWOps()
     val opId = op.op
 
@@ -129,15 +172,17 @@ object AewB {
       val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
-      srcA.toBlockifiedDrmRdd()
+      srcA.toBlockifiedDrmRdd(op.A.ncol)
     }
 
+    debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.size}.")
+
     val rdd = aBlockRdd
-        .map({
+        .map {
       case (keys, block) => keys -> reduceFunc(block, scalar)
-    })
+    }
 
-    new DrmRddInput[K](blockifiedSrc = Some(rdd))
+    rdd
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index c923e62..5f9f84a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -6,13 +6,17 @@ import scalabindings._
 import RLikeOps._
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.logging._
 import scala.reflect.ClassTag
 import org.apache.mahout.math.DiagonalMatrix
 import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
 
+
 /** Matrix product with one of operands an in-core matrix */
 object AinCoreB {
 
+  private final implicit val log = getLog(AinCoreB.getClass)
+
   def rightMultiply[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
     if ( op.right.isInstanceOf[DiagonalMatrix])
       rightMultiply_diag(op, srcA)
@@ -21,23 +25,27 @@ object AinCoreB {
   }
 
   private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
-    val rddA = srcA.toBlockifiedDrmRdd()
+    val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
     implicit val ctx:DistributedContext = rddA.context
     val dg = drmBroadcast(op.right.viewDiagonal())
 
+    debug(s"operator A %*% inCoreB-diagonal. #parts=${rddA.partitions.size}.")
+
     val rdd = rddA
         // Just multiply the blocks
         .map {
       case (keys, blockA) => keys -> (blockA %*%: diagv(dg))
     }
-    new DrmRddInput(blockifiedSrc = Some(rdd))
+    rdd
   }
 
   private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
-    val rddA = srcA.toBlockifiedDrmRdd()
+    val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
     implicit val sc:DistributedContext = rddA.sparkContext
 
+    debug(s"operator A %*% inCoreB. #parts=${rddA.partitions.size}.")
+
     val bcastB = drmBroadcast(m = op.right)
 
     val rdd = rddA
@@ -46,7 +54,7 @@ object AinCoreB {
       case (keys, blockA) => keys -> (blockA %*% bcastB)
     }
 
-    new DrmRddInput(blockifiedSrc = Some(rdd))
+    rdd
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
index 56de9f4..5789bd2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -17,16 +17,20 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.sparkbindings.drm._
 import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.logging._
 import RLikeOps._
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.{DenseVector, Vector, SequentialAccessSparseVector}
 import org.apache.mahout.math.drm.logical.OpAt
 
+
 /** A' algorithms */
 object At {
 
+  private final implicit val log = getLog(At.getClass)
+
   def at(
       operator: OpAt,
       srcA: DrmRddInput[Int]): DrmRddInput[Int] = at_nograph(operator = operator, srcA = srcA)
@@ -39,10 +43,15 @@ object At {
    * groups into final rows of the transposed matrix.
    */
   private[blas] def at_nograph(operator: OpAt, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
-    val drmRdd = srcA.toBlockifiedDrmRdd()
+
+    debug("operator A'.")
+
+    val drmRdd = srcA.toBlockifiedDrmRdd(operator.A.ncol)
     val numPartitions = drmRdd.partitions.size
     val ncol = operator.ncol
 
+    debug(s"A' #parts = $numPartitions.")
+
     // Validity of this conversion must be checked at logical operator level.
     val nrow = operator.nrow.toInt
     val atRdd = drmRdd
@@ -70,7 +79,7 @@ object At {
         key -> v
     }).densify()
 
-    new DrmRddInput(rowWiseSrc = Some(ncol -> atRdd))
+    atRdd
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
index be4f08c..a212878 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
@@ -17,6 +17,7 @@
 
 package org.apache.mahout.sparkbindings.blas
 
+import org.apache.mahout.logging._
 import org.apache.mahout.math._
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.drm._
@@ -34,25 +35,30 @@ import SparkEngine._
  */
 object AtA {
 
-  final val log = Logger.getLogger(AtA.getClass)
+  private final implicit val log = getLog(AtA.getClass)
 
   final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol"
+  final val PROPERTY_ATA_MMUL_BLOCKHEIGHT = "mahout.math.AtA.blockHeight"
 
   /** Materialize A'A operator */
   def at_a(operator: OpAtA[_], srcRdd: DrmRddInput[_]): DrmRddInput[Int] = {
 
-    val maxInMemNCol = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, "2000").toInt
+    val maxInMemNCol = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, "200").toInt
     maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer")
 
     if (operator.ncol <= maxInMemNCol) {
+
       // If we can comfortably fit upper-triangular operator into a map memory, we will run slim
       // algorithm with upper-triangular accumulators in maps. 
-      val inCoreA = at_a_slim(srcRdd = srcRdd, operator = operator)
+      val inCoreA = at_a_slim(srcRdd = srcRdd.toDrmRdd(), operator = operator)
       val drmRdd = parallelizeInCore(inCoreA, numPartitions = 1)(sc = srcRdd.sparkContext)
-      new DrmRddInput(rowWiseSrc = Some(inCoreA.ncol, drmRdd))
+      drmRdd
+
     } else {
+
       // Otherwise, we need to run a distributed, big version
-      new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator)))
+      //      new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator)))
+      at_a_nongraph_mmul(srcRdd = srcRdd.toBlockifiedDrmRdd(operator.A.ncol), op = operator)
 
     }
   }
@@ -64,7 +70,7 @@ object AtA {
    */
   def at_a_slim(operator: OpAtA[_], srcRdd: DrmRdd[_]): Matrix = {
 
-    log.debug("Applying slim A'A.")
+    debug("operator slim A'A(Spark)")
 
     val ncol = operator.ncol
     // Compute backing vector of tiny-upper-triangular accumulator accross all the data.
@@ -73,122 +79,195 @@ object AtA {
       val ut = new UpperTriangular(ncol)
 
       // Strategy is to add to an outer product of each row to the upper triangular accumulator.
-      pIter.foreach({
-        case (k, v) =>
+      pIter.foreach({ case (k, v) =>
 
-          // Use slightly various traversal strategies over dense vs. sparse source.
-          if (v.isDense) {
+        // Use slightly various traversal strategies over dense vs. sparse source.
+        if (v.isDense) {
 
-            // Update upper-triangular pattern only (due to symmetry).
-            // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is
-            // such spectacular case they were deesigned for.. Yes I do observe some 20% difference
-            // compared to while loops with no other payload, but the other payload is usually much
-            // heavier than this overhead, so... I am keeping this as is for the time being.
+          // Update upper-triangular pattern only (due to symmetry).
+          // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is
+          // such spectacular case they were deesigned for.. Yes I do observe some 20% difference
+          // compared to while loops with no other payload, but the other payload is usually much
+          // heavier than this overhead, so... I am keeping this as is for the time being.
 
-            for (row <- 0 until v.length; col <- row until v.length)
-              ut(row, col) = ut(row, col) + v(row) * v(col)
+          for (row <- 0 until v.length; col <- row until v.length)
+            ut(row, col) = ut(row, col) + v(row) * v(col)
 
-          } else {
+        } else {
 
-            // Sparse source.
-            v.nonZeroes().view
+          // Sparse source.
+          v.nonZeroes().view
 
-                // Outer iterator iterates over rows of outer product.
-                .foreach(elrow => {
+            // Outer iterator iterates over rows of outer product.
+            .foreach(elrow => {
 
-              // Inner loop for columns of outer product.
-              v.nonZeroes().view
+            // Inner loop for columns of outer product.
+            v.nonZeroes().view
 
-                  // Filter out non-upper nonzero elements from the double loop.
-                  .filter(_.index >= elrow.index)
+              // Filter out non-upper nonzero elements from the double loop.
+              .filter(_.index >= elrow.index)
 
-                  // Incrementally update outer product value in the uppper triangular accumulator.
-                  .foreach(elcol => {
+              // Incrementally update outer product value in the uppper triangular accumulator.
+              .foreach(elcol => {
 
-                val row = elrow.index
-                val col = elcol.index
-                ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
+              val row = elrow.index
+              val col = elcol.index
+              ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
 
-              })
             })
+          })
 
-          }
+        }
       })
 
       Iterator(dvec(ddata = ut.getData): Vector)
-    })
-
-        .collect()
-        .reduce(_ += _)
+    }).collect().reduce(_ += _)
 
     new DenseSymmetricMatrix(resSym)
   }
 
+  // Version that tries to use groupBy. In practice this is the slowest.
+  def at_a_group(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRddInput[Int] = {
+    debug("operator non-slim A'A(Spark-group).")
+
+    // Determine how many partitions the new matrix would need approximately. We base that on
+    // geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more
+    // dense in reality than the source.
+    val m = op.A.nrow
+    val n = op.A.ncol
+    val srcNumParts = srcRdd.partitions.size
+    val finalNumParts = (srcNumParts * n / m).ceil.toInt max 1
+    val numParts = finalNumParts max srcNumParts
+    val ranges = computeEvenSplits(n, numParts)
+
+    var rddAtA = srcRdd
+
+      // Remove key, key is irrelevant
+      .map(_._2)
+
+      // Form partial outer blocks for each partition
+      .flatMap { v =>
+      for (blockKey <- 0 until numParts) yield {
+        blockKey -> v
+      }
+    }
+      // Sent to individual partition reducer
+      .groupByKey(numPartitions = numParts)
+
+      // Reduce individual group
+      .map { case (blockKey, iter) =>
+      val range = ranges(blockKey)
+      val mxC: Matrix = new SparseRowMatrix(range.size, n, false)
+      iter.foreach(vec => addOuterProduct(mxC, vec(range), vec))
+
+      // Fix keys
+      val blockStart = range.start
+      val rowKeys = Array.tabulate(mxC.nrow)(blockStart + _)
+      rowKeys -> mxC
+    }
+
+    if (log.isDebugEnabled)
+      log.debug(s"AtA (grouping) #parts: ${rddAtA.partitions.size}.")
+
+    if (finalNumParts < numParts) rddAtA = rddAtA.coalesce(finalNumParts, shuffle = false)
+
+    rddAtA
+  }
+
+
   /** The version of A'A that does not use GraphX */
-  def at_a_nongraph(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRdd[Int] = {
+  def at_a_nongraph(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRddInput[Int] = {
 
-    log.debug("Applying non-slim non-graph A'A.")
+    debug("Applying non-slim non-graph A'A.")
 
     // Determine how many partitions the new matrix would need approximately. We base that on 
     // geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more
     // dense in reality than the source.
-
     val m = op.A.nrow
     val n = op.A.ncol
-/* possible fix for index out of range for vector range
-    val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1
+    val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.toInt max 1
     val blockHeight = (n - 1) / numParts + 1
-*/
-    val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1 min n
+    val offsets = (0 until numParts).map(_ * blockHeight)
+    val ranges = offsets.map(offset => offset until (offset + blockHeight min n))
 
-    // Computing evenly split ranges to denote each partition size.
+    val rddAtA = srcRdd
 
-    // Base size.
-    val baseSize = n / numParts
+      // Remove key, key is irrelevant
+      .map(_._2)
 
-    // How many partitions needs to be baseSize +1.
-    val slack = n - baseSize * numParts
+      // Form partial outer blocks for each partition
+      .flatMap { v =>
+      for (blockKey <- 0 until numParts) yield {
+        blockKey ->(blockKey, v)
+      }
+    }
+      // Combine outer products
+      .combineByKey(// Combiner factory
+        createCombiner = (t: (Int, Vector)) => {
+          val partNo = t._1
+          val vec = t._2
+          val range = ranges(partNo)
+          val mxC = if (vec.isDense) new DenseMatrix(range.size, n) else new SparseRowMatrix(range.size, n)
+          addOuterProduct(mxC, vec(range), vec)
+        },
+
+        // Merge values into existing partition accumulator.
+        mergeValue = (mxC: Matrix, t: (Int, Vector)) => {
+          val partNo = t._1
+          val vec = t._2
+          addOuterProduct(mxC, vec(ranges(partNo)), vec)
+        },
+
+        // Merge combiners
+        mergeCombiners = (mxC1: Matrix, mxC2: Matrix) => mxC1 += mxC2, numPartitions = numParts)
+
+      // Restore proper block keys
+      .map { case (blockKey, block) =>
+      val blockStart = blockKey * blockHeight
+      val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
+      rowKeys -> block
+    }
 
-    val ranges =
-      // Start with partition offsets... total numParts + 1.
-      (0 to numParts).view.map { i => (baseSize + 1) * i - (i - slack max 0)}
-        // And convert offsets to ranges.
-        .sliding(2).map(s => s(0) until s(1)).toIndexedSeq
+    if (log.isDebugEnabled)
+      log.debug(s"AtA #parts: ${rddAtA.partitions.size}.")
 
-    val rddAtA = srcRdd
+    rddAtA
+  }
 
-        // Remove key, key is irrelevant
-        .map(_._2)
-
-        // Form partial outer blocks for each partition
-        .flatMap {
-      v =>
-        for (blockKey <- Stream.range(0, numParts)) yield {
-/* patch to fix index out of range for vector access
-          val blockStart = blockKey * blockHeight
-          val blockEnd = n min (blockStart + blockHeight)
-          blockKey -> (v(blockStart until blockEnd) cross v)
-*/
-          val range = ranges(blockKey)
-          blockKey -> (v(range) cross v)
-        }
+  /**
+   * The version of A'A that does not use GraphX. Tries to use blockwise matrix multiply. If an
+   * accelerated matrix back is available, this might be somewhat faster.
+   */
+  def at_a_nongraph_mmul(op: OpAtA[_], srcRdd: BlockifiedDrmRdd[_]): DrmRddInput[Int] = {
+
+    // Determine how many partitions the new matrix would need approximately. We base that on
+    // geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more
+    // dense in reality than the source.
+    val m = op.A.nrow
+    val n = op.A.ncol
+    val aparts = srcRdd.partitions.size
+    val numParts = estimateProductPartitions(anrow = n, ancol = m, bncol = n, aparts = aparts, bparts = aparts)
+    val ranges = computeEvenSplits(n, numParts)
+
+    debug(s"operator mmul-A'A(Spark); #parts = $numParts, #partsA=$aparts.")
+
+    val rddAtA = srcRdd.flatMap { case (keys, block) =>
+      Iterator.tabulate(numParts) { i =>
+        i -> block(::, ranges(i)).t %*% block
+      }
     }
-        // Combine outer blocks
-        .reduceByKey(_ += _)
-
-        // Restore proper block keys
-        .map {
-      case (blockKey, block) =>
-/* patch to fix index out of range for vector access
-        val blockStart = blockKey * blockHeight
-        val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
-*/
-        val range = ranges(blockKey)
-        val rowKeys = Array.tabulate(block.nrow)(range.start + _)
-        rowKeys -> block
+      // Reduce partial blocks.
+      .reduceByKey(_ += _, numPartitions = numParts)
+
+      // Produce keys
+      .map { case (blockKey, block) =>
+
+      val blockStart = ranges(blockKey).start
+      val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
+      rowKeys -> block
     }
 
-    new DrmRddInput[Int](blockifiedSrc = Some(rddAtA))
+    rddAtA
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index 86aadc8..45705a9 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -17,8 +17,13 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import scala.reflect.ClassTag
-import org.apache.mahout.math.drm._
+import reflect.ClassTag
+import collection._
+import JavaConversions._
+
+import org.apache.mahout.logging._
+import org.apache.mahout.math._
+import drm._
 import org.apache.mahout.sparkbindings.drm._
 import org.apache.spark.rdd.RDD
 import org.apache.mahout.math.scalabindings._
@@ -27,92 +32,330 @@ import org.apache.spark.SparkContext._
 import org.apache.log4j.Logger
 import org.apache.mahout.math.drm.logical.OpAtB
 
+import scala.collection.mutable.ArrayBuffer
+
 object AtB {
 
-  private val log = Logger.getLogger(AtB.getClass)
+  private final implicit val log = getLog(AtB.getClass)
 
+  def atb[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A]): DrmRddInput[Int] = {
+    atb_nograph_mmul(operator, srcA, srcB, operator.A.partitioningTag == operator.B.partitioningTag)
+  }
   /**
    * The logic for computing A'B is pretty much map-side generation of partial outer product blocks
    * over co-grouped rows of A and B. If A and B are identically partitioned, we can just directly
    * zip all the rows. Otherwise, we need to inner-join them first.
+   *
    */
-  def atb_nograph[A: ClassTag](
-      operator: OpAtB[A],
-      srcA: DrmRddInput[A],
-      srcB: DrmRddInput[A],
-      zippable:Boolean = false
-      ): DrmRddInput[Int] = {
+  @deprecated("slow, will remove", since = "0.10.2")
+  def atb_nograph[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A],
+                               zippable: Boolean = false): DrmRddInput[Int] = {
 
     val rddA = srcA.toDrmRdd()
-    val zipped = if ( zippable ) {
+    val rddB = srcB.toDrmRdd()
+
+
+    val prodNCol = operator.ncol
+    val prodNRow = operator.nrow
+    val aNRow = operator.A.nrow
+
+    // Approximate number of final partitions. We take bigger partitions as our guide to number of
+    // elements per partition. TODO: do it better.
+    // Elements per partition, bigger of two operands.
+    val epp = aNRow.toDouble * prodNRow / rddA.partitions.size max aNRow.toDouble * prodNCol /
+      rddB.partitions.size
+
+    // Number of partitions we want to converge to in the product. For now we simply extrapolate that
+    // assuming product density and operand densities being about the same; and using the same element
+    // per partition number in the product as the bigger of two operands.
+    val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt
+
+    if (log.isDebugEnabled) log.debug(s"AtB: #parts ${numProductPartitions} for $prodNRow x $prodNCol geometry.")
+
+    val zipped = if (zippable) {
 
       log.debug("A and B for A'B are identically distributed, performing row-wise zip.")
 
-      rddA.zip(other = srcB.toDrmRdd())
+      rddA.zip(other = rddB)
 
     } else {
 
       log.debug("A and B for A'B are not identically partitioned, performing inner join.")
 
-      rddA.join(other=srcB.toDrmRdd()).map({
-        case (key,(v1,v2) ) => (key -> v1) -> (key -> v2)
+      rddA.join(other = rddB, numPartitions = numProductPartitions).map({ case (key, (v1,
+      v2)) => (key -> v1) -> (key -> v2)
       })
     }
 
-    val blockHeight = safeToNonNegInt(
-      (operator.B.ncol.toDouble/rddA.partitions.size).ceil.round max 1L
-    )
-
-    computeAtBZipped(
-      zipped,
-      nrow = operator.nrow,
-      ancol = operator.A.ncol,
-      bncol = operator.B.ncol,
-      blockHeight = blockHeight
-    )
+    computeAtBZipped2(zipped, nrow = operator.nrow, ancol = operator.A.ncol, bncol = operator.B.ncol,
+      numPartitions = numProductPartitions)
+  }
+
+  private[sparkbindings] def atb_nograph_mmul[A:ClassTag](operator:OpAtB[A], srcA: DrmRddInput[A], srcB:DrmRddInput[A], zippable:Boolean = false):DrmRddInput[Int] = {
+
+    debug("operator mmul-A'B(Spark)")
+
+    val prodNCol = operator.ncol
+    val prodNRow = safeToNonNegInt(operator.nrow)
+    val aNRow = safeToNonNegInt(operator.A.nrow)
+
+    val rddA = srcA.toDrmRdd()
+    val rddB = srcB.toDrmRdd()
+
+    // Approximate number of final partitions. We take bigger partitions as our guide to number of
+    // elements per partition. TODO: do it better.
+    // Elements per partition, bigger of two operands.
+    val epp = aNRow.toDouble * prodNRow / rddA.partitions.size max aNRow.toDouble * prodNCol /
+      rddB.partitions.size
+
+    // Number of partitions we want to converge to in the product. For now we simply extrapolate that
+    // assuming product density and operand densities being about the same; and using the same element
+    // per partition number in the product as the bigger of two operands.
+    val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt min prodNRow
+
+    if (log.isDebugEnabled) log.debug(s"AtB mmul: #parts ${numProductPartitions} for $prodNRow x $prodNCol geometry.")
+
+    val zipped = if (zippable) {
+
+      debug("mmul-A'B - zip: are identically distributed, performing row-wise zip.")
+
+      val blockdRddA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+      val blockdRddB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+
+      blockdRddA
+
+        // Zip
+        .zip(other = blockdRddB)
+
+        // Throw away the keys
+        .map { case ((_, blockA), (_, blockB)) => blockA -> blockB}
+
+    } else {
+
+      debug("mmul-A'B: cogroup for non-identically distributed stuff.")
+
+      // To take same route, we'll join stuff row-wise, blockify it here and then proceed with the
+      // same computation path. Although it is possible we could shave off one shuffle here. TBD.
+
+      rddA
+
+        // Do full join. We can't get away with partial join because it is going to lose some rows
+        // in case we have missing rows on either side.
+        .cogroup(other = rddB, numPartitions = rddA.partitions.size max rddB.partitions.size )
+
+
+        // Merge groups.
+        .mapPartitions{iter =>
+
+        val aRows = new ArrayBuffer[Vector](1000)
+        val bRows = new ArrayBuffer[Vector](1000)
+
+        // Populate hanging row buffs
+        iter.foreach{case (_, (arowbag,browbag)) =>
+
+          // Some up all vectors, if any, for a row. If we have > 1 that means original matrix had
+          // non-uniquely keyed rows which is generally a matrix format inconsistency (should not
+          // happen).
+          aRows += (if (arowbag.isEmpty)
+            new SequentialAccessSparseVector(prodNRow)
+          else arowbag.reduce(_ += _))
+
+          bRows += (if (browbag.isEmpty)
+            new SequentialAccessSparseVector(prodNCol)
+          else browbag.reduce(_ += _))
+        }
+
+        // Transform collection of vectors into blocks.
+        val blockNRow = aRows.size
+        assert(blockNRow == bRows.size)
+
+        val aBlock:Matrix = new SparseRowMatrix(blockNRow, prodNRow, aRows.toArray)
+        val bBlock:Matrix = new SparseRowMatrix(blockNRow, prodNCol, bRows.toArray)
+
+        // Form pairwise result
+        Iterator(aBlock -> bBlock)
+      }
+    }
+
+    computeAtBZipped3(pairwiseRdd = zipped, nrow = prodNRow, ancol = prodNRow, bncol = aNRow,
+      numPartitions = numProductPartitions)
+
+  }
+  /**
+   * Compute, combine and accumulate outer products for every key. The incoming tuple structure
+   * is (partNo, (vecA, vecB)), so for every `partNo` we compute an outer product of the form {{{
+   *   vecA cross vecB
+   * }}}
+   * @param pairwiseRdd
+   * @return
+   */
+  @deprecated("slow, will remove", since = "0.10.2")
+  private[sparkbindings] def combineOuterProducts(pairwiseRdd: RDD[(Int, (Vector, Vector))], numPartitions: Int) = {
+
+    pairwiseRdd
+
+      // Reduce individual partitions
+      .combineByKey(createCombiner = (t: (Vector, Vector)) => {
+
+      val vecA = t._1
+      val vecB = t._2
+
+      // Create partition accumulator. Generally, summation of outer products probably calls for
+      // dense accumulators. However, let's assume extremely sparse cases are still possible, and
+      // by default assume any sparse case is an extremely sparse case. May need to tweak further.
+      val mxC: Matrix = if (!vecA.isDense && !vecB.isDense)
+        new SparseRowMatrix(vecA.length, vecB.length)
+      else
+        new DenseMatrix(vecA.length, vecB.length)
+
+      // Add outer product of arow and bRowFrag to mxC
+      addOuterProduct(mxC, vecA, vecB)
+
+    }, mergeValue = (mxC: Matrix, t: (Vector, Vector)) => {
+      // Merge of a combiner with another outer product fragment.
+      val vecA = t._1
+      val vecB = t._2
+
+      addOuterProduct(mxC, vecA, vecB)
+
+    }, mergeCombiners = (mxC1: Matrix, mxC2: Matrix) => {
+
+      // Merge of 2 combiners.
+      mxC1 += mxC2
+
+    }, numPartitions = numPartitions)
+  }
+
+  private[sparkbindings] def computeAtBZipped3[A: ClassTag](pairwiseRdd: RDD[(Matrix, Matrix)], nrow: Int,
+                                                            ancol: Int, bncol: Int, numPartitions: Int) = {
+
+    val ranges = computeEvenSplits(nrow, numPartitions)
+
+    val rdd = pairwiseRdd.flatMap{ case (blockA, blockB) ⇒
+
+      // Handling microscopic Pat's cases. Any slicing doesn't work well on 0-row matrix. This
+      // probably should be fixed in the in-core matrix implementations.
+      if (blockA.nrow == 0 )
+        Iterator.empty
+      else
+      // Output each partial outer product with its correspondent partition #.
+        Iterator.tabulate(numPartitions) {part ⇒
+
+          val mBlock = blockA(::, ranges(part)).t %*% blockB
+          part → mBlock
+        }
+    }
+
+      // Reduce.
+      .reduceByKey(_ += _, numPartitions = numPartitions)
+
+      // Produce keys
+      .map { case (blockKey, block) ⇒ ranges(blockKey).toArray → block }
+
+    debug(s"A'B mmul #parts: ${rdd.partitions.size}.")
+
+    rdd
   }
 
+  private[sparkbindings] def computeAtBZipped2[A: ClassTag](zipped: RDD[(DrmTuple[A], DrmTuple[A])], nrow: Long,
+                                                            ancol: Int, bncol: Int, numPartitions: Int) = {
+
+    // The plan of this approach is to send a_i and parts of b_i to partitoin reducers which actually
+    // do outer product sum update locally (instead of sending outer blocks). Thus it should minimize
+    // expense for IO and also in-place partition block accum update should be much more efficient
+    // than forming outer block matrices and perform matrix-on-patrix +.
+    // Figure out appriximately block height per partition of the result.
+    val blockHeight = safeToNonNegInt((nrow - 1) / numPartitions) + 1
 
-//  private[sparkbindings] def atb_nograph()
+    val partitionedRdd = zipped
+
+      // Split B-rows into partitions using blockHeight
+      .mapPartitions { iter =>
+
+      val offsets = (0 until numPartitions).map(_ * blockHeight)
+      val ranges = offsets.map(offs => offs until (offs + blockHeight min ancol))
+
+      // Transform into series of (part -> (arow, part-brow)) tuples (keyed by part #).
+      iter.flatMap { case ((_, arow), (_, brow)) =>
+
+        ranges.view.zipWithIndex.map { case (arange, partNum) =>
+          partNum -> (arow(arange).cloned -> brow)
+        }
+      }
+    }
+
+    val blockRdd = combineOuterProducts(partitionedRdd, numPartitions)
+
+      // Add ordinal row keys.
+      .map { case (blockNum, block) =>
+
+      // Starting key
+      var offset = blockNum * blockHeight
+
+      var keys = Array.tabulate(block.nrow)(offset + _)
+      keys -> block
+
+    }
+
+    blockRdd
+  }
 
   /** Given already zipped, joined rdd of rows of A' and B, compute their product A'B */
-  private[sparkbindings] def computeAtBZipped[A: ClassTag](zipped:RDD[(DrmTuple[A], DrmTuple[A])],
-      nrow:Long, ancol:Int, bncol:Int, blockHeight: Int) = {
+  @deprecated("slow, will remove", since = "0.10.2")
+  private[sparkbindings] def computeAtBZipped[A: ClassTag](zipped: RDD[(DrmTuple[A], DrmTuple[A])], nrow: Long,
+                                                           ancol: Int, bncol: Int, numPartitions: Int) = {
 
     // Since Q and A are partitioned same way,we can just zip their rows and proceed from there by
     // forming outer products. Our optimizer lacks this primitive, so we will implement it using RDDs
     // directly. We try to compile B' = A'Q now by collecting outer products of rows of A and Q. At
     // this point we need to split n-range  of B' into sutiable number of partitions.
 
-    val btNumParts = safeToNonNegInt((nrow - 1) / blockHeight + 1)
+    if (log.isDebugEnabled) {
+      log.debug(s"AtBZipped:zipped #parts ${zipped.partitions.size}")
+      log.debug(s"AtBZipped:Targeted #parts ${numPartitions}")
+    }
+
+    // Figure out appriximately block height per partition of the result.
+    val blockHeight = safeToNonNegInt((nrow - 1) / numPartitions) + 1
 
     val rddBt = zipped
 
-        // Produce outer product blocks
-        .flatMap {
-      case ((aKey, aRow), (qKey, qRow)) =>
-        for (blockKey <- Stream.range(0, btNumParts)) yield {
-          val blockStart = blockKey * blockHeight
-          val blockEnd = ancol min (blockStart + blockHeight)
+      // Produce outer product blocks
+      .flatMap { case ((aKey, aRow), (qKey, qRow)) =>
+      for (blockKey <- Stream.range(0, numPartitions)) yield {
+        val blockStart = blockKey * blockHeight
+        val blockEnd = ancol min (blockStart + blockHeight)
 
-          // Create block by cross product of proper slice of aRow and qRow
-          blockKey -> (aRow(blockStart until blockEnd) cross qRow)
-        }
-    }
-        // Combine blocks by just summing them up
-        .reduceByKey {
-      case (block1, block2) => block1 += block2
+        // Create block by cross product of proper slice of aRow and qRow
+        blockKey -> (aRow(blockStart until blockEnd) cross qRow)
+
+        // TODO: computing tons of cross product matrices seems to be pretty inefficient here. More
+        // likely single streaming algorithm of updates will perform much better here. So rewrite
+        // this using mapPartitions with numPartitions block accumulators.
+
+      }
     }
+      //      .combineByKey(
+      //        createCombiner = (mx:Matrix) => mx,
+      //        mergeValue = (c:Matrix,mx:Matrix) => c += mx,
+      //        mergeCombiners = (c1:Matrix,c2:Matrix) => c1 += c2,
+      //        numPartitions = numPartitions
+      //      )
+      // Doesn't look like use of combineByKey produces any better results than reduceByKey. So keeping
+      // reduceByKey for simplicity. Combiners probably doesn't mean reduceByKey doesn't combine map-side.
+      // Combine blocks by just summing them up
+      .reduceByKey((block1, block2) => block1 += block2, numPartitions)
 
-        // Throw away block key, generate row keys instead.
-        .map {
-      case (blockKey, block) =>
-        val blockStart = blockKey * blockHeight
-        val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
-        rowKeys -> block
+      // Throw away block key, generate row keys instead.
+      .map { case (blockKey, block) =>
+      val blockStart = blockKey * blockHeight
+      val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
+      rowKeys -> block
     }
 
-    new DrmRddInput[Int](blockifiedSrc = Some(rddBt))
+    if (log.isDebugEnabled) log.debug(s"AtBZipped #parts ${rddBt.partitions.size}")
+
+    rddBt
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index 94c3f06..629accd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -15,22 +15,22 @@ object Ax {
 
   def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
-    val rddA = srcA.toBlockifiedDrmRdd()
-    implicit val sc:DistributedContext = rddA.sparkContext
+    val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+    implicit val sc: DistributedContext = rddA.sparkContext
 
     val bcastX = drmBroadcast(op.x)
 
-    val rdd = rddA
-        // Just multiply the blocks
-        .map({
-      case (keys, blockA) => keys -> (blockA %*% bcastX).toColMatrix
-    })
+    val rdd: BlockifiedDrmRdd[K] = rddA
+
+      // Just multiply the blocks
+      .map { case (keys, blockA) ⇒ keys → (blockA %*% bcastX).toColMatrix }
 
-    new DrmRddInput(blockifiedSrc = Some(rdd))
+    new DrmRddInput(Right(rdd))
   }
 
   def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
-    val rddA = srcA.toBlockifiedDrmRdd()
+
+    val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
     implicit val dc:DistributedContext = rddA.sparkContext
 
     val bcastX = drmBroadcast(op.x)
@@ -52,10 +52,10 @@ object Ax {
     // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
     // it back as drm blockified rdd
 
-    val rdd = dc.parallelize(Seq(inCoreM), numSlices = 1)
-        .map(block => Array.tabulate(block.nrow)(i => i) -> block)
+    val rdd:BlockifiedDrmRdd[Int] = dc.parallelize(Seq(inCoreM), numSlices = 1)
+        .map{block ⇒ Array.tabulate(block.nrow)(i ⇒ i) -> block}
 
-    new DrmRddInput(blockifiedSrc = Some(rdd))
+    rdd
 
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index ea10ccb..4a379ec 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -18,12 +18,13 @@
 package org.apache.mahout.sparkbindings.blas
 
 import org.apache.log4j.Logger
-import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.DrmRdd
+import reflect._
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
-import org.apache.mahout.math.drm.logical.OpCbind
+import org.apache.mahout.math.drm.logical.{OpCbindScalar, OpCbind}
 import org.apache.spark.SparkContext._
 
 /** Physical cbind */
@@ -31,6 +32,34 @@ object CbindAB {
 
   private val log = Logger.getLogger(CbindAB.getClass)
 
+  def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
+    val srcRdd = srcA.toDrmRdd()
+
+    val ncol = op.A.ncol
+    val x = op.x
+
+    val fixedRdd = if (classTag[K] == ClassTag.Int && x != 0.0)
+      fixIntConsistency(op.asInstanceOf[OpCbindScalar[Int]],
+        src = srcRdd.asInstanceOf[DrmRdd[Int]]).asInstanceOf[DrmRdd[K]]
+    else srcRdd
+
+    val left = op.leftBind
+
+    val resultRdd = fixedRdd.map { case (key, vec) =>
+      val newVec = vec.like(ncol + 1)
+      if (left) {
+        newVec(1 to ncol) := vec
+        newVec(0) = x
+      } else {
+        newVec(0 until ncol) := vec
+        newVec(ncol) = x
+      }
+      key -> newVec
+    }
+
+    resultRdd
+  }
+
   def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
     val a = srcA.toDrmRdd()
@@ -88,7 +117,7 @@ object CbindAB {
       }
     }
 
-    new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd))
+    rdd
 
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index a3caac7..4cd9a74 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -25,12 +25,14 @@ import org.apache.mahout.sparkbindings.DrmRdd
 
 class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
 
+  /** Turn RDD into dense row-wise vectors if density threshold is exceeded. */
   def densify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({
     case (key, v) =>
       val vd = if (!v.isDense && v.getNumNonZeroElements > threshold * v.length) new DenseVector(v) else v
       key -> vd
   })
 
+  /** Turn rdd into sparse RDD if density threshold is underrun. */
   def sparsify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({
     case (key, v) =>
       val vs = if (v.isDense() && v.getNumNonZeroElements <= threshold * v.length)

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 4c68c9a..2933ddc 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -17,6 +17,7 @@
 
 package org.apache.mahout.sparkbindings.blas
 
+import org.apache.mahout.math.drm.logical.OpMapBlock
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import org.apache.mahout.math.drm.BlockMapFunc
 import org.apache.mahout.math.scalabindings.RLikeOps._
@@ -24,12 +25,13 @@ import scala.reflect.ClassTag
 
 object MapBlock {
 
-  def exec[S, R:ClassTag](src: DrmRddInput[S], ncol:Int, bmf:BlockMapFunc[S,R]): DrmRddInput[R] = {
+  def exec[S, R:ClassTag](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
 
-    // We can't use attributes to avoid putting the whole this into closure.
-
-    val rdd = src.toBlockifiedDrmRdd()
-        .map(blockTuple => {
+    // We can't use attributes directly in the closure in order to avoid putting the whole object
+    // into closure.
+    val bmf = operator.bmf
+    val ncol = operator.ncol
+    val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
       val out = bmf(blockTuple)
 
       assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
@@ -37,7 +39,8 @@ object MapBlock {
 
       out
     })
-    new DrmRddInput(blockifiedSrc = Some(rdd))
+
+    rdd
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index e73376d..0434a72 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -1,50 +1,58 @@
 package org.apache.mahout.sparkbindings.blas
 
+import org.apache.mahout.sparkbindings.drm
+
 import scala.reflect.ClassTag
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import org.apache.mahout.math.drm.logical.OpPar
 import org.apache.spark.rdd.RDD
+import scala.math._
+
+import org.apache.mahout.logging._
 
 /** Physical adjustment of parallelism */
 object Par {
 
+  private final implicit val log = getLog(Par.getClass)
+
   def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
 
-    def adjust[T](rdd: RDD[T]): RDD[T] =
-      if (op.minSplits > 0) {
-        if (rdd.partitions.size < op.minSplits)
-          rdd.coalesce(op.minSplits, shuffle = true)
-        else rdd.coalesce(rdd.partitions.size)
-      } else if (op.exactSplits > 0) {
-        if (op.exactSplits < rdd.partitions.size)
-          rdd.coalesce(numPartitions = op.exactSplits, shuffle = false)
-        else if (op.exactSplits > rdd.partitions.size)
-          rdd.coalesce(numPartitions = op.exactSplits, shuffle = true)
-        else
-          rdd.coalesce(rdd.partitions.size)
-      } else if (op.exactSplits == -1 && op.minSplits == -1) {
-
-        // auto adjustment, try to scale up to either x1Size or x2Size.
-        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt
-
-        val x1Size = (clusterSize * .95).ceil.toInt
-        val x2Size = (clusterSize * 1.9).ceil.toInt
-
-        if (rdd.partitions.size <= x1Size)
-          rdd.coalesce(numPartitions = x1Size, shuffle = true)
-        else if (rdd.partitions.size <= x2Size)
-          rdd.coalesce(numPartitions = x2Size, shuffle = true)
-        else
-          rdd.coalesce(numPartitions = rdd.partitions.size)
-      } else rdd.coalesce(rdd.partitions.size)
-
-    if (src.isBlockified) {
-      val rdd = src.toBlockifiedDrmRdd()
-      new DrmRddInput[K](blockifiedSrc = Some(adjust(rdd)))
+    val srcBlockified = src.isBlockified
+
+    val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+    val srcNParts = srcRdd.partitions.size
+
+    // To what size?
+    val targetParts = if (op.minSplits > 0) srcNParts max op.minSplits
+    else if (op.exactSplits > 0) op.exactSplits
+    else /* auto adjustment */ {
+      val stdParallelism = srcRdd.context.getConf.get("spark.default.parallelism", "1").toInt
+      val x1 = 0.95 * stdParallelism
+      if (srcNParts <= ceil(x1)) ceil(x1).toInt else ceil(2 * x1).toInt
+    }
+
+    debug(s"par $srcNParts => $targetParts.")
+
+    if (targetParts > srcNParts) {
+
+      // Expanding. Always requires deblockified stuff. May require re-shuffling.
+      val rdd = src.toDrmRdd().repartition(numPartitions = targetParts)
+
+      rdd
+
+    } else if (targetParts < srcNParts) {
+      // Shrinking.
+
+      if (srcBlockified) {
+        drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts))
+      } else {
+        src.toDrmRdd().coalesce(numPartitions = targetParts)
+      }
     } else {
-      val rdd = src.toDrmRdd()
-      new DrmRddInput[K](rowWiseSrc = Some(op.ncol -> adjust(rdd)))
+      // no adjustment required.
+      src
     }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index 5037d68..62abba6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -31,11 +31,11 @@ object RbindAB {
 
     // If any of the inputs is blockified, use blockified inputs
     if (srcA.isBlockified || srcB.isBlockified) {
-      val a = srcA.toBlockifiedDrmRdd()
-      val b = srcB.toBlockifiedDrmRdd()
+      val a = srcA.toBlockifiedDrmRdd(op.A.ncol)
+      val b = srcB.toBlockifiedDrmRdd(op.B.ncol)
 
       // Union seems to be fine, it is indeed just do partition-level unionization, no shuffles
-      new DrmRddInput(blockifiedSrc = Some(a ++ b))
+      a ++ b
 
     } else {
 
@@ -43,7 +43,7 @@ object RbindAB {
       val a = srcA.toDrmRdd()
       val b = srcB.toDrmRdd()
 
-      new DrmRddInput(rowWiseSrc = Some(op.ncol -> (a ++ b)))
+      a ++ b
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
index d0a50b5..0284ba2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
@@ -22,6 +22,6 @@ object Slicing {
 
     // TODO: we probably need to re-shuffle result or at least cut down the partitions of 0 size
 
-    new DrmRddInput(rowWiseSrc = Some(ncol -> rdd))
+    rdd
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 9a50afa..6b8513f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -17,13 +17,17 @@
 
 package org.apache.mahout.sparkbindings
 
+import org.apache.mahout.sparkbindings
+import org.apache.spark.rdd.RDD
+
 import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
 import scalabindings._
 import RLikeOps._
+import collection._
+import JavaConversions._
 
 /**
  * This validation contains distributed algorithms that distributed matrix expression optimizer picks
@@ -31,8 +35,81 @@ import RLikeOps._
  */
 package object blas {
 
-  implicit def drmRdd2ops[K:ClassTag](rdd:DrmRdd[K]):DrmRddOps[K] = new DrmRddOps[K](rdd)
+  implicit def drmRdd2ops[K: ClassTag](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
+
+
+  /**
+   * Rekey matrix dataset keys to consequtive int keys.
+   * @param rdd incoming matrix row-wise dataset
+   *
+   * @param computeMap if true, also compute mapping between old and new keys
+   * @tparam K existing key parameter
+   * @return
+   */
+  private[mahout] def rekeySeqInts[K: ClassTag](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
+    Option[RDD[(K, Int)]]) = {
+
+    // Spark context please.
+    val sctx = rdd.context
+    import sctx._
+
+    // First, compute partition sizes.
+    val partSizes = rdd.mapPartitionsWithIndex((part, iter) => Iterator(part -> iter.size))
+
+      // Collect in-core
+      .collect()
+
+    // Starting indices
+    var startInd = new Array[Int](rdd.partitions.size)
+
+    // Save counts
+    for (pc <- partSizes) startInd(pc._1) = pc._2
+
+    // compute cumulative sum
+    val siBcast = broadcast(startInd.scanLeft(0)(_ + _).init)
+
+    // Compute key -> int index map:
+    val keyMap = if (computeMap) {
+      Some(rdd
+
+        // Process individual partition with index, output `key -> index` tuple
+        .mapPartitionsWithIndex((part, iter) => {
+
+        // Start index for this partition
+        val si = siBcast.value(part)
+        iter.zipWithIndex.map { case ((key, _), index) => key -> (index + si)}
+      })) // Some
+
+    } else {
+
+      // Were not asked to compute key mapping
+      None
+    }
+
+    // Finally, do the transform
+    val intRdd = rdd
+
+      // Re-number each partition
+      .mapPartitionsWithIndex((part, iter) => {
 
+      // Start index for this partition
+      val si = siBcast.value(part)
+
+      // Iterate over data by producing sequential row index and retaining vector value.
+      iter.zipWithIndex.map { case ((_, vec), ind) => si + ind -> vec}
+    })
+
+    // Finally, return drm -> keymap result
+
+    intRdd -> keyMap
+
+  }
+
+
+  /**
+   * Fills in missing rows in an Int-indexed matrix by putting in empty row vectors for the missing
+   * keys.
+   */
   private[mahout] def fixIntConsistency(op: DrmLike[Int], src: DrmRdd[Int]): DrmRdd[Int] = {
 
     if (op.canHaveMissingRows) {
@@ -45,20 +122,20 @@ package object blas {
       // Compute the fix.
       sc
 
-          // Bootstrap full key set
-          .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1)
+        // Bootstrap full key set
+        .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1)
 
-          // Enable PairedFunctions
-          .map(_ -> Unit)
+        // Enable PairedFunctions
+        .map(_ -> Unit)
 
-          // Cogroup with all rows
-          .cogroup(other = rdd)
+        // Cogroup with all rows
+        .cogroup(other = rdd)
 
-          // Filter out out-of-bounds
-          .filter { case (key, _) => key >= 0 && key < dueRows}
+        // Filter out out-of-bounds
+        .filter { case (key, _) => key >= 0 && key < dueRows}
 
-          // Coalesce and output RHS
-          .map { case (key, (seqUnit, seqVec)) =>
+        // Coalesce and output RHS
+        .map { case (key, (seqUnit, seqVec)) =>
         val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols))
         val vec = if (seqVec.size > 0) (acc /: seqVec.tail)(_ + _) else acc
         key -> vec
@@ -68,4 +145,77 @@ package object blas {
 
   }
 
+  /** Method to do `mxC += a cross b` in-plcae a bit more efficiently than this expression does. */
+  def addOuterProduct(mxC: Matrix, a: Vector, b: Vector): Matrix = {
+
+    // Try to pay attention to density a bit here when computing and adding the outer product of
+    // arow and brow fragment.
+    if (b.isDense)
+      for (ela <- a.nonZeroes) mxC(ela.index, ::) := { (i, x) => x + ela * b(i)}
+    else
+      for (ela <- a.nonZeroes; elb <- b.nonZeroes()) mxC(ela.index, elb.index) += ela * elb
+
+    mxC
+  }
+
+  /**
+   * Compute ranges of more or less even splits of total `nrow` number
+   *
+   * @param nrow
+   * @param numSplits
+   * @return
+   */
+  @inline
+  private[blas] def computeEvenSplits(nrow: Long, numSplits: Int): IndexedSeq[Range] = {
+    require(numSplits <= nrow, "Requested amount of splits greater than number of data points.")
+    require(nrow >= 1)
+    require(numSplits >= 1)
+
+    // Base split -- what is our base split size?
+    val baseSplit = safeToNonNegInt(nrow / numSplits)
+
+    // Slack -- how many splits will have to be incremented by 1 though?
+    val slack = safeToNonNegInt(nrow % numSplits)
+
+    // Compute ranges. We need to set ranges so that numSplits - slack splits have size of baseSplit;
+    // and `slack` splits have size baseSplit + 1. Here is how we do it: First, we compute the range
+    // offsets:
+    val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - slack))
+    // And then we connect the ranges using gaps between offsets:
+    offsets.sliding(2).map(offs => offs(0) until offs(1)).toIndexedSeq
+  }
+
+  /**
+   * Estimate number of partitions for the product of A %*% B.
+   *
+   * We take average per-partition element count of product as higher of the same of A and B. (prefer
+   * larger partitions of operands).
+   *
+   * @param anrow A.nrow
+   * @param ancol A.ncol
+   * @param bncol B.ncol
+   * @param aparts partitions in A
+   * @param bparts partitions in B
+   * @return recommended partitions
+   */
+  private[blas] def estimateProductPartitions(anrow:Long, ancol:Long, bncol:Long, aparts:Int, bparts:Int):Int = {
+
+    // Compute per-partition element density in A
+    val eppA = anrow.toDouble * ancol/ aparts
+
+    // Compute per-partition element density in B
+    val eppB = ancol.toDouble * bncol / bparts
+
+    // Take the maximum element density into account. Is it a good enough?
+    val epp = eppA max eppB
+
+    // product partitions
+    val prodParts = anrow * bncol / epp
+
+    val nparts = math.round(prodParts).toInt max 1
+
+    // Constrain nparts to maximum of anrow to prevent guaranteed empty partitions.
+    if (nparts > anrow) anrow.toInt else nparts
+  }
+
 }


Mime
View raw message