mahout-commits mailing list archives

From dlyubi...@apache.org
Subject svn commit: r1579565 - in /mahout/trunk/spark/src: main/scala/org/apache/mahout/sparkbindings/ main/scala/org/apache/mahout/sparkbindings/blas/ main/scala/org/apache/mahout/sparkbindings/decompositions/ main/scala/org/apache/mahout/sparkbindings/drm/de...
Date Thu, 20 Mar 2014 08:53:50 GMT
Author: dlyubimov
Date: Thu Mar 20 08:53:50 2014
New Revision: 1579565

URL: http://svn.apache.org/r1579565
Log:
MAHOUT-1464 'Fat' nongraph physical A'A.
Refactored decompositions package out of drm package.
Added some kryo and akka related properties to the test setup.

Added:
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
      - copied, changed from r1578677, mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DQR.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
      - copied, changed from r1578677, mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSPCA.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
      - copied, changed from r1578677, mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSSVD.scala
    mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/
    mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
      - copied, changed from r1578677, mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/decompositions/MathSuite.scala
Removed:
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DQR.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSPCA.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSSVD.scala
    mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/decompositions/MathSuite.scala
Modified:
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/io/WritableKryoSerializer.scala
    mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
    mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
    mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala?rev=1579565&r1=1579564&r2=1579565&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala Thu Mar 20 08:53:50 2014
@@ -24,19 +24,23 @@ import RLikeOps._
 import collection._
 import JavaConversions._
 import org.apache.mahout.sparkbindings.drm.plan.OpAtA
-import org.apache.hadoop.io.Writable
-import scala.reflect.ClassTag
+import org.apache.spark.SparkContext._
+import org.apache.log4j.Logger
 
 /**
  * Collection of algorithms to compute X' times X
  */
 object AtA {
 
-  val maxInMemNCol = System.getProperty("mahout.math.AtA.maxInMemNCol", "2000").toInt
-  maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer")
+  final val s_log = Logger.getLogger(AtA.getClass)
+
+  final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol"
 
   /** Materialize A'A operator */
-  def at_a(operator: OpAtA[_], srcRdd: DrmRddInput[_] ): DrmRddInput[Int] = {
+  def at_a(operator: OpAtA[_], srcRdd: DrmRddInput[_]): DrmRddInput[Int] = {
+
+    val maxInMemNCol = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, "2000").toInt
+    maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer")
 
     if (operator.ncol <= maxInMemNCol) {
       // If we can comfortably fit upper-triangular operator into a map memory, we will run slim
@@ -46,7 +50,7 @@ object AtA {
       new DrmRddInput(rowWiseSrc = Some(inCoreA.ncol, drmRdd))
     } else {
       // Otherwise, we need to run a distributed, big version
-      new DrmRddInput(rowWiseSrc=Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, operator = operator)))
+      new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator)))
 
     }
   }
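
Note that the in-memory threshold is now read on every at_a invocation rather than once at
class-load time, so it can be overridden at runtime. A minimal sketch of doing so from a
driver or test (the property constant comes from the diff above; the value is illustrative):

    import org.apache.mahout.sparkbindings.blas.AtA

    // Force the distributed ("fat") A'A path for any matrix wider than 100 columns.
    // The property is re-read on each call, so the override affects subsequent at_a calls.
    System.setProperty(AtA.PROPERTY_ATA_MAXINMEMNCOL, "100")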
@@ -56,7 +60,9 @@ object AtA {
   * Computes A' * A for tall but skinny A matrices. Comes up a lot in SSVD and ALS flavors alike.
    * @return
    */
-  def at_a_slim(operator: OpAtA[_], srcRdd: DrmRdd[_] ): Matrix = {
+  def at_a_slim(operator: OpAtA[_], srcRdd: DrmRdd[_]): Matrix = {
+
+    s_log.debug("Applying slim A'A.")
 
     val ncol = operator.ncol
     // Compute backing vector of tiny-upper-triangular accumulator across all the data.
@@ -65,45 +71,46 @@ object AtA {
       val ut = new UpperTriangular(ncol)
 
       // Strategy is to add an outer product of each row to the upper triangular accumulator.
-      pIter.foreach({case(k,v) =>
+      pIter.foreach({
+        case (k, v) =>
 
-        // Use slightly different traversal strategies over dense vs. sparse source.
-        if (v.isDense) {
+          // Use slightly different traversal strategies over dense vs. sparse source.
+          if (v.isDense) {
 
-          // Update upper-triangular pattern only (due to symmetry).
-          // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is
-          // such a spectacular case they were designed for. Yes, I do observe some 20% difference
-          // compared to while loops with no other payload, but the other payload is usually much
-          // heavier than this overhead, so... I am keeping this as is for the time being.
+            // Update upper-triangular pattern only (due to symmetry).
+            // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is
+            // such a spectacular case they were designed for. Yes, I do observe some 20% difference
+            // compared to while loops with no other payload, but the other payload is usually much
+            // heavier than this overhead, so... I am keeping this as is for the time being.
 
-          for (row <- 0 until v.length; col <- row until v.length)
-            ut(row, col) = ut(row, col) + v(row) * v(col)
+            for (row <- 0 until v.length; col <- row until v.length)
+              ut(row, col) = ut(row, col) + v(row) * v(col)
 
-        } else {
+          } else {
 
-          // Sparse source.
-          v.nonZeroes().view
+            // Sparse source.
+            v.nonZeroes().view
 
-              // Outer iterator iterates over rows of outer product.
-              .foreach(elrow => {
+                // Outer iterator iterates over rows of outer product.
+                .foreach(elrow => {
 
-            // Inner loop for columns of outer product.
-            v.nonZeroes().view
+              // Inner loop for columns of outer product.
+              v.nonZeroes().view
 
-                // Filter out non-upper nonzero elements from the double loop.
-                .filter(_.index >= elrow.index)
+                  // Filter out non-upper nonzero elements from the double loop.
+                  .filter(_.index >= elrow.index)
 
-                // Incrementally update outer product value in the upper triangular accumulator.
-                .foreach(elcol => {
+                  // Incrementally update outer product value in the upper triangular accumulator.
+                  .foreach(elcol => {
 
-              val row = elrow.index
-              val col = elcol.index
-              ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
+                val row = elrow.index
+                val col = elcol.index
+                ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
 
+              })
             })
-          })
 
-        }
+          }
       })
 
       Iterator(dvec(ddata = ut.getData): Vector)
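
For intuition: the slim path computes A'A as the sum of per-row outer products, keeping only
the upper triangle since the result is symmetric. A standalone sketch of that accumulation in
plain Scala (the real code above uses Mahout's UpperTriangular and Vector types):

    // Accumulate the upper triangle of sum_i v_i * v_i' over all rows v_i of A.
    // ncol is the (small) number of columns of A; rows streams its rows.
    def upperTriangularAtA(rows: Iterator[Array[Double]], ncol: Int): Array[Array[Double]] = {
      val ut = Array.ofDim[Double](ncol, ncol)
      for (v <- rows; row <- 0 until ncol; col <- row until ncol)
        ut(row)(col) += v(row) * v(col)
      ut
    }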
@@ -116,7 +123,45 @@ object AtA {
   }
 
   /** The version of A'A that does not use GraphX */
-  def at_a_nongraph(operator: OpAtA[_], srcRdd: DrmRdd[_] ): DrmRdd[Int] =
-    throw new UnsupportedOperationException
+  def at_a_nongraph(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRdd[Int] = {
+
+    s_log.debug("Applying non-slim non-graph A'A.")
+
+    // Determine how many partitions the new matrix would need, approximately. We base that on
+    // geometry only, but that may eventually prove inadequate. Indeed, A'A tends to be much more
+    // dense in reality than the source.
+
+    val m = op.A.nrow
+    val n = op.A.ncol
+    val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1
+    val blockHeight = (n - 1) / numParts + 1
+
+    val rddAtA = srcRdd
+
+        // Remove key, key is irrelevant
+        .map(_._2)
+
+        // Form partial outer blocks for each partition
+        .flatMap {
+      v =>
+        for (blockKey <- Stream.range(0, numParts)) yield {
+          val blockStart = blockKey * blockHeight
+          val blockEnd = n min (blockStart + blockHeight)
+          blockKey -> (v(blockStart until blockEnd) cross v)
+        }
+    }
+        // Combine outer blocks
+        .reduceByKey(_ += _)
+
+        // Restore proper block keys
+        .map {
+      case (blockKey, block) =>
+        val blockStart = blockKey * blockHeight
+        val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
+        rowKeys -> block
+    }
+
+    new DrmRddInput[Int](blockifiedSrc = Some(rddAtA))
+  }
 
 }
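
The block geometry in at_a_nongraph can be sanity-checked against the dimensions used by the
"fat non-graph" test added below (a sketch: the 400 x 550, 2-partition constants come from
that test, the formulas from the code above):

    val m = 400          // rows of A
    val n = 550          // columns of A, hence the order of A'A
    val srcParts = 2     // partitions of the source RDD

    val numParts = (srcParts.toDouble * n / m).ceil.round.toInt max 1  // = 3
    val blockHeight = (n - 1) / numParts + 1                           // = 184

    // A'A is assembled from vertical row blocks [0,184), [184,368), [368,550).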

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala?rev=1579565&r1=1579564&r2=1579565&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala Thu Mar 20 08:53:50 2014
@@ -77,6 +77,7 @@ object AtB {
   /** Given already zipped, joined rdd of rows of A' and B, compute their product A'B */
   private[sparkbindings] def computeAtBZipped[A: ClassTag](zipped:RDD[(DrmTuple[A], DrmTuple[A])],
       nrow:Long, ancol:Int, bncol:Int, blockHeight: Int) = {
+
     // Since Q and A are partitioned the same way, we can just zip their rows and proceed from there by
     // forming outer products. Our optimizer lacks this primitive, so we will implement it using RDDs
     // directly. We try to compile B' = A'Q now by collecting outer products of rows of A and Q. At
@@ -85,32 +86,30 @@ object AtB {
     val btNumParts = safeToNonNegInt((nrow - 1) / blockHeight + 1)
 
     val rddBt = zipped
-        
 
         // Produce outer product blocks
-        .flatMap({
+        .flatMap {
       case ((aKey, aRow), (qKey, qRow)) =>
-        for (blockKey <- 0 until btNumParts) yield {
+        for (blockKey <- Stream.range(0, btNumParts)) yield {
           val blockStart = blockKey * blockHeight
           val blockEnd = ancol min (blockStart + blockHeight)
 
           // Create block by cross product of proper slice of aRow and qRow
           blockKey -> (aRow(blockStart until blockEnd) cross qRow)
         }
-    })
-
+    }
         // Combine blocks by just summing them up
-        .reduceByKey({
+        .reduceByKey {
       case (block1, block2) => block1 += block2
-    })
+    }
 
         // Throw away block key, generate row keys instead.
-        .map({
+        .map {
       case (blockKey, block) =>
         val blockStart = blockKey * blockHeight
         val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
         rowKeys -> block
-    })
+    }
 
     new DrmRddInput[Int](blockifiedSrc = Some(rddBt))
   }
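
Both this file and AtA.scala now yield the per-row blocks from Stream.range rather than
0 until n. A plain-Scala sketch of the difference (the block sizes are hypothetical):

    // A strict Range materializes every block of a row before flatMap sees any of
    // them; a Stream builds each block only when the consumer pulls it, so at most
    // one block per row needs to be live at a time (provided the stream head is
    // not retained, as in Spark's iterator-driven flatMap consumption).
    def eagerBlocks(nBlocks: Int): Seq[Array[Double]] =
      for (k <- 0 until nBlocks) yield new Array[Double](1 << 20)

    def lazyBlocks(nBlocks: Int): Stream[Array[Double]] =
      for (k <- Stream.range(0, nBlocks)) yield new Array[Double](1 << 20)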

Copied: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
(from r1578677, mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DQR.scala)
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala?p2=mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala&p1=mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DQR.scala&r1=1578677&r2=1579565&rev=1579565&view=diff
==============================================================================
    (empty)

Copied: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
(from r1578677, mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSPCA.scala)
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala?p2=mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala&p1=mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSPCA.scala&r1=1578677&r2=1579565&rev=1579565&view=diff
==============================================================================
    (empty)

Copied: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
(from r1578677, mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSSVD.scala)
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala?p2=mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala&p1=mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSSVD.scala&r1=1578677&r2=1579565&rev=1579565&view=diff
==============================================================================
    (empty)

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/io/WritableKryoSerializer.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/io/WritableKryoSerializer.scala?rev=1579565&r1=1579564&r2=1579565&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/io/WritableKryoSerializer.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/io/WritableKryoSerializer.scala Thu Mar 20 08:53:50 2014
@@ -20,12 +20,13 @@ package org.apache.mahout.sparkbindings.
 import com.esotericsoftware.kryo.{Kryo, Serializer}
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer, Writable}
+import scala.reflect.ClassTag
 
 /**
  *
  * @author dmitriy
  */
-class WritableKryoSerializer[V <% Writable, W <: Writable <% V : ClassManifest] extends Serializer[V] {
+class WritableKryoSerializer[V <% Writable, W <: Writable <% V : ClassTag] extends Serializer[V] {
 
   def write(kryo: Kryo, out: Output, v: V) = {
     val dob = new DataOutputBuffer()
@@ -42,7 +43,7 @@ class WritableKryoSerializer[V <% Writab
     val data = new Array[Byte](len)
     in.read(data)
     dib.reset(data, len)
-    val w: W = classManifest[W].erasure.newInstance().asInstanceOf[W]
+    val w: W = implicitly[ClassTag[W]].runtimeClass.newInstance().asInstanceOf[W]
     w.readFields(dib)
     w
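
The ClassManifest to ClassTag change keeps the same reflective-instantiation trick under the
Scala 2.10 reflection API. A minimal sketch of the pattern in isolation (assumes the target
type has a public no-arg constructor, as Writable implementations do):

    import scala.reflect.ClassTag

    // A ClassTag carries the erased runtime Class, which is enough to invoke the
    // no-arg constructor reflectively.
    def newInstanceOf[T: ClassTag]: T =
      implicitly[ClassTag[T]].runtimeClass.newInstance().asInstanceOf[T]

    val buf = newInstanceOf[java.util.ArrayList[Int]]  // an empty ArrayList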
 

Modified: mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala?rev=1579565&r1=1579564&r2=1579565&view=diff
==============================================================================
--- mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala (original)
+++ mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala Thu Mar 20 08:53:50 2014
@@ -47,7 +47,10 @@ package object sparkbindings {
 
     try {
 
-      val conf = if (masterUrl != "local") {
+      val conf = if (!masterUrl.startsWith("local")
+          || System.getProperties.contains("mahout.home")
+          || System.getenv("MAHOUT_HOME") != null
+      ) {
         var mhome = System.getenv("MAHOUT_HOME")
         if (mhome == null) mhome = System.getProperty("mahout.home")
 

Copied: mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
(from r1578677, mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/decompositions/MathSuite.scala)
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala?p2=mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala&p1=mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/decompositions/MathSuite.scala&r1=1578677&r2=1579565&rev=1579565&view=diff
==============================================================================
    (empty)

Modified: mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala?rev=1579565&r1=1579564&r2=1579565&view=diff
==============================================================================
--- mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala (original)
+++ mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala Thu Mar 20 08:53:50 2014
@@ -26,6 +26,8 @@ import org.apache.mahout.sparkbindings.d
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.SparkContext
 import scala.collection.mutable.ArrayBuffer
+import org.apache.mahout.math.Matrices
+import org.apache.mahout.sparkbindings.blas
 
 /** R-like DRM DSL operation tests */
 class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
@@ -252,6 +254,25 @@ class RLikeDrmOpsSuite extends FunSuite 
     (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
   }
 
+  test("C = A.t %*% A fat non-graph") {
+    // Hack the max in-mem size for this test
+    System.setProperty(blas.AtA.PROPERTY_ATA_MAXINMEMNCOL, "540")
+
+    val inCoreA = Matrices.uniformView(400, 550, 1234)
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    val AtA = A.t %*% A
+
+    // Assert optimizer detects square
+    CheckpointAction.optimize(action = AtA) should equal(OpAtA(A))
+
+    val inCoreAtA = AtA.collect
+    val inCoreAtAControl = inCoreA.t %*% inCoreA
+
+    (inCoreAtA - inCoreAtAControl).norm should be < 1E-10
+    s_log.debug("test done.")
+  }
+
 
   test("C = A.t %*% A non-int key") {
     val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))

Modified: mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
URL: http://svn.apache.org/viewvc/mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala?rev=1579565&r1=1579564&r2=1579565&view=diff
==============================================================================
--- mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala (original)
+++ mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala Thu Mar 20 08:53:50 2014
@@ -18,7 +18,13 @@ trait MahoutLocalContext extends MahoutS
 
   override protected def beforeEach() {
     super.beforeEach()
-    mahoutCtx = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext", customJars = buildJars)
+
+    System.setProperty("spark.kryoserializer.buffer.mb","15")
+    System.setProperty("spark.akka.frameSize","30")
+    mahoutCtx = mahoutSparkContext(masterUrl = "local[3]",
+      appName = "MahoutLocalContext",
+      customJars = buildJars
+    )
   }
 
   override protected def afterEach() {
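
For reference, the same test tuning could be expressed through a SparkConf (a sketch against
Spark 0.9-era property names taken from the diff above; presumably the larger Kryo buffer and
Akka frame size accommodate the bigger shuffled blocks produced by the fat A'A path):

    val conf = new org.apache.spark.SparkConf()
      .setMaster("local[3]")
      .setAppName("MahoutLocalContext")
      .set("spark.kryoserializer.buffer.mb", "15")
      .set("spark.akka.frameSize", "30")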


