mahout-commits mailing list archives

From dlyubi...@apache.org
Subject git commit: MAHOUT-1597: A + 1.0 (element-wise scalar operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov) This closes apache/mahout#33
Date Mon, 28 Jul 2014 17:34:27 GMT
Repository: mahout
Updated Branches:
  refs/heads/master c9d978a46 -> 377dcace1


MAHOUT-1597: A + 1.0 (element-wise scalar operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov)
This closes apache/mahout#33
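
For context, a minimal sketch of the defect being fixed, adapted from the new test suite further
below (assumes a Spark bindings test context with `sc` and the R-like DSL imports in scope):

    // An int-keyed drm whose backing rdd skips keys 1 and 2; those rows are implied zero rows.
    val aRdd: DrmRdd[Int] = sc.parallelize(
      0 -> dvec(1, 2, 3) ::
          3 -> dvec(3, 4, 5) :: Nil
    ).map { case (key, vec) => key -> (vec: Vector) }

    val drmA = drmWrap(rdd = aRdd)

    // Before this fix, the scalar op only touched physically present rows, so rows 1 and 2
    // of drmB collected as zeros instead of the expected all-ones vectors.
    val drmB = drmA + 1.0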

Squashed commit of the following:

commit 9038b2c248ec163e0ff85dd37cb109345b3bbb9d
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Mon Jul 28 10:22:30 2014 -0700

    in-place performance enhancement at the risk of side-effects, for now

commit 35c873cceeda87b7b212677bffc0a21877932deb
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jul 22 15:34:00 2014 -0700

    CBind test tweak

commit 005aeccd353faeb59f2ddb2003f393a603978edb
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jul 22 15:33:18 2014 -0700

    Cbind test

commit 57f669a7d50097a5a816ce13ca8230f6c6742b65
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jul 22 14:57:57 2014 -0700

    Also fixing `A ew B` for operands with missing rows

commit 2063c5ccaf88bf31d7f5c1dd3d1650925c4dfd58
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jul 22 14:12:56 2014 -0700

    style

commit 5d8e1407a7ea2535ae6d00701ed4f60390c1b30e
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jul 22 14:09:11 2014 -0700

    Orientation-changing unary ops cannot produce missing rows

commit c9ac3be81ed464ccc4d440b8187e15efa9a21193
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jul 22 14:03:25 2014 -0700

    Tests, passing.

commit 1ff376b2ddd1bcbe61f896d14e27d7a413e7313c
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jul 22 13:23:14 2014 -0700

    Code up for lazy int-keyed missing rows fix

commit 746b3ddc6c0e7e8bb89ce591c32ba1b70ec688e6
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jul 22 11:25:57 2014 -0700

    WIP

commit 45642b65f3f1620a4e2187af4b2b54e26ce1c42e
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Mon Jul 21 18:19:37 2014 -0700

    WIP


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/377dcace
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/377dcace
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/377dcace

Branch: refs/heads/master
Commit: 377dcace16fe4fd77baaab91ad575e9da5c49ac0
Parents: c9d978a
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Authored: Mon Jul 28 10:25:05 2014 -0700
Committer: Dmitriy Lyubimov <dlyubimov@apache.org>
Committed: Mon Jul 28 10:25:05 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../mahout/math/drm/CheckpointedDrm.scala       |  2 +-
 .../org/apache/mahout/math/drm/DrmLike.scala    |  2 +
 .../math/drm/logical/AbstractBinaryOp.scala     |  2 +
 .../math/drm/logical/AbstractUnaryOp.scala      |  1 +
 .../mahout/math/drm/logical/OpAewScalar.scala   |  3 +
 .../apache/mahout/math/drm/logical/OpAt.scala   |  2 +
 .../apache/mahout/math/drm/logical/OpAtA.scala  |  4 +-
 .../mahout/math/drm/RLikeDrmOpsSuiteBase.scala  |  9 ++
 .../mahout/sparkbindings/SparkEngine.scala      |  8 +-
 .../apache/mahout/sparkbindings/blas/AewB.scala | 33 +++++--
 .../mahout/sparkbindings/blas/package.scala     | 41 +++++++++
 .../drm/CheckpointedDrmSpark.scala              | 46 ++++++++--
 .../apache/mahout/sparkbindings/package.scala   | 27 +++++-
 .../mahout/sparkbindings/drm/DrmLikeSuite.scala |  1 +
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    | 91 +++++++++++++++++++-
 16 files changed, 250 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 78b17b5..50909e2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1597: A + 1.0 (element-wise scalar operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov)
+
   MAHOUT-1595: MatrixVectorView - implement a proper iterateNonZero() (Anand Avati via dlyubimov)
 
   MAHOUT-1529(e): Move dense/sparse matrix test in mapBlock into spark (Anand Avati via dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 0266944..28fb7fd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -31,6 +31,6 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
   def writeDRM(path: String)
 
   /** If this checkpoint is already declared cached, uncache. */
-  def uncache()
+  def uncache(): this.type
 
 }
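
The narrowed return type (`this.type` instead of `Unit`) lets call sites chain on the checkpointed
matrix; a small hypothetical sketch (the path is illustrative only):

    val drm = drmWrap(rdd = aRdd, nrow = 4)
    // uncache() now hands back the drm itself, so calls can be chained:
    drm.uncache().writeDRM("/tmp/drmA")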

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 995c873..97fe989 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -30,6 +30,8 @@ trait DrmLike[K] {
 
   protected[mahout] def partitioningTag: Long
 
+  protected[mahout] def canHaveMissingRows: Boolean
+
   /**
    * Distributed context, can be implicitly converted to operations on [[org.apache.mahout.math.drm.
    * DistributedEngine]].
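
The new `canHaveMissingRows` member threads the missing-rows condition through the logical
operator tree; the overrides in the files below either propagate it from the operand or pin it
to false where the operator's output is known to be complete. A sketch of how it surfaces (the
member is protected[mahout], so this only works from code inside the org.apache.mahout
namespace, as the suites below do):

    val drmA = drmWrap(rdd = aRdd)   // keys 0 and 3 physically present; nrow computes to 4
    drmA.canHaveMissingRows          // true, detected lazily while counting rows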

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
index 7863526..3b6b8bf 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -42,6 +42,8 @@ abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
   protected[drm] var B: DrmLike[B]
   lazy val context: DistributedContext = A.context
 
+  protected[mahout] def canHaveMissingRows: Boolean = false
+
   // These are explicit evidence exports. Sometimes Scala fails to figure them out on its own.
   def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
index 92abdb4..a445f21 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -32,5 +32,6 @@ abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
 
   def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
 
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = A.canHaveMissingRows
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
index 91e0dd4..3b651f6 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -29,6 +29,9 @@ case class OpAewScalar[K: ClassTag](
 
   override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
 
+  /** Ops like `A + 1.0` always fix missing rows, so the result cannot have any. */
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
index 3239ad2..4791301 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
@@ -30,4 +30,6 @@ case class OpAt(
   /** R-like syntax for number of columns */
   def ncol: Int = safeToNonNegInt(A.nrow)
 
+  /** A' after simplifications cannot produce missing rows, ever. */
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
index c7c6046..ad2a5d8 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
@@ -31,6 +31,6 @@ case class OpAtA[K: ClassTag](
   /** R-like syntax for number of columns */
   def ncol: Int = A.ncol
 
-  /** Non-zero element count */
-  def nNonZero: Long = throw new UnsupportedOperationException
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
index 71dc640..50beccf 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
@@ -480,4 +480,13 @@ trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers {
 
   }
 
+  test("B = A + 1.0") {
+    val inCoreA = dense((1, 2), (2, 3), (3, 4))
+    val controlB = inCoreA + 1.0
+
+    val drmB = drmParallelize(m = inCoreA, numPartitions = 2) + 1.0
+
+    (drmB -: controlB).norm should be < 1e-10
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index b68a98e..c37354f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -161,10 +161,10 @@ object SparkEngine extends DistributedEngine {
 
     {
       implicit def getWritable(x: Any): Writable = val2keyFunc()
-      new CheckpointedDrmSpark(
-        rdd = rdd.map(t => (key2valFunc(t._1), t._2)),
-        _cacheStorageLevel = StorageLevel.MEMORY_ONLY
-      )(unwrappedKeyTag.asInstanceOf[ClassTag[Any]])
+
+      val drmRdd = rdd.map { t => (key2valFunc(t._1), t._2)}
+
+      drmWrap(rdd = drmRdd, cacheHint = CacheHint.MEMORY_ONLY)(unwrappedKeyTag.asInstanceOf[ClassTag[Any]])
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 384b986..3cdb797 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -22,10 +22,12 @@ import scala.reflect.ClassTag
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector}
 import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
 import org.apache.log4j.Logger
 import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
+import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
+import org.apache.mahout.math.drm._
 
 /** Elementwise drm-drm operators */
 object AewB {
@@ -52,6 +54,7 @@ object AewB {
 
     val ewOps = getEWOps()
     val opId = op.op
+    val ncol = op.ncol
 
     val reduceFunc = opId match {
       case "+" => ewOps.plus
@@ -83,14 +86,24 @@ object AewB {
       log.debug("applying elementwise as join")
 
       a
+          // Full outer-join operands row-wise
           .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
+
+          // Reduce both sides. In case there are duplicate rows in RHS or LHS, they are summed up
+          // prior to reduction.
           .map({
         case (key, (vectorSeqA, vectorSeqB)) =>
-          key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc))
+          val lhsVec: Vector = if (vectorSeqA.isEmpty) new SequentialAccessSparseVector(ncol)
+          else
+            (vectorSeqA.head /: vectorSeqA.tail)(_ += _)
+          val rhsVec: Vector = if (vectorSeqB.isEmpty) new SequentialAccessSparseVector(ncol)
+          else
+            (vectorSeqB.head /: vectorSeqB.tail)(_ += _)
+          key -> reduceFunc(lhsVec, rhsVec)
       })
     }
 
-    new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd))
+    new DrmRddInput(rowWiseSrc = Some(ncol -> rdd))
   }
 
   /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
@@ -109,11 +122,21 @@ object AewB {
       case "/:" => ewOps.scalarDiv
       case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
     }
-    val a = srcA.toBlockifiedDrmRdd()
-    val rdd = a
+
+    // Before obtaining the blockified rdd, see if we have to fix int row key consistency so that
+    // missing rows can be lazily pre-populated with empty vectors before proceeding with the
+    // elementwise scalar op.
+    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) {
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
+    } else {
+      srcA.toBlockifiedDrmRdd()
+    }
+
+    val rdd = aBlockRdd
         .map({
       case (keys, block) => keys -> reduceFunc(block, scalar)
     })
+
     new DrmRddInput[K](blockifiedSrc = Some(rdd))
   }
 }
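
The join path also guards against empty cogroup sides now: previously each side was reduced
unconditionally, and for a key present in only one operand the other side's Seq is empty, so the
reduce throws. A minimal sketch of that failure mode, with hypothetical values:

    val vectorSeqA: Seq[Vector] = Seq.empty   // key missing from the left operand
    vectorSeqA.reduce(_ += _)                 // throws UnsupportedOperationException: empty.reduce
    // The new code substitutes new SequentialAccessSparseVector(ncol) for an empty side instead.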

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 32d6fb5..9a50afa 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -18,6 +18,12 @@
 package org.apache.mahout.sparkbindings
 
 import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
+import org.apache.spark.SparkContext._
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import scalabindings._
+import RLikeOps._
 
 /**
  * This validation contains distributed algorithms that the distributed matrix expression optimizer picks
@@ -27,4 +33,39 @@ package object blas {
 
   implicit def drmRdd2ops[K:ClassTag](rdd:DrmRdd[K]):DrmRddOps[K] = new DrmRddOps[K](rdd)
 
+  private[mahout] def fixIntConsistency(op: DrmLike[Int], src: DrmRdd[Int]): DrmRdd[Int] = {
+
+    if (op.canHaveMissingRows) {
+
+      val rdd = src
+      val sc = rdd.sparkContext
+      val dueRows = safeToNonNegInt(op.nrow)
+      val dueCols = op.ncol
+
+      // Compute the fix.
+      sc
+
+          // Bootstrap full key set
+          .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1)
+
+          // Enable PairedFunctions
+          .map(_ -> Unit)
+
+          // Cogroup with all rows
+          .cogroup(other = rdd)
+
+          // Filter out out-of-bounds
+          .filter { case (key, _) => key >= 0 && key < dueRows}
+
+          // Coalesce and output RHS
+          .map { case (key, (seqUnit, seqVec)) =>
+        val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols))
+        val vec = if (seqVec.size > 0) (acc /: seqVec.tail)(_ + _) else acc
+        key -> vec
+      }
+
+    } else src
+
+  }
+
 }
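
A hypothetical use of the new helper, padding the rdd from the earlier sketch (keys 0 and 3
physically present, nrow = 4) so that every key in 0 until nrow is materialized (callable from
inside the mahout namespace, since the helper is private[mahout]):

    val fixed: DrmRdd[Int] = fixIntConsistency(op = drmA, src = aRdd)
    // fixed now carries keys 0, 1, 2, 3; rows 1 and 2 are empty
    // SequentialAccessSparseVector placeholders of width drmA.ncol.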

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 674ff0a..03050bb 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -31,17 +31,36 @@ import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings._
 import org.apache.spark.SparkContext._
 
-/** Spark-specific optimizer-checkpointed DRM. */
+/** ==Spark-specific optimizer-checkpointed DRM.==
+  *
+  * @param rdd underlying rdd to wrap over.
+  * @param _nrow number of rows; if unspecified, we will compute with an inexpensive traversal.
+  * @param _ncol number of columns; if unspecified, we will try to guess with an inexpensive traversal.
+  * @param _cacheStorageLevel storage level
+  * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands.
+  * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows
+  *                            (will require a lazy fix for some physical operations).
+  * @param evidence$1 class tag context bound for K.
+  * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
+  */
 class CheckpointedDrmSpark[K: ClassTag](
     val rdd: DrmRdd[K],
     private var _nrow: Long = -1L,
     private var _ncol: Int = -1,
     private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
-    override protected[mahout] val partitioningTag: Long = Random.nextLong()
+    override protected[mahout] val partitioningTag: Long = Random.nextLong(),
+    private var _canHaveMissingRows: Boolean = false
     ) extends CheckpointedDrm[K] {
 
   lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
   lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
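+  // The bare `nrow` reference below forces the lazy row count; its int-keyed path
+  // (computeNRow) is what populates _canHaveMissingRows as a side effect.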
+  lazy val canHaveMissingRows: Boolean = {
+    nrow
+    _canHaveMissingRows
+  }
+
+  private[mahout] var intFixExtra: Long = 0L
 
   private var cached: Boolean = false
   override val context: DistributedContext = rdd.context
@@ -69,7 +88,7 @@ class CheckpointedDrmSpark[K: ClassTag](
    * if matrix was previously persisted into cache,
    * delete cached representation
    */
-  def uncache() = {
+  def uncache(): this.type = {
     if (cached) {
       rdd.unpersist(blocking = false)
       cached = false
@@ -77,7 +96,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     this
   }
 
-//  def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] =
+  //  def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] =
 //    new CheckpointedDrmSpark[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
 
 
@@ -151,12 +170,25 @@ class CheckpointedDrmSpark[K: ClassTag](
 
     val intRowIndex = classTag[K] == classTag[Int]
 
-    if (intRowIndex)
-      cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
-    else
+    if (intRowIndex) {
+      val rdd = cache().rdd.asInstanceOf[DrmRdd[Int]]
+
+      // This is a suitable place to run the int key consistency test, because we know that
+      // nrow is computed lazily, which only happens once the rdd is already available and cached,
+      // so it's ok to compute small summaries without triggering huge pipelines. That usually
+      // happens right after things like drmFromHDFS or drmWrap().
+      val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+      val rowCount = rdd.count()
+      _canHaveMissingRows = maxPlus1 != rowCount ||
+        rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong
+      intFixExtra = (maxPlus1 - rowCount) max 0L
+      maxPlus1
+    } else
       cache().rdd.count()
   }
 
+
+
   protected def computeNCol =
     cache().rdd.map(_._2.length).fold(-1)(max(_, _))
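
The two summaries form a cheap completeness heuristic: a duplicate-free key set 0 until n
satisfies max + 1 = n and sum = n(n - 1)/2, so a mismatch in either flags possibly missing (or
duplicated) int keys. An in-core analogue of the probe with hypothetical key sets (it is a
heuristic, not a proof; rare collisions can slip through):

    def looksComplete(keys: Seq[Int]): Boolean = {
      val n = keys.size.toLong
      keys.max.toLong + 1 == n && keys.map(_.toLong).sum == n * (n - 1) / 2
    }

    looksComplete(Seq(0, 1, 2, 3))   // true: max + 1 = 4 = n, sum = 6 = 4 * 3 / 2
    looksComplete(Seq(0, 3))         // false: max + 1 = 4 but n = 2
    looksComplete(Seq(0, 2, 2, 3))   // false: sum = 7, expected 6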
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 8726766..6639a34 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -24,7 +24,7 @@ import org.apache.mahout.common.IOUtils
 import org.apache.log4j.Logger
 import org.apache.mahout.math.drm._
 import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.{SparkBCast, CheckpointedDrmSparkOps, CheckpointedDrmSpark}
+import org.apache.mahout.sparkbindings.drm.{DrmRddInput, SparkBCast, CheckpointedDrmSparkOps, CheckpointedDrmSpark}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.mahout.math.{VectorWritable, Vector, MatrixWritable, Matrix}
@@ -176,17 +176,36 @@ package object sparkbindings {
 
   private[sparkbindings] implicit def w2v(w:VectorWritable):Vector = w.get()
 
-  def drmWrap[K : ClassTag](
+  /**
+   * ==Wrap existing RDD into a matrix==
+   *
+   * @param rdd source rdd conforming to [[org.apache.mahout.sparkbindings.DrmRdd]]
+   * @param nrow optional, number of rows. If not specified, we'll try to figure it out on our own.
+   * @param ncol optional, number of columns. If not specified, we'll try to figure it out on our own.
+   * @param cacheHint optional, desired cache policy for that rdd.
+   * @param canHaveMissingRows optional. For int-keyed rows, there might be implied but missing rows.
+   *                           If the underlying rdd may have that condition, we need to know, since
+   *                           some operators consider that a deficiency and we'll need to fix it
+   *                           lazily before proceeding with such operators. It is only meaningful if
+   *                           `nrow` is also specified (otherwise, we'll run a quick test to figure
+   *                           out whether rows may be missing, at the time we count the rows).
+   * @tparam K row key type
+   * @return wrapped DRM
+   */
+  def drmWrap[K: ClassTag](
       rdd: DrmRdd[K],
       nrow: Int = -1,
       ncol: Int = -1,
-      cacheHint:CacheHint.CacheHint = CacheHint.NONE
+      cacheHint: CacheHint.CacheHint = CacheHint.NONE,
+      canHaveMissingRows: Boolean = false
       ): CheckpointedDrm[K] =
+
     new CheckpointedDrmSpark[K](
       rdd = rdd,
       _nrow = nrow,
       _ncol = ncol,
-      _cacheStorageLevel = SparkEngine.cacheHint2Spark(cacheHint)
+      _cacheStorageLevel = SparkEngine.cacheHint2Spark(cacheHint),
+      _canHaveMissingRows = canHaveMissingRows
     )
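
Typical use, mirroring the suite below: declaring the condition up front together with `nrow`
skips the lazy detection pass at row-count time:

    // bRdd physically carries keys 1 and 2 only; rows 0 and 3 are implied.
    val drmB = drmWrap(rdd = bRdd, nrow = 4, canHaveMissingRows = true)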
 
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
index bf635dc..c47f7f1 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
@@ -56,4 +56,5 @@ class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuite
         keys -> block
     }).norm should be < 1e-4
   }
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index b15c72c..2a4f213 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -19,9 +19,98 @@ package org.apache.mahout.sparkbindings.drm
 
 import org.scalatest.FunSuite
 import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
 import drm._
 import org.apache.mahout.sparkbindings._
+import RLikeDrmOps._
 import test.DistributedSparkSuite
 
 /** ==R-like DRM DSL operation tests -- Spark== */
-class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase
+class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase {
+
+  test("C = A + B missing rows") {
+    val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc
+
+    // Concoct an rdd with missing rows
+    val aRdd: DrmRdd[Int] = sc.parallelize(
+      0 -> dvec(1, 2, 3) ::
+          3 -> dvec(4, 5, 6) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val bRdd: DrmRdd[Int] = sc.parallelize(
+      1 -> dvec(2, 3, 4) ::
+          2 -> dvec(3, 4, 5) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val drmA = drmWrap(rdd = aRdd)
+    val drmB = drmWrap(rdd = bRdd, nrow = 4, canHaveMissingRows = true)
+    val drmC = drmA + drmB
+    val controlC = drmA.collect + drmB.collect
+
+    (drmC -: controlC).norm should be < 1e-10
+
+  }
+
+  test("C = cbind(A, B) with missing rows") {
+    val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc
+
+    // Concoct an rdd with missing rows
+    val aRdd: DrmRdd[Int] = sc.parallelize(
+      1 -> dvec(2, 2, 3) ::
+          3 -> dvec(4, 5, 6) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val bRdd: DrmRdd[Int] = sc.parallelize(
+      1 -> dvec(2, 3, 4) ::
+          2 -> dvec(3, 4, 5) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val drmA = drmWrap(rdd = aRdd)
+    val drmB = drmWrap(rdd = bRdd, nrow = 4, canHaveMissingRows = true)
+    val drmC = drmA.cbind(drmB)
+
+    val controlC = new DenseMatrix(safeToNonNegInt(drmA.nrow), drmA.ncol + drmB.ncol)
+    controlC(::, 0 until drmA.ncol) := drmA
+    controlC(::, drmA.ncol until drmA.ncol + drmB.ncol) := drmB
+
+    (drmC -: controlC).norm should be < 1e-10
+
+  }
+
+  test("B = A + 1.0 missing rows") {
+
+    val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc
+
+    // Concoct an rdd with missing rows
+    val aRdd: DrmRdd[Int] = sc.parallelize(
+      0 -> dvec(1, 2, 3) ::
+          3 -> dvec(3, 4, 5) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val drmA = drmWrap(rdd = aRdd)
+
+    drmA.canHaveMissingRows should equal(true)
+
+    val inCoreA = drmA.collect
+
+    printf("collected A = \n%s\n", inCoreA)
+
+    val controlB = inCoreA + 1.0
+
+    val drmB = drmA + 1.0
+
+    printf ("collected B = \n%s\n", drmB.collect)
+
+    (drmB -: controlB).norm should be < 1e-10
+
+    // Test that unary operators don't obscure the fact that source had missing rows
+    val drmC = drmA.mapBlock() { case (keys, block) =>
+      keys -> block
+    } + 1.0
+
+    (drmC -: controlB).norm should be < 1e-10
+
+  }
+
+}

