mahout-dev mailing list archives

From: Dmitriy Lyubimov <dlie...@gmail.com>
Subject: Re: 0xdata interested in contributing
Date: Fri, 14 Mar 2014 17:08:20 GMT
On Fri, Mar 14, 2014 at 9:39 AM, Pat Ferrel <pat@occamsmachete.com> wrote:

> Love the architectural discussion but sometimes the real answers can be
> hidden by minutiae.
>
> Dmitriy, is there enough running on Spark to compare to a DRM
> implementation on H2O? 0xdata, go ahead and implement DRM on H2O. If "the
> proof is in the pudding", why not compare?
>

https://issues.apache.org/jira/browse/MAHOUT-1346 is in the trunk. You can
check the details in the attachment. There's an environment with SSVD and PCA
chosen as guinea pigs for validation. You are more than welcome to benchmark.
So yes, to date Spark is far more "in the pudding" of Mahout than anything
else, except MR.
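
For illustration (this sketch is mine, not from MAHOUT-1346 itself), here is
roughly how the PCA guinea pig might be exercised from the R-like Scala DSL.
The imports, the mahoutSparkContext helper, and the exact location of dspca
are assumptions about the Spark-bindings layout of that era and may differ
across Mahout versions:

  // Hedged sketch, not from the MAHOUT-1346 attachment: distributed stochastic
  // PCA on a small parallelized matrix. Package layout is approximate and
  // varies across Mahout versions.
  import org.apache.mahout.math.scalabindings._   // in-core DSL: dense(...), svd(...)
  import RLikeOps._                               // R-like in-core operators
  import org.apache.mahout.sparkbindings._        // mahoutSparkContext, drmParallelize
  import org.apache.mahout.sparkbindings.drm._    // DRM types and operators

  implicit val mahoutCtx = mahoutSparkContext(masterUrl = "local", appName = "dspca-sketch")

  // A tiny dense matrix, distributed as a DRM over 2 partitions.
  val inCoreA = dense((1, 2, 3), (3, 4, 5), (-1, 0, 2), (2, 2, 2))
  val drmA = drmParallelize(inCoreA, numPartitions = 2)

  // Rank-2 stochastic PCA; signature assumed analogous to dssvd below.
  val (drmUpca, drmVpca, sPca) = dspca(drmA, k = 2, q = 1)

  // Nothing is materialized until the lazily defined outputs are actually used.
  println(sPca)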

I already placed links in the previous discussion, but I will cut and paste
the code here for your sake.

  /**
   * Distributed Stochastic Singular Value decomposition algorithm.
   *
   * @param A input matrix A
   * @param k request SSVD rank
   * @param p oversampling parameter
   * @param q number of power iterations
   * @return (U, V, s). Note that U and V are non-checkpointed matrices
   *         (i.e. one needs to actually use them, e.g. save them to HDFS,
   *         in order to trigger their computation).
   */
  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
  (DrmLike[K], DrmLike[Int], Vector) = {

    val drmA = A.checkpoint()

    val m = drmA.nrow
    val n = drmA.ncol
    assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
    val pfxed = safeToNonNegInt((m min n) - k min p)

    // Actual decomposition rank
    val r = k + pfxed

    // We represent Omega by its seed.
    val omegaSeed = Random.nextInt()

    // Compute Y = A*Omega. Instead of redistributing the view, we redistribute
    // the Omega seed only and instantiate the Omega random matrix view in the
    // backend instead. That way the serialized closure is much more compact.
    var drmY = drmA.mapBlock(ncol = r)({
      case (keys, blockA) =>
        val blockY = blockA %*% Matrices.symmetricUniformView(blockA.ncol, r, omegaSeed)
        keys -> blockY
    })

    var drmQ = dqrThin(drmY.checkpoint())._1
    // Checkpoint Q if last iteration
    if (q==0) drmQ = drmQ.checkpoint()

    // This actually should be optimized as an identically partitioned map-side
    // A'B since A and Q should still be identically partitioned.
    var drmBt = drmA.t %*% drmQ
    // Checkpoint B' if last iteration
    if (q==0) drmBt = drmBt.checkpoint()

    for (i <- 1 to q) {
      drmY = drmA %*% drmBt
      drmQ = dqrThin(drmY.checkpoint())._1
      // Checkpoint Q if last iteration
      if ( i == q) drmQ = drmQ.checkpoint()

      // This, on the other hand, should be the inner-join-and-map A'B
      // optimization since A and Q_i are not identically partitioned anymore.
      drmBt = drmA.t %*% drmQ
      // Checkpoint B' if last iteration
      if ( i == q) drmBt = drmBt.checkpoint()
    }

    val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(StorageLevel.NONE).collect
    val (inCoreUHat, d) = eigen(inCoreBBt)
    val s = d.sqrt

    // Since neither drmU nor drmV are actually computed until actually used,
    // we don't need the flags instructing whether or not to compute either of
    // the U, V outputs anymore. Neat, isn't it?
    val drmU = drmQ %*% inCoreUHat
    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))

    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
  }
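
A hedged usage sketch for the routine above (again mine, not part of the
original post): parallelize a small matrix, call dssvd, and sanity-check the
leading singular values against the in-core svd. The imports and context setup
are the same assumptions as in the PCA sketch above:

  // Hedged sketch: exercise dssvd and compare against the in-core svd.
  // Import locations are approximate for the Spark-bindings code of that era.
  import org.apache.mahout.math.scalabindings._
  import RLikeOps._
  import org.apache.mahout.sparkbindings._
  import org.apache.mahout.sparkbindings.drm._

  implicit val mahoutCtx = mahoutSparkContext(masterUrl = "local", appName = "dssvd-sketch")

  val inCoreA = dense((1, 2, 3), (3, 4, 5), (-1, 0, 2), (2, 2, 2))
  val drmA = drmParallelize(inCoreA, numPartitions = 2)

  // Rank-2 stochastic SVD with one power iteration.
  val (drmU, drmV, s) = dssvd(drmA, k = 2, q = 1)

  // U and V are lazy; checkpointing and collecting (or saving to HDFS)
  // triggers their computation.
  val inCoreU = drmU.checkpoint().collect
  val inCoreV = drmV.checkpoint().collect

  // In-core reference decomposition: the first k singular values of sRef
  // should agree closely with s.
  val (_, _, sRef) = svd(inCoreA)
  println(s)
  println(sRef)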


 40 lines of actionable code, give or take. Give me 24 hours and you'll be
drowning in various flavors of clustering running on Spark. But that's not
the point. If you are still sure you are not missing the point, you can
read my post again.
