mahout-commits mailing list archives

From: dlyubi...@apache.org
Subject: svn commit: r1592933 - in /mahout/trunk/spark/src: main/scala/org/apache/mahout/sparkbindings/decompositions/ main/scala/org/apache/mahout/sparkbindings/drm/ main/scala/org/apache/mahout/sparkbindings/drm/plan/ test/scala/org/apache/mahout/sparkbinding...
Date: Wed, 07 May 2014 01:28:34 GMT
Author: dlyubimov
Date: Wed May  7 01:28:34 2014
New Revision: 1592933

URL: http://svn.apache.org/r1592933
Log:
MAHOUT-1529: completely abstracting away dssvd, dspca and dqr: introducing CacheHint enum

Squashed commit of the following:

commit 6c4bf1650f0e87d0d1fc5b9b23c94f6e3553b74d
Merge: a748e8b 0c5a754
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue May 6 18:20:36 2014 -0700

    Merge branch 'trunk' into MAHOUT-1529

commit a748e8b8be2ad7ce44af231147b236726704b561
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue May 6 18:19:35 2014 -0700

    MAHOUT-1529: completely abstracting away dssvd, dspca and dqr: introducing CacheHint enum

Added:
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
Modified:
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala
    mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
    mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
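
Note for readers of the diff below: the change swaps Spark's StorageLevel out of the
public DRM API in favor of an engine-neutral CacheHint enumeration. A minimal
before/after sketch (drmA is a hypothetical checkpointable DRM; the checkpoint
signatures match this commit):

    // Before: callers leaked a Spark type into algebra code.
    //   import org.apache.spark.storage.StorageLevel
    //   val drmAcp = drmA.checkpoint(StorageLevel.MEMORY_ONLY)

    // After: the same call through the engine-neutral hint.
    import org.apache.mahout.sparkbindings.drm.CacheHint
    val drmAcp = drmA.checkpoint(CacheHint.MEMORY_ONLY)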

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala?rev=1592933&r1=1592932&r2=1592933&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala Wed May  7 01:28:34 2014
@@ -23,7 +23,6 @@ import org.apache.mahout.math.scalabindi
 import RLikeOps._
 import org.apache.mahout.sparkbindings.drm._
 import RLikeDrmOps._
-import org.apache.spark.storage.StorageLevel
 import org.apache.mahout.common.RandomUtils
 
 object DSPCA {
@@ -138,7 +137,7 @@ object DSPCA {
     }
 
     val c = s_q cross s_b
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(StorageLevel.NONE).collect -
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
         c - c.t + (s_q cross s_q) * (xi dot xi)
     val (inCoreUHat, d) = eigen(inCoreBBt)
     val s = d.sqrt
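
The idiom above runs the optimizer and materializes B'B, while CacheHint.NONE (mapped
to StorageLevel.NONE in CheckpointAction.scala below) skips pinning the result in the
block manager, since it is collected straight to the driver. The same pattern recurs
in DSSVD below. A minimal sketch, assuming a hypothetical DRM drmX:

    // Materialize X'X without caching, then bring it in-core:
    val inCoreXtX = (drmX.t %*% drmX).checkpoint(CacheHint.NONE).collect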

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala?rev=1592933&r1=1592932&r2=1592933&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala Wed May  7 01:28:34 2014
@@ -6,9 +6,6 @@ import org.apache.mahout.math.scalabindi
 import RLikeOps._
 import org.apache.mahout.sparkbindings.drm._
 import RLikeDrmOps._
-import scala.util.Random
-import org.apache.spark.SparkContext._
-import org.apache.spark.storage.StorageLevel
 import org.apache.mahout.common.RandomUtils
 
 object DSSVD {
@@ -71,7 +68,7 @@ object DSSVD {
       if (i == q - 1) drmBt = drmBt.checkpoint()
     }
 
-    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(StorageLevel.NONE).collect
+    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
     val (inCoreUHat, d) = eigen(inCoreBBt)
     val s = d.sqrt
 

Added: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala?rev=1592933&view=auto
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala (added)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala Wed May  7 01:28:34 2014
@@ -0,0 +1,20 @@
+package org.apache.mahout.sparkbindings.drm
+
+
+object CacheHint extends Enumeration {
+
+  type CacheHint = Value
+
+  val NONE,
+  DISK_ONLY,
+  DISK_ONLY_2,
+  MEMORY_ONLY,
+  MEMORY_ONLY_2,
+  MEMORY_ONLY_SER,
+  MEMORY_ONLY_SER_2,
+  MEMORY_AND_DISK,
+  MEMORY_AND_DISK_2,
+  MEMORY_AND_DISK_SER,
+  MEMORY_AND_DISK_SER_2 = Value
+
+}
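
Since CacheHint is a plain scala.Enumeration, the standard enumeration utilities
apply. A short sketch (not part of this commit):

    // List all hints, or resolve one by name; withName throws
    // NoSuchElementException for an unknown name.
    CacheHint.values.foreach(println)
    val hint: CacheHint.CacheHint = CacheHint.withName("MEMORY_ONLY")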

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala?rev=1592933&r1=1592932&r2=1592933&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala Wed May  7 01:28:34 2014
@@ -48,7 +48,7 @@ class CheckpointedDrmBase[K: ClassTag](
   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
   * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
   */
-  def checkpoint(sLevel: StorageLevel): CheckpointedDrm[K] =
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] =
  // We are already checkpointed in the sense that we already have Spark lineage. So just return self.
    this
 

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala?rev=1592933&r1=1592932&r2=1592933&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala Wed May  7 01:28:34 2014
@@ -41,6 +41,6 @@ trait DrmLike[K] {
   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
   * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
   */
-  def checkpoint(sLevel: StorageLevel = StorageLevel.MEMORY_ONLY): CheckpointedDrm[K]
+  def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
 
 }
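
Note the default argument: call sites that relied on the old StorageLevel.MEMORY_ONLY
default keep the same behavior without source changes. Sketch, with a hypothetical drmA:

    drmA.checkpoint()                        // uses the CacheHint.MEMORY_ONLY default
    drmA.checkpoint(CacheHint.MEMORY_ONLY)   // explicit equivalent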

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala?rev=1592933&r1=1592932&r2=1592933&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala Wed May  7 01:28:34 2014
@@ -54,6 +54,9 @@ package object drm {
   /** Block-map func */
   type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R]
 
+  /** CacheHint type */
+//  type CacheHint = CacheHint.CacheHint
+
   implicit def input2drmRdd[K](input: DrmRddInput[K]): DrmRdd[K] = input.toDrmRdd()
 
   implicit def input2blockifiedDrmRdd[K](input: DrmRddInput[K]): BlockifiedDrmRdd[K] = input.toBlockifiedDrmRdd()

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala?rev=1592933&r1=1592932&r2=1592933&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/plan/CheckpointAction.scala Wed May  7 01:28:34 2014
@@ -43,7 +43,7 @@ abstract class CheckpointAction[K: Class
   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
   * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
   */
-  def checkpoint(sLevel: StorageLevel): CheckpointedDrm[K] = cp.getOrElse({
+  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp.getOrElse({
    // Non-zero count is sparsely supported by logical operators now. So assume we have no knowledge
    // if it is unsupported, instead of failing.
     val plan = optimize(this)
@@ -52,7 +52,7 @@ abstract class CheckpointAction[K: Class
       rdd = rdd,
       _nrow = nrow,
       _ncol = ncol,
-      _cacheStorageLevel = sLevel,
+      _cacheStorageLevel = cacheHint2Spark(cacheHint),
       partitioningTag = plan.partitioningTag
     )
     cp = Some(newcp)
@@ -66,6 +66,20 @@ object CheckpointAction {
   /** Perform expression optimization. Return physical plan that we can pass to exec() */
   def optimize[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
 
+  private def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
+    case CacheHint.NONE => StorageLevel.NONE
+    case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
+    case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
+    case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
+    case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
+    case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
+    case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
+    case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
+    case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
+    case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
+    case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
+  }
+
 
   /** This is mostly multiplication operations rewrites */
   private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
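
A design note on cacheHint2Spark: scala.Enumeration matches carry no compile-time
exhaustiveness check, so a hint added to CacheHint without a corresponding case would
only surface as a scala.MatchError at runtime. The hint flows through checkpoint as
in this sketch (drmA hypothetical):

    // checkpoint(cacheHint) -> cacheHint2Spark(cacheHint) -> RDD storage level,
    // so this caches the checkpointed RDD at Spark's MEMORY_AND_DISK:
    val drmAcp = drmA.checkpoint(CacheHint.MEMORY_AND_DISK)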

Modified: mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala?rev=1592933&r1=1592932&r2=1592933&view=diff
==============================================================================
--- mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala (original)
+++ mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala Wed May  7 01:28:34 2014
@@ -23,9 +23,7 @@ import org.apache.mahout.math.scalabindi
 import RLikeOps._
 import org.apache.mahout.sparkbindings.drm._
 import RLikeDrmOps._
-import scala.util.Random
 import org.apache.mahout.math.{Matrices, SparseRowMatrix}
-import org.apache.spark.storage.StorageLevel
 import org.apache.mahout.common.RandomUtils
 
 class MathSuite extends FunSuite with Matchers with MahoutLocalContext {
@@ -157,7 +155,7 @@ class MathSuite extends FunSuite with Ma
     // Un-normalized pca data:
     drmPCA = drmPCA %*% diagv(s)
 
-    val pca = drmPCA.checkpoint(sLevel = StorageLevel.NONE).collect
+    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
 
    // Of course, once we have calculated the pca, the spectrum is going to be different since our
    // originally generated input was not centered. So here, we'd just brute-solve pca to verify

Modified: mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala?rev=1592933&r1=1592932&r2=1592933&view=diff
==============================================================================
--- mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala (original)
+++ mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala Wed May  7 01:28:34 2014
@@ -23,11 +23,11 @@ import org.apache.mahout.math.scalabindi
 import org.apache.mahout.sparkbindings.drm._
 import RLikeDrmOps._
 import org.apache.mahout.sparkbindings.drm.plan.{OpAtx, OpAtB, OpAtA, CheckpointAction}
-import org.apache.spark.storage.StorageLevel
 import org.apache.spark.SparkContext
 import scala.collection.mutable.ArrayBuffer
 import org.apache.mahout.math.Matrices
 import org.apache.mahout.sparkbindings.blas
+import org.apache.spark.storage.StorageLevel
 
 /** R-like DRM DSL operation tests */
 class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
@@ -99,7 +99,7 @@ class RLikeDrmOpsSuite extends FunSuite 
         keys -> block
     }
 
-    val inCoreC = C checkpoint StorageLevel.NONE collect;
+    val inCoreC = C checkpoint CacheHint.NONE collect;
     println(inCoreC)
 
     (inCoreC - inCoreCControl).norm should be < 1E-10
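
The updated test line uses Scala infix notation; it is equivalent to the dotted form:

    val inCoreC = C.checkpoint(CacheHint.NONE).collect
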
@@ -355,8 +355,8 @@ class RLikeDrmOpsSuite extends FunSuite 
     val inCoreB = dense((3, 5), (4, 6))
 
     val B = drmParallelize(inCoreB, numPartitions = 2)
-//    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(StorageLevel.MEMORY_ONLY_SER)
-    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(StorageLevel.MEMORY_ONLY)
+//    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
+    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY)
 
     val C = A + B
     val inCoreC = C.collect


