On Fri, Mar 14, 2014 at 9:39 AM, Pat Ferrel <pat@occamsmachete.com> wrote:
> Love the architectural discussion but sometimes the real answers can be
> hidden by minutiae.
>
> Dimitriy, is there enough running on Spark to compare to a DRM
> implementation on H2O? 0xdata, go ahead and implement DRM on H2O. If "the
> proof is in the pudding," why not compare?
>
https://issues.apache.org/jira/browse/MAHOUT-1346 is in the trunk. You can
check the details in the attachment. There is an environment with SSVD and
PCA chosen as guinea pigs for validation. You are more than welcome to
benchmark. So yes, to date Spark is far more "in the pudding" of Mahout
than anything else, except MR.
I already posted links in the previous discussion, but I will even cut and
paste the code here for your sake.
/**
 * Distributed Stochastic Singular Value decomposition algorithm.
 *
 * @param A input matrix A
 * @param k requested SSVD rank
 * @param p oversampling parameter
 * @param q number of power iterations
 * @return (U, V, s). Note that U and V are non-checkpointed matrices
 *         (i.e. one needs to actually use them, e.g. save them to HDFS,
 *         in order to trigger their computation).
 */
def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
    (DrmLike[K], DrmLike[Int], Vector) = {

  val drmA = A.checkpoint()

  val m = drmA.nrow
  val n = drmA.ncol
  assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
  val pfxed = safeToNonNegInt((m min n) - k min p)

  // Actual decomposition rank
  val r = k + pfxed

  // We represent Omega by its seed.
  val omegaSeed = Random.nextInt()

  // Compute Y = A * Omega. Instead of redistributing the view, we
  // redistribute the Omega seed only and instantiate the Omega random matrix
  // view in the backend instead. That way the serialized closure is much
  // more compact.
  var drmY = drmA.mapBlock(ncol = r) {
    case (keys, blockA) =>
      val blockY = blockA %*%
        Matrices.symmetricUniformView(blockA.ncol, r, omegaSeed)
      keys -> blockY
  }

  var drmQ = dqrThin(drmY.checkpoint())._1
  // Checkpoint Q if last iteration
  if (q == 0) drmQ = drmQ.checkpoint()

  // This actually should be optimized as an identically partitioned map-side
  // A'B, since A and Q should still be identically partitioned.
  var drmBt = drmA.t %*% drmQ
  // Checkpoint B' if last iteration
  if (q == 0) drmBt = drmBt.checkpoint()

  for (i <- 1 to q) {
    drmY = drmA %*% drmBt
    drmQ = dqrThin(drmY.checkpoint())._1
    // Checkpoint Q if last iteration
    if (i == q) drmQ = drmQ.checkpoint()

    // This, on the other hand, should be an inner-join-and-map A'B
    // optimization, since A and Q_i are not identically partitioned anymore.
    drmBt = drmA.t %*% drmQ
    // Checkpoint B' if last iteration
    if (i == q) drmBt = drmBt.checkpoint()
  }

  val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(StorageLevel.NONE).collect
  val (inCoreUHat, d) = eigen(inCoreBBt)
  val s = d.sqrt

  // Since neither drmU nor drmV are actually computed until actually used,
  // we don't need the flags instructing to compute (or not compute) either
  // of the U, V outputs anymore. Neat, isn't it?
  val drmU = drmQ %*% inCoreUHat
  val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))

  (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
}
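For readers who want to follow the math without the Mahout DSL, here is a
minimal in-core sketch of the same stochastic SVD steps in plain NumPy
(Y = A*Omega, thin QR of Y, B' = A'Q, optional power iterations, then an
eigendecomposition of the small B*B'). The function name and the uniform
Omega draw are illustrative; this is not the distributed code above.

```python
import numpy as np

def ssvd(a, k, p=15, q=0, seed=0):
    """In-core sketch of stochastic SVD: returns (U, V, s) with A ~= U diag(s) V'."""
    rng = np.random.default_rng(seed)
    m, n = a.shape
    r = min(k + p, min(m, n))                # actual decomposition rank
    omega = rng.uniform(-1, 1, size=(n, r))  # random projection Omega
    y = a @ omega                            # Y = A * Omega
    q_mat, _ = np.linalg.qr(y)               # thin QR: Y = Q R
    bt = a.T @ q_mat                         # B' = A' Q
    for _ in range(q):                       # optional power iterations
        y = a @ bt
        q_mat, _ = np.linalg.qr(y)
        bt = a.T @ q_mat
    bbt = bt.T @ bt                          # B B' (small r x r matrix)
    d, u_hat = np.linalg.eigh(bbt)           # eigendecomposition of B B'
    idx = np.argsort(d)[::-1]                # sort eigenvalues descending
    d, u_hat = d[idx], u_hat[:, idx]
    s = np.sqrt(np.maximum(d, 0.0))          # singular values of A (approx.)
    u = q_mat @ u_hat                        # U = Q UHat
    v = bt @ (u_hat / s)                     # V = B' UHat diag(1/s)
    return u[:, :k], v[:, :k], s[:k]
```

On an exactly low-rank input with the true rank at most k + p, this recovers
the top-k singular triples to machine precision; on general input it gives
the usual randomized-SVD approximation, improved by the power iterations q.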
40 lines of actionable code, give or take. Give me 24 hours and you'll be
drowning in various flavors of clustering running on Spark. But that's not
the point. If you are still sure you are not missing the point, you can
read my post again.
