From: pat@apache.org
To: commits@mahout.apache.org
Date: Fri, 13 Jun 2014 00:45:28 -0000
Subject: [6/6] git commit: MAHOUT-1464 Cooccurrence Analysis on Spark (pat) closes apache/mahout#12

MAHOUT-1464 Cooccurrence Analysis on Spark (pat) closes apache/mahout#12

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/c1ca3087
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/c1ca3087
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/c1ca3087

Branch: refs/heads/master
Commit: c1ca30872c622e513e49fc1bb111bc4b8a527d3b
Parents: b77caec
Author: pferrel
Authored: Thu Jun 12 17:45:09 2014 -0700
Committer: pferrel
Committed: Thu Jun 12 17:45:09 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |   2 +
 .../mahout/math/drm/CheckpointedOps.scala          |   3 +
 .../mahout/math/drm/DistributedEngine.scala        |   3 +
 .../mahout/math/scalabindings/MatrixOps.scala      |   6 +-
 .../math/scalabindings/MatrixOpsSuite.scala        |   3 +-
 .../java/org/apache/mahout/math/MurmurHash.java    |  13 +-
 .../apache/mahout/cf/CooccurrenceAnalysis.scala    | 213 +++++++++++++++++++
 .../mahout/sparkbindings/SparkEngine.scala         |  35 ++-
 .../apache/mahout/sparkbindings/package.scala      |   4 +-
 .../mahout/cf/CooccurrenceAnalysisSuite.scala      | 195 +++++++++++++++++
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala       |  11 +
 11 files changed, 477 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7111122..1d5bd3d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1464: Cooccurrence Analysis on Spark (pat)
+
   MAHOUT-1578: Optimizations in matrix serialization (ssc)
 
   MAHOUT-1572: blockify() to detect (naively) the data sparsity in the loaded data (dlyubimov)


http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index edd0cfc..8c3911f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -32,6 +32,9 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
   /** Column sums. At this point this runs on checkpoint and collects in-core vector. */
   def colSums(): Vector = drm.context.colSums(drm)
 
+  /** Column counts. Counts the non-zero values. At this point this runs on checkpoint and collects in-core vector. */
+  def numNonZeroElementsPerColumn(): Vector = drm.context.numNonZeroElementsPerColumn(drm)
+
   /** Column Means */
   def colMeans(): Vector = drm.context.colMeans(drm)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 5ffee9d..f136981 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -46,6 +46,9 @@ trait DistributedEngine {
   /** Engine-specific colSums implementation based on a checkpoint. */
   def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
 
+  /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
+  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
   /** Engine-specific colMeans implementation based on a checkpoint. */
   def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
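
The new operator decorates every checkpointed DRM, so callers get per-column non-zero counts in one call. A minimal usage sketch, assuming an implicit DistributedContext is already in scope (the toy matrix is illustrative and mirrors the RLikeDrmOpsSuite test at the end of this commit):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    // Toy 3 x 2 matrix; column 0 has one non-zero entry, column 1 has three.
    val inCoreA = dense(
      (0, 2),
      (3, 4),
      (0, 30)
    )
    val drmA = drmParallelize(inCoreA, numPartitions = 2)

    // Runs on the checkpointed DRM and collects a small in-core vector: dvec(1, 3).
    val counts = drmA.numNonZeroElementsPerColumn()
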

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index 48c2048..149feca 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -176,7 +176,7 @@ class MatrixOps(val m: Matrix) {
 
   def rowMeans() = if (m.ncol == 0) rowSums() else rowSums() /= m.ncol
 
-
+  def numNonZeroElementsPerColumn() = m.aggregateColumns(vectorCountFunc)
 }
 
 object MatrixOps {
@@ -188,4 +188,8 @@ object MatrixOps {
     def apply(f: Vector): Double = f.sum
   }
 
+  private def vectorCountFunc = new VectorFunction {
+    def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
index e57c75c..d59d3a5 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
@@ -109,7 +109,7 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
     println(a.toString)
   }
 
-  test("colSums, rowSums, colMeans, rowMeans") {
+  test("colSums, rowSums, colMeans, rowMeans, numNonZeroElementsPerColumn") {
     val a = dense(
       (2, 3, 4),
       (3, 4, 5)
@@ -119,6 +119,7 @@
     a.rowSums() should equal(dvec(9, 12))
     a.colMeans() should equal(dvec(2.5, 3.5, 4.5))
     a.rowMeans() should equal(dvec(3, 4))
+    a.numNonZeroElementsPerColumn() should equal(dvec(2, 2, 2))
 
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math/src/main/java/org/apache/mahout/math/MurmurHash.java
----------------------------------------------------------------------
diff --git a/math/src/main/java/org/apache/mahout/math/MurmurHash.java b/math/src/main/java/org/apache/mahout/math/MurmurHash.java
index 0b3fab0..32dfdd6 100644
--- a/math/src/main/java/org/apache/mahout/math/MurmurHash.java
+++ b/math/src/main/java/org/apache/mahout/math/MurmurHash.java
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.math;
 
+import com.google.common.primitives.Ints;
+
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
@@ -29,7 +31,16 @@ import java.nio.ByteOrder;
  */
 public final class MurmurHash {
 
-  private MurmurHash() {
+  private MurmurHash() {}
+
+  /**
+   * Hashes an int.
+   * @param data The int to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the bytes in question.
+   */
+  public static int hash(int data, int seed) {
+    return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed);
   }
 
   /**
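
The new int overload lets Spark-side code derive a deterministic RNG seed from a row key, which is exactly how sampleDownAndBinarize() in the new CooccurrenceAnalysis below uses it. A short sketch of that pattern (the key and seed values here are illustrative only):

    import org.apache.mahout.common.RandomUtils
    import org.apache.mahout.math.MurmurHash

    // Illustrative values; keys(0) of a block plays this role in the real code.
    val firstKeyInBlock = 42
    val seed = 0xdeadbeef

    // The same (key, seed) pair always yields the same RNG, so a recomputed
    // partition downsamples identically after a task failure.
    val random = RandomUtils.getRandom(MurmurHash.hash(firstKeyInBlock, seed))
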

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
new file mode 100644
index 0000000..ee44f90
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * Based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+  /** Compares (Int, Double) pairs by the second value */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2 }
+
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // num users, which equals the maximum number of interactions per item
+    val numUsers = drmA.nrow.toInt
+
+    // Compute & broadcast the number of interactions per thing in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+    // Compute co-occurrence matrix A'A
+    val drmAtA = drmA.t %*% drmA
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+      bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    var indicatorMatrices = List(drmIndicatorsAtA)
+
+    // Now look at cross-co-occurrences
+    for (drmBRaw <- drmBs) {
+      // Down-sample and pin other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Compute & broadcast the number of interactions per thing in B
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+      // Compute cross-co-occurrence matrix B'A
+      val drmBtA = drmB.t %*% drmA
+
+      val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerThingB, bcastInteractionsPerItemA)
+
+      indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
+
+      drmB.uncache()
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // Return list of indicator matrices
+    indicatorMatrices
+  }
+
+  /**
+   * Compute loglikelihood ratio
+   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+   */
+  def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                         numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    val k11 = numInteractionsWithAandB
+    val k12 = numInteractionsWithA - numInteractionsWithAandB
+    val k21 = numInteractionsWithB - numInteractionsWithAandB
+    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+  }
+
+  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                        crossCooccurrence: Boolean = true) = {
+    drmBtA.mapBlock() {
+      case (keys, block) =>
+
+        val llrBlock = block.like()
+        val numInteractionsB: Vector = bcastNumInteractionsB
+        val numInteractionsA: Vector = bcastNumInteractionsA
+
+        for (index <- 0 until keys.size) {
+
+          val thingB = keys(index)
+
+          // PriorityQueue to select the top-k items
+          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(index, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val cooccurrences = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // Compute loglikelihood ratio
+              val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+                cooccurrences.toLong, numUsers)
+              val candidate = thingA -> llrRatio
+
+              // Enqueue item with score, if belonging to the top-k
+              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+                topItemsPerThing.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+                topItemsPerThing.dequeue()
+                topItemsPerThing.enqueue(candidate)
+              }
+            }
+          }
+
+          // Add top-k interesting items to the output matrix
+          topItemsPerThing.dequeueAll.foreach {
+            case (otherThing, llrScore) =>
+              llrBlock(index, otherThing) = llrScore
+          }
+        }
+
+        keys -> llrBlock
+    }
+  }
+
+  /**
+   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * additionally binarizes input matrix, as we're only interested in knowing whether interactions happened or not
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each user
+        for (userIndex <- 0 until keys.size) {
+
+          val interactionsOfUser = block(userIndex, ::)
+
+          val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
+
+          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
+
+          interactionsOfUser.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
+              // We ignore the original interaction value and create a binary 0-1 matrix
+              // as we only consider whether interactions happened or did not happen
+              downsampledBlock(userIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}
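
The entry point above is exercised end-to-end in CooccurrenceAnalysisSuite further down; a condensed sketch of a typical call, assuming an implicit Spark distributed context is available and reusing the suite's toy matrices:

    import org.apache.mahout.cf.CooccurrenceAnalysis
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.scalabindings._

    // Rows are users, columns are items; A and B record two action types.
    val drmA = drmParallelize(
      dense((1, 1, 0, 0, 0), (0, 0, 1, 1, 0), (0, 0, 0, 0, 1), (1, 0, 0, 1, 0)), numPartitions = 2)
    val drmB = drmParallelize(
      dense((1, 1, 1, 1, 0), (1, 1, 1, 1, 0), (0, 0, 1, 0, 1), (1, 1, 0, 1, 0)), numPartitions = 2)

    // Returns List(indicators for A'A, indicators for B'A), sparsified by LLR score.
    val indicators = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
    val selfIndicators = indicators(0).checkpoint().collect
    val crossIndicators = indicators(1).checkpoint().collect
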

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 3a03e58..7a1fb2d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -40,16 +40,37 @@ object SparkEngine extends DistributedEngine {
     val n = drm.ncol
 
     drm.rdd
-        // Throw away keys
-        .map(_._2)
-        // Fold() doesn't work with kryo still. So work around it.
-        .mapPartitions(iter => {
+      // Throw away keys
+      .map(_._2)
+      // Fold() doesn't work with kryo still. So work around it.
+      .mapPartitions(iter => {
       val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
       Iterator(acc)
     })
-        // Since we preallocated new accumulator vector per partition, this must not cause any side
-        // effects now.
-        .reduce(_ += _)
+      // Since we preallocated new accumulator vector per partition, this must not cause any side
+      // effects now.
+      .reduce(_ += _)
+  }
+
+  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val n = drm.ncol
+
+    drm.rdd
+      // Throw away keys
+      .map(_._2)
+      // Fold() doesn't work with kryo still. So work around it.
+      .mapPartitions(iter => {
+      val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) =>
+        v.nonZeroes().foreach { elem =>
+          if (elem.get() > 0) acc(elem.index) += 1
+        }
+        acc
+      }
+      Iterator(acc)
+    })
+      // Since we preallocated new accumulator vector per partition, this must not cause any side
+      // effects now.
+      .reduce(_ += _)
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
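
For clarity, the accumulate-per-partition idiom used twice above can be read in isolation. The following is a standalone sketch, not engine code: it assumes a live SparkContext named sc, Mahout's in-core vector types, and a serializer configured for them, and the helper name is ours:

    import org.apache.mahout.math.{DenseVector, Vector}
    import org.apache.mahout.math.scalabindings.RLikeOps._
    import org.apache.spark.SparkContext
    import scala.collection.JavaConversions._

    // Count strictly positive entries per column over an RDD of n-dimensional rows.
    def countNonZeroPerColumn(sc: SparkContext, rows: Seq[Vector], n: Int): Vector =
      sc.parallelize(rows)
        .mapPartitions { iter =>
          // One freshly allocated accumulator per partition; mutating it is
          // free of shared-state side effects, which is what makes the final
          // merge of per-partition results with += safe.
          val acc: Vector = new DenseVector(n)
          for (v <- iter; elem <- v.nonZeroes() if elem.get() > 0)
            acc(elem.index) += 1
          Iterator(acc)
        }
        .reduce(_ += _)
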

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index c4ef0d3..e9fd7ac 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -95,7 +95,9 @@ package object sparkbindings {
       do {
         val cp = r.readLine()
         if (cp == null)
-          throw new IllegalArgumentException("Unable to read output from \"mahout classpath\"")
+          throw new IllegalArgumentException(
+            "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?"
+          )
 
         val j = cp.split(File.pathSeparatorChar)
         if (j.size > 10) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
new file mode 100644
index 0000000..3c05a42
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.scalatest.FunSuite
+import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.scalabindings.MatrixOps
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math._
+import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+
+/* values
+A =
+1 1 0 0 0
+0 0 1 1 0
+0 0 0 0 1
+1 0 0 1 0
+
+B =
+1 1 1 1 0
+1 1 1 1 0
+0 0 1 0 1
+1 1 0 1 0
+ */
+
+class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLocalContext {
+
+  test("cooccurrence [A'A], [B'A] boolean data using LLR") {
+    val a = dense((1, 1, 0, 0, 0), (0, 0, 1, 1, 0), (0, 0, 0, 0, 1), (1, 0, 0, 1, 0))
+    val b = dense((1, 1, 1, 1, 0), (1, 1, 1, 1, 0), (0, 0, 1, 0, 1), (1, 1, 0, 1, 0))
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    // correct cooccurrence with LLR
+    val matrixLLRCoocAtAControl = dense(
+      (0.0, 1.7260924347106847, 0, 0, 0),
+      (1.7260924347106847, 0, 0, 0, 0),
+      (0, 0, 0, 1.7260924347106847, 0),
+      (0, 0, 1.7260924347106847, 0, 0),
+      (0, 0, 0, 0, 0)
+    )
+
+    // correct cross-cooccurrence with LLR
+    val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (0, 0, 0, 0, 4.498681156950466)
+    )
+
+    // self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    // cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("cooccurrence [A'A], [B'A] double data using LLR") {
+    val a = dense(
+      (100000.0D, 1.0D, 0.0D, 0.0D, 0.0D),
+      (0.0D, 0.0D, 10.0D, 1.0D, 0.0D),
+      (0.0D, 0.0D, 0.0D, 0.0D, 1000.0D),
+      (1.0D, 0.0D, 0.0D, 10.0D, 0.0D))
+    val b = dense(
+      (10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D),
+      (10.0D, 1.0D, 10000000.0D, 10.0D, 0.0D),
+      (0.0D, 0.0D, 1000.0D, 0.0D, 100.0D),
+      (100.0D, 1.0D, 0.0D, 100000.0D, 0.0D))
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    // correct cooccurrence with LLR
+    val matrixLLRCoocAtAControl = dense(
+      (0.0, 1.7260924347106847, 0, 0, 0),
+      (1.7260924347106847, 0, 0, 0, 0),
+      (0, 0, 0, 1.7260924347106847, 0),
+      (0, 0, 1.7260924347106847, 0, 0),
+      (0, 0, 0, 0, 0)
+    )
+
+    // correct cross-cooccurrence with LLR
+    val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (0, 0, 0, 0, 4.498681156950466)
+    )
+
+    // self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    // cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("cooccurrence [A'A], [B'A] integer data using LLR") {
+    val a = dense((1000, 10, 0, 0, 0), (0, 0, 10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))
+    val b = dense((100, 1000, 10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0, 100), (10, 100, 0, 1000, 0))
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    // correct cooccurrence with LLR
+    val matrixLLRCoocAtAControl = dense(
+      (0.0, 1.7260924347106847, 0, 0, 0),
+      (1.7260924347106847, 0, 0, 0, 0),
+      (0, 0, 0, 1.7260924347106847, 0),
+      (0, 0, 1.7260924347106847, 0, 0),
+      (0, 0, 0, 0, 0)
+    )
+
+    // correct cross-cooccurrence with LLR
+    val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (0, 0, 0, 0, 4.498681156950466)
+    )
+
+    // self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    //var cp = drmSelfCooc(0).checkpoint()
+    //cp.writeDRM("/tmp/cooc-spark/")//to get values written
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    // cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("LLR calc") {
+    val numInteractionsWithAandB = 10L
+    val numInteractionsWithA = 100L
+    val numInteractionsWithB = 200L
+    val numInteractions = 10000L
+
+    val llr = CooccurrenceAnalysis.loglikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+
+    assert(llr == 17.19462327013025)
+  }
+
+  test("downsampling by number per row") {
+    val a = dense(
+      (1, 1, 1, 1, 0),
+      (1, 1, 1, 1, 1),
+      (0, 0, 0, 0, 1),
+      (1, 1, 0, 1, 0)
+    )
+    val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
+
+    val downSampledDrm = CooccurrenceAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
+    // count non-zero values, should be == 8
+    var numValues = 0
+    val m = downSampledDrm.collect
+    val it = m.iterator()
+    while (it.hasNext) {
+      val v = it.next().vector()
+      val nonZeroIt = v.nonZeroes().iterator()
+      while (nonZeroIt.hasNext) {
+        numValues += 1
+        nonZeroIt.next()
+      }
+    }
+
+    assert(numValues == 8) // Don't change the random seed or this may fail.
+  }
+}
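
As a cross-check on the "LLR calc" test above, the 2x2 contingency counts implied by its inputs, via the k11/k12/k21/k22 mapping in loglikelihoodRatio, work out as follows:

    import org.apache.mahout.math.stats.LogLikelihood

    // k11 = interactions with both A and B = 10
    // k12 = interactions with A but not B  = 100 - 10 = 90
    // k21 = interactions with B but not A  = 200 - 10 = 190
    // k22 = interactions with neither      = 10000 - 100 - 200 + 10 = 9710
    val llr = LogLikelihood.logLikelihoodRatio(10, 90, 190, 9710)
    // llr == 17.19462327013025, the value asserted in the test
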

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 6152426..30a602b 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -463,4 +463,15 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     drmA.colMeans() should equal (inCoreA.colMeans())
   }
 
+  test("numNonZeroElementsPerColumn") {
+    val inCoreA = dense(
+      (0, 2),
+      (3, 4),
+      (0, 30)
+    )
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    drmA.numNonZeroElementsPerColumn() should equal (inCoreA.numNonZeroElementsPerColumn())
+  }
+
 }