From: dlyubimov@apache.org
To: commits@mahout.apache.org
Subject: git commit: MAHOUT-1597: A + 1.0 (element-wise scala operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov) This closes apache/mahout#33
Date: Mon, 28 Jul 2014 17:34:27 +0000 (UTC)

Repository: mahout
Updated Branches:
  refs/heads/master c9d978a46 -> 377dcace1


MAHOUT-1597: A + 1.0 (element-wise scala operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov)

This closes apache/mahout#33

Squashed commit of the following:

commit 9038b2c248ec163e0ff85dd37cb109345b3bbb9d
Author: Dmitriy Lyubimov
Date:   Mon Jul 28 10:22:30 2014 -0700

    in-place performance enhancement at the risk of side-effects, for now

commit 35c873cceeda87b7b212677bffc0a21877932deb
Author: Dmitriy Lyubimov
Date:   Tue Jul 22 15:34:00 2014 -0700

    CBind test tweak

commit 005aeccd353faeb59f2ddb2003f393a603978edb
Author: Dmitriy Lyubimov
Date:   Tue Jul 22 15:33:18 2014 -0700

    Cbind test

commit 57f669a7d50097a5a816ce13ca8230f6c6742b65
Author: Dmitriy Lyubimov
Date:   Tue Jul 22 14:57:57 2014 -0700

    Also fixing `A ew B` with missing rows operands

commit 2063c5ccaf88bf31d7f5c1dd3d1650925c4dfd58
Author: Dmitriy Lyubimov
Date:   Tue Jul 22 14:12:56 2014 -0700

    style

commit 5d8e1407a7ea2535ae6d00701ed4f60390c1b30e
Author: Dmitriy Lyubimov
Date:   Tue Jul 22 14:09:11 2014 -0700

    Orientation-changing unary ops cannot produce missing rows

commit c9ac3be81ed464ccc4d440b8187e15efa9a21193
Author: Dmitriy Lyubimov
Date:   Tue Jul 22 14:03:25 2014 -0700

    Tests, passing.
commit 1ff376b2ddd1bcbe61f896d14e27d7a413e7313c
Author: Dmitriy Lyubimov
Date:   Tue Jul 22 13:23:14 2014 -0700

    Code up for lazy int-keyed missing rows fix

commit 746b3ddc6c0e7e8bb89ce591c32ba1b70ec688e6
Author: Dmitriy Lyubimov
Date:   Tue Jul 22 11:25:57 2014 -0700

    WIP

commit 45642b65f3f1620a4e2187af4b2b54e26ce1c42e
Author: Dmitriy Lyubimov
Date:   Mon Jul 21 18:19:37 2014 -0700

    WIP


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/377dcace
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/377dcace
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/377dcace

Branch: refs/heads/master
Commit: 377dcace16fe4fd77baaab91ad575e9da5c49ac0
Parents: c9d978a
Author: Dmitriy Lyubimov
Authored: Mon Jul 28 10:25:05 2014 -0700
Committer: Dmitriy Lyubimov
Committed: Mon Jul 28 10:25:05 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |  2 +
 .../mahout/math/drm/CheckpointedDrm.scala          |  2 +-
 .../org/apache/mahout/math/drm/DrmLike.scala       |  2 +
 .../math/drm/logical/AbstractBinaryOp.scala        |  2 +
 .../math/drm/logical/AbstractUnaryOp.scala         |  1 +
 .../mahout/math/drm/logical/OpAewScalar.scala      |  3 +
 .../apache/mahout/math/drm/logical/OpAt.scala      |  2 +
 .../apache/mahout/math/drm/logical/OpAtA.scala     |  4 +-
 .../mahout/math/drm/RLikeDrmOpsSuiteBase.scala     |  9 ++
 .../mahout/sparkbindings/SparkEngine.scala         |  8 +-
 .../apache/mahout/sparkbindings/blas/AewB.scala    | 33 +++++--
 .../mahout/sparkbindings/blas/package.scala        | 41 +++++++++
 .../drm/CheckpointedDrmSpark.scala                 | 46 ++++++++--
 .../apache/mahout/sparkbindings/package.scala      | 27 +++++-
 .../mahout/sparkbindings/drm/DrmLikeSuite.scala    |  1 +
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala       | 91 +++++++++++++++++++-
 16 files changed, 250 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 78b17b5..50909e2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1597: A + 1.0 (element-wise scala operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov)
+
   MAHOUT-1595: MatrixVectorView - implement a proper iterateNonZero() (Anand Avati via dlyubimov)
 
   MAHOUT-1529(e): Move dense/sparse matrix test in mapBlock into spark (Anand Avati via dlyubimov)


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 0266944..28fb7fd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -31,6 +31,6 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
 
   def writeDRM(path: String)
 
   /** If this checkpoint is already declared cached, uncache. */
-  def uncache()
+  def uncache(): this.type
 
 }
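The `uncache(): this.type` signature above is a small but deliberate change: returning `this.type` instead of leaving the result type inferred lets callers chain further calls off the concrete subtype. A minimal standalone sketch of the idiom (illustrative names, not patch code):

    trait Cacheable {
      // Returning this.type preserves the caller's static type across the call.
      def uncache(): this.type
    }

    class MyDrm extends Cacheable {
      def uncache(): this.type = { /* release the cached rdd here */ this }
      def collectRows(): Unit = ()
    }

    // Compiles because uncache() is statically known to return MyDrm, not Cacheable.
    new MyDrm().uncache().collectRows()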
http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 995c873..97fe989 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -30,6 +30,8 @@ trait DrmLike[K] {
 
   protected[mahout] def partitioningTag: Long
 
+  protected[mahout] def canHaveMissingRows: Boolean
+
   /**
    * Distributed context, can be implicitly converted to operations on [[org.apache.mahout.math.drm.
    * DistributedEngine]].


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
index 7863526..3b6b8bf 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -42,6 +42,8 @@ abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
   protected[drm] var B: DrmLike[B]
   lazy val context: DistributedContext = A.context
 
+  protected[mahout] def canHaveMissingRows: Boolean = false
+
   // These are explicit evidence export. Sometimes scala falls over to figure that on its own.
   def classTagA: ClassTag[A] = implicitly[ClassTag[A]]


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
index 92abdb4..a445f21 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -32,5 +32,6 @@ abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
 
   def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
 
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = A.canHaveMissingRows
 
 }


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
index 91e0dd4..3b651f6 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -29,6 +29,9 @@ case class OpAewScalar[K: ClassTag](
 
   override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
 
+  /** Stuff like `A + 1` is always supposed to fix this */
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
+
   /** R-like syntax for number of rows. */
   def nrow: Long = A.nrow
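Why `OpAewScalar` must pin `canHaveMissingRows` to `false`: an int-keyed DRM with gaps in its key range logically still contains those rows as zero vectors, and `A + 1.0` has to transform the zeros like any other value. An in-core sketch of the required algebra, using the same scalabindings the tests rely on (the zero row stands in for a physically missing one):

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    // Row 1 is all zeros -- the in-core stand-in for a missing int-keyed row.
    val a = dense((1, 2, 3), (0, 0, 0), (3, 4, 5))

    // After A + 1.0 the zero row must read (1, 1, 1). An implementation that
    // skips physically absent rows leaves zeros there instead -- MAHOUT-1597.
    val b = a + 1.0
    assert(b(1, 0) == 1.0 && b(1, 1) == 1.0 && b(1, 2) == 1.0)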
http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
index 3239ad2..4791301 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala
@@ -30,4 +30,6 @@ case class OpAt(
   /** R-like syntax for number of columns */
   def ncol: Int = safeToNonNegInt(A.nrow)
 
+  /** A' after simplifications cannot produce missing rows, ever. */
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
 }


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
index c7c6046..ad2a5d8 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala
@@ -31,6 +31,6 @@ case class OpAtA[K: ClassTag](
   /** R-like syntax for number of columns */
   def ncol: Int = A.ncol
 
-  /** Non-zero element count */
-  def nNonZero: Long = throw new UnsupportedOperationException
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
+
 }


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
index 71dc640..50beccf 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
@@ -480,4 +480,13 @@ trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers {
 
   }
 
+  test("B = A + 1.0") {
+    val inCoreA = dense((1, 2), (2, 3), (3, 4))
+    val controlB = inCoreA + 1.0
+
+    val drmB = drmParallelize(m = inCoreA, numPartitions = 2) + 1.0
+
+    (drmB -: controlB).norm should be < 1e-10
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index b68a98e..c37354f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -161,10 +161,10 @@
     {
       implicit def getWritable(x: Any): Writable = val2keyFunc()
 
-      new CheckpointedDrmSpark(
-        rdd = rdd.map(t => (key2valFunc(t._1), t._2)),
-        _cacheStorageLevel = StorageLevel.MEMORY_ONLY
-      )(unwrappedKeyTag.asInstanceOf[ClassTag[Any]])
+
+      val drmRdd = rdd.map { t => (key2valFunc(t._1), t._2)}
+
+      drmWrap(rdd = drmRdd, cacheHint = CacheHint.MEMORY_ONLY)(unwrappedKeyTag.asInstanceOf[ClassTag[Any]])
     }
   }
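As an aside on the `OpAt`/`OpAtA` changes a few hunks up: transposition physically rebuilds row keys from column positions, so A' always carries the dense key range 0 until A.ncol no matter what gaps its input had. A quick in-core illustration (ours, for intuition only):

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    // Even if A originated from an rdd missing row 1, the rows of A' are keyed
    // by A's column positions, so all of them exist by construction.
    val a = dense((1, 2, 3), (0, 0, 0))
    val at = a.t
    assert(at.nrow == 3 && at.ncol == 2)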
http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 384b986..3cdb797 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -22,10 +22,12 @@ import scala.reflect.ClassTag
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector}
 import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
 import org.apache.log4j.Logger
 import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
+import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
+import org.apache.mahout.math.drm._
 
 /** Elementwise drm-drm operators */
 object AewB {
@@ -52,6 +54,7 @@ object AewB {
 
     val ewOps = getEWOps()
     val opId = op.op
+    val ncol = op.ncol
 
     val reduceFunc = opId match {
       case "+" => ewOps.plus
@@ -83,14 +86,24 @@ object AewB {
       log.debug("applying elementwise as join")
 
       a
+          // Full outer-join operands row-wise
           .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
+
+          // Reduce both sides. In case there are duplicate rows in RHS or LHS, they are summed up
+          // prior to reduction.
           .map({ case (key, (vectorSeqA, vectorSeqB)) =>
-            key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc))
+            val lhsVec: Vector = if (vectorSeqA.isEmpty) new SequentialAccessSparseVector(ncol)
+            else (vectorSeqA.head /: vectorSeqA.tail)(_ += _)
+            val rhsVec: Vector = if (vectorSeqB.isEmpty) new SequentialAccessSparseVector(ncol)
+            else (vectorSeqB.head /: vectorSeqB.tail)(_ += _)
+            key -> reduceFunc(lhsVec, rhsVec)
           })
     }
 
-    new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd))
+    new DrmRddInput(rowWiseSrc = Some(ncol -> rdd))
   }
 
   /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
@@ -109,11 +122,21 @@ object AewB {
       case "/:" => ewOps.scalarDiv
       case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
     }
 
-    val a = srcA.toBlockifiedDrmRdd()
-    val rdd = a
+    // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
+    // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
+    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) {
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
+    } else {
+      srcA.toBlockifiedDrmRdd()
+    }
+
+    val rdd = aBlockRdd
         .map({ case (keys, block) => keys -> reduceFunc(block, scalar) })
+
     new DrmRddInput[K](blockifiedSrc = Some(rdd))
   }
 }
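The heart of the drm-drm fix above is that `cogroup` acts as a full outer join: a row missing from either operand arrives as an empty side, and the old `vectorSeqA.reduce(...)` would throw on it. Each side now collapses to a single vector, with an empty sparse vector standing in for a missing row. A plain-Scala sketch of just that per-side reduction (the helper name is ours, not the patch's):

    import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}
    import org.apache.mahout.math.scalabindings.RLikeOps._

    // Sum one cogroup side in place; an absent row becomes an empty sparse
    // vector of the right cardinality, i.e. a logical zero row. Duplicate
    // rows on one side are summed, matching the fold in the patch.
    def sumSide(side: Iterable[Vector], ncol: Int): Vector =
      if (side.isEmpty) new SequentialAccessSparseVector(ncol)
      else side.reduce(_ += _)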
http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 32d6fb5..9a50afa 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -18,6 +18,12 @@ package org.apache.mahout.sparkbindings
 
 import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
+import org.apache.spark.SparkContext._
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import scalabindings._
+import RLikeOps._
 
 /**
  * This validation contains distributed algorithms that distributed matrix expression optimizer picks
@@ -27,4 +33,39 @@ package object blas {
 
   implicit def drmRdd2ops[K:ClassTag](rdd:DrmRdd[K]):DrmRddOps[K] = new DrmRddOps[K](rdd)
 
+  private[mahout] def fixIntConsistency(op: DrmLike[Int], src: DrmRdd[Int]): DrmRdd[Int] = {
+
+    if (op.canHaveMissingRows) {
+
+      val rdd = src
+      val sc = rdd.sparkContext
+      val dueRows = safeToNonNegInt(op.nrow)
+      val dueCols = op.ncol
+
+      // Compute the fix.
+      sc
+
+          // Bootstrap full key set
+          .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1)
+
+          // Enable PairedFunctions
+          .map(_ -> Unit)
+
+          // Cogroup with all rows
+          .cogroup(other = rdd)
+
+          // Filter out out-of-bounds
+          .filter { case (key, _) => key >= 0 && key < dueRows}
+
+          // Coalesce and output RHS
+          .map { case (key, (seqUnit, seqVec)) =>
+            val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols))
+            val vec = if (seqVec.size > 0) (acc /: seqVec.tail)(_ + _) else acc
+            key -> vec
+          }
+
+    } else src
+
+  }
+
 }
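To make `fixIntConsistency` concrete: for a DRM declaring nrow = 4 that physically holds only rows 0 and 3, cogrouping against the bootstrapped key range 0 until 4 emits rows 1 and 2 as empty sparse vectors, while duplicate physical rows get summed. A minimal in-core analogue (our illustration, not patch code):

    import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}

    // Fill gaps in 0 until nrow with empty sparse rows -- the sequential
    // equivalent of the parallelize-cogroup pipeline above.
    def fixMissing(rows: Map[Int, Vector], nrow: Int, ncol: Int): Seq[(Int, Vector)] =
      (0 until nrow).map(i => i -> rows.getOrElse(i, new SequentialAccessSparseVector(ncol)))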
http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 674ff0a..03050bb 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -31,17 +31,36 @@ import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings._
 import org.apache.spark.SparkContext._
 
-/** Spark-specific optimizer-checkpointed DRM. */
+/** ==Spark-specific optimizer-checkpointed DRM.==
+ *
+ * @param rdd underlying rdd to wrap over.
+ * @param _nrow number of rows; if unspecified, we will compute with an inexpensive traversal.
+ * @param _ncol number of columns; if unspecified, we will try to guess with an inexpensive traversal.
+ * @param _cacheStorageLevel storage level
+ * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands.
+ * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows
+ *                            (will require a lazy fix for some physical operations).
+ * @param evidence$1 class tag context bound for K.
+ * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
+ */
 class CheckpointedDrmSpark[K: ClassTag](
     val rdd: DrmRdd[K],
     private var _nrow: Long = -1L,
     private var _ncol: Int = -1,
     private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
-    override protected[mahout] val partitioningTag: Long = Random.nextLong()
+    override protected[mahout] val partitioningTag: Long = Random.nextLong(),
+    private var _canHaveMissingRows: Boolean = false
     ) extends CheckpointedDrm[K] {
 
   lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
   lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
+  lazy val canHaveMissingRows: Boolean = {
+    nrow
+    _canHaveMissingRows
+  }
+
+  // private[mahout] var canHaveMissingRows = false
+  private[mahout] var intFixExtra: Long = 0L
 
   private var cached: Boolean = false
   override val context: DistributedContext = rdd.context
@@ -69,7 +88,7 @@ class CheckpointedDrmSpark[K: ClassTag](
    * if matrix was previously persisted into cache,
    * delete cached representation
    */
-  def uncache() = {
+  def uncache(): this.type = {
     if (cached) {
       rdd.unpersist(blocking = false)
       cached = false
@@ -77,7 +96,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     this
   }
 
-// def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] =
+  // def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] =
 //    new CheckpointedDrmSpark[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
 
@@ -151,12 +170,25 @@ class CheckpointedDrmSpark[K: ClassTag](
 
     val intRowIndex = classTag[K] == classTag[Int]
 
-    if (intRowIndex)
-      cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
-    else
+    if (intRowIndex) {
+      val rdd = cache().rdd.asInstanceOf[DrmRdd[Int]]
+
+      // I guess it is a suitable place to compute int keys consistency test here because we know
+      // that nrow can be computed lazily, which always happens when rdd is already available, cached,
+      // and it's ok to compute small summaries without triggering huge pipelines. Which usually
+      // happens right after things like drmFromHDFS or drmWrap().
+      val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+      val rowCount = rdd.count()
+      _canHaveMissingRows = maxPlus1 != rowCount ||
+          rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong
+      intFixExtra = (maxPlus1 - rowCount) max 0L
+      maxPlus1
+    } else
       cache().rdd.count()
   }
 
   protected def computeNCol = cache().rdd.map(_._2.length).fold(-1)(max(_, _))
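Two details of `CheckpointedDrmSpark` above deserve unpacking. First, `canHaveMissingRows` reads `nrow` purely for its side effect: forcing the lazy row count runs the key consistency check that assigns `_canHaveMissingRows`. Second, the check flags an int-keyed matrix when the max key + 1 disagrees with the row count, or when the key sum differs from 0 + 1 + ... + (n - 1) = n(n - 1)/2, which catches duplicate keys masking holes. A worked sketch of that arithmetic (values are our own illustration):

    // Keys {0, 3}: maxPlus1 = 4 but rowCount = 2 -> rows 1 and 2 are missing.
    // Keys {0, 2, 2}: maxPlus1 = 3 = rowCount, but keySum = 4 while a complete
    // range 0..2 sums to 3 * 2 / 2 = 3 -> a duplicate is hiding a hole.
    def suspicious(keys: Seq[Int]): Boolean = {
      val rowCount = keys.size.toLong
      val maxPlus1 = keys.max + 1L
      maxPlus1 != rowCount || keys.map(_.toLong).sum != rowCount * (rowCount - 1) / 2
    }

    assert(suspicious(Seq(0, 3)))
    assert(suspicious(Seq(0, 2, 2)))
    assert(!suspicious(Seq(0, 1, 2)))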
http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 8726766..6639a34 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -24,7 +24,7 @@ import org.apache.mahout.common.IOUtils
 import org.apache.log4j.Logger
 import org.apache.mahout.math.drm._
 import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.{SparkBCast, CheckpointedDrmSparkOps, CheckpointedDrmSpark}
+import org.apache.mahout.sparkbindings.drm.{DrmRddInput, SparkBCast, CheckpointedDrmSparkOps, CheckpointedDrmSpark}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.mahout.math.{VectorWritable, Vector, MatrixWritable, Matrix}
@@ -176,17 +176,36 @@ package object sparkbindings {
 
   private[sparkbindings] implicit def w2v(w:VectorWritable):Vector = w.get()
 
-  def drmWrap[K : ClassTag](
+  /**
+   * ==Wrap existing RDD into a matrix==
+   *
+   * @param rdd source rdd conforming to [[org.apache.mahout.sparkbindings.DrmRdd]]
+   * @param nrow optional, number of rows. If not specified, we'll try to figure it out on our own.
+   * @param ncol optional, number of columns. If not specified, we'll try to figure it out on our own.
+   * @param cacheHint optional, desired cache policy for that rdd.
+   * @param canHaveMissingRows optional. For int-keyed rows, there might be implied but missing rows.
+   *                           If the underlying rdd may have that condition, we need to know, since some
+   *                           operators consider that a deficiency and we'll need to fix it lazily
+   *                           before proceeding with such operators. It is only meaningful if `nrow` is
+   *                           also specified (otherwise, we'll run a quick test to figure out if rows may
+   *                           be missing, at the time we count the rows).
+   * @tparam K row key type
+   * @return wrapped DRM
+   */
+  def drmWrap[K: ClassTag](
       rdd: DrmRdd[K],
       nrow: Int = -1,
      ncol: Int = -1,
-      cacheHint:CacheHint.CacheHint = CacheHint.NONE
+      cacheHint: CacheHint.CacheHint = CacheHint.NONE,
+      canHaveMissingRows: Boolean = false
      ): CheckpointedDrm[K] =
+
     new CheckpointedDrmSpark[K](
       rdd = rdd,
       _nrow = nrow,
       _ncol = ncol,
-      _cacheStorageLevel = SparkEngine.cacheHint2Spark(cacheHint)
+      _cacheStorageLevel = SparkEngine.cacheHint2Spark(cacheHint),
+      _canHaveMissingRows = canHaveMissingRows
     )
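A short usage sketch of the extended `drmWrap` (`aRdd` is assumed, constructed as in the suites below; note `canHaveMissingRows` is `protected[mahout]`, so reading it back requires mahout package scope, as the tests have):

    // 1) Declare the condition up front along with the intended geometry:
    val declared = drmWrap(rdd = aRdd, nrow = 4, canHaveMissingRows = true)

    // 2) Or let the wrapper detect it: with no nrow given, evaluating nrow runs
    //    the int-key consistency check and sets the flag as a side effect.
    val detected = drmWrap(rdd = aRdd)
    detected.canHaveMissingRows // forces nrow, then reports the detected state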
http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
index bf635dc..c47f7f1 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
@@ -56,4 +56,5 @@ class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuite
       keys -> block
     }).norm should be < 1e-4
   }
+
 }


http://git-wip-us.apache.org/repos/asf/mahout/blob/377dcace/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index b15c72c..2a4f213 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -19,9 +19,98 @@ package org.apache.mahout.sparkbindings.drm
 
 import org.scalatest.FunSuite
 import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
 import drm._
 import org.apache.mahout.sparkbindings._
+import RLikeDrmOps._
 import test.DistributedSparkSuite
 
 /** ==R-like DRM DSL operation tests -- Spark== */
-class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase
+class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase {
+
+  test("C = A + B missing rows") {
+    val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc
+
+    // Concoct an rdd with missing rows
+    val aRdd: DrmRdd[Int] = sc.parallelize(
+      0 -> dvec(1, 2, 3) ::
+      3 -> dvec(4, 5, 6) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val bRdd: DrmRdd[Int] = sc.parallelize(
+      1 -> dvec(2, 3, 4) ::
+      2 -> dvec(3, 4, 5) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val drmA = drmWrap(rdd = aRdd)
+    val drmB = drmWrap(rdd = bRdd, nrow = 4, canHaveMissingRows = true)
+
+    val drmC = drmA + drmB
+    val controlC = drmA.collect + drmB.collect
+
+    (drmC -: controlC).norm should be < 1e-10
+  }
+  test("C = cbind(A, B) with missing rows") {
+    val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc
+
+    // Concoct an rdd with missing rows
+    val aRdd: DrmRdd[Int] = sc.parallelize(
+      1 -> dvec(2, 2, 3) ::
+      3 -> dvec(4, 5, 6) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val bRdd: DrmRdd[Int] = sc.parallelize(
+      1 -> dvec(2, 3, 4) ::
+      2 -> dvec(3, 4, 5) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val drmA = drmWrap(rdd = aRdd)
+    val drmB = drmWrap(rdd = bRdd, nrow = 4, canHaveMissingRows = true)
+
+    val drmC = drmA.cbind(drmB)
+
+    val controlC = new DenseMatrix(safeToNonNegInt(drmA.nrow), drmA.ncol + drmB.ncol)
+    controlC(::, 0 until drmA.ncol) := drmA
+    controlC(::, drmA.ncol until drmA.ncol + drmB.ncol) := drmB
+
+    (drmC -: controlC).norm should be < 1e-10
+  }
+
+  test("B = A + 1.0 missing rows") {
+    val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc
+
+    // Concoct an rdd with missing rows
+    val aRdd: DrmRdd[Int] = sc.parallelize(
+      0 -> dvec(1, 2, 3) ::
+      3 -> dvec(3, 4, 5) :: Nil
+    ).map { case (key, vec) => key -> (vec: Vector)}
+
+    val drmA = drmWrap(rdd = aRdd)
+
+    drmA.canHaveMissingRows should equal(true)
+
+    val inCoreA = drmA.collect
+
+    printf("collected A = \n%s\n", inCoreA)
+
+    val controlB = inCoreA + 1.0
+
+    val drmB = drmA + 1.0
+
+    printf("collected B = \n%s\n", drmB.collect)
+
+    (drmB -: controlB).norm should be < 1e-10
+
+    // Test that unary operators don't obscure the fact that source had missing rows
+    val drmC = drmA.mapBlock() { case (keys, block) =>
+      keys -> block
+    } + 1.0
+
+    (drmC -: controlB).norm should be < 1e-10
+  }
+
+}