mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [6/6] git commit: MAHOUT-1464 Cooccurrence Analysis on Spark (pat) closes apache/mahout#12
Date Fri, 13 Jun 2014 00:45:28 GMT
MAHOUT-1464 Cooccurrence Analysis on Spark (pat) closes apache/mahout#12


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/c1ca3087
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/c1ca3087
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/c1ca3087

Branch: refs/heads/master
Commit: c1ca30872c622e513e49fc1bb111bc4b8a527d3b
Parents: b77caec
Author: pferrel <pat@occamsmachete.com>
Authored: Thu Jun 12 17:45:09 2014 -0700
Committer: pferrel <pat@occamsmachete.com>
Committed: Thu Jun 12 17:45:09 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../mahout/math/drm/CheckpointedOps.scala       |   3 +
 .../mahout/math/drm/DistributedEngine.scala     |   3 +
 .../mahout/math/scalabindings/MatrixOps.scala   |   6 +-
 .../math/scalabindings/MatrixOpsSuite.scala     |   3 +-
 .../java/org/apache/mahout/math/MurmurHash.java |  13 +-
 .../apache/mahout/cf/CooccurrenceAnalysis.scala | 213 +++++++++++++++++++
 .../mahout/sparkbindings/SparkEngine.scala      |  35 ++-
 .../apache/mahout/sparkbindings/package.scala   |   4 +-
 .../mahout/cf/CooccurrenceAnalysisSuite.scala   | 195 +++++++++++++++++
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    |  11 +
 11 files changed, 477 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7111122..1d5bd3d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1464: Cooccurrence Analysis on Spark (pat)
+
   MAHOUT-1578: Optimizations in matrix serialization (ssc)
 
   MAHOUT-1572: blockify() to detect (naively) the data sparsity in the loaded data (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index edd0cfc..8c3911f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -32,6 +32,9 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
   /** Column sums. At this point this runs on checkpoint and collects in-core vector. */
   def colSums(): Vector = drm.context.colSums(drm)
 
+  /** Column counts. Counts the non-zero values. At this point this runs on checkpoint and collects in-core vector. */
+  def numNonZeroElementsPerColumn(): Vector = drm.context.numNonZeroElementsPerColumn(drm)
+
   /** Column Means */
   def colMeans(): Vector = drm.context.colMeans(drm)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 5ffee9d..f136981 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -46,6 +46,9 @@ trait DistributedEngine {
   /** Engine-specific colSums implementation based on a checkpoint. */
   def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
 
+  /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
+  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
   /** Engine-specific colMeans implementation based on a checkpoint. */
   def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index 48c2048..149feca 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -176,7 +176,7 @@ class MatrixOps(val m: Matrix) {
 
   def rowMeans() = if (m.ncol == 0) rowSums() else rowSums() /= m.ncol
 
-
+  def numNonZeroElementsPerColumn() = m.aggregateColumns(vectorCountFunc)
 }
 
 object MatrixOps {
@@ -188,4 +188,8 @@ object MatrixOps {
     def apply(f: Vector): Double = f.sum
   }
 
+  private def vectorCountFunc = new VectorFunction {
+    def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
index e57c75c..d59d3a5 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
@@ -109,7 +109,7 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
     println(a.toString)
   }
 
-  test("colSums, rowSums, colMeans, rowMeans") {
+  test("colSums, rowSums, colMeans, rowMeans, numNonZeroElementsPerColumn") {
     val a = dense(
       (2, 3, 4),
       (3, 4, 5)
@@ -119,6 +119,7 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
     a.rowSums() should equal(dvec(9, 12))
     a.colMeans() should equal(dvec(2.5, 3.5, 4.5))
     a.rowMeans() should equal(dvec(3, 4))
+    a.numNonZeroElementsPerColumn() should equal(dvec(2,2,2))
 
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math/src/main/java/org/apache/mahout/math/MurmurHash.java
----------------------------------------------------------------------
diff --git a/math/src/main/java/org/apache/mahout/math/MurmurHash.java b/math/src/main/java/org/apache/mahout/math/MurmurHash.java
index 0b3fab0..32dfdd6 100644
--- a/math/src/main/java/org/apache/mahout/math/MurmurHash.java
+++ b/math/src/main/java/org/apache/mahout/math/MurmurHash.java
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.math;
 
+import com.google.common.primitives.Ints;
+
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
@@ -29,7 +31,16 @@ import java.nio.ByteOrder;
  */
 public final class MurmurHash {
 
-  private MurmurHash() {
+  private MurmurHash() {}
+
+  /**
+   * Hashes an int.
+   * @param data The int to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the bytes in question.
+   */
+  public static int hash(int data, int seed) {
+    return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
new file mode 100644
index 0000000..ee44f90
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+  /** Compares (Int,Double) pairs by the second value */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_,
score2)) => score1 > score2}
+
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing:
Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()):
List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // num users, which equals the maximum number of interactions per item
+    val numUsers = drmA.nrow.toInt
+
+    // Compute & broadcast the number of interactions per thing in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+    // Compute co-occurrence matrix A'A
+    val drmAtA = drmA.t %*% drmA
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing,
bcastInteractionsPerItemA,
+      bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    var indicatorMatrices = List(drmIndicatorsAtA)
+
+    // Now look at cross-co-occurrences
+    for (drmBRaw <- drmBs) {
+      // Down-sample and pin other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Compute & broadcast the number of interactions per thing in B
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+      // Compute cross-co-occurrence matrix B'A
+      val drmBtA = drmB.t %*% drmA
+
+      val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerThingB, bcastInteractionsPerItemA)
+
+      indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
+
+      drmB.uncache()
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // Return list of indicator matrices
+    indicatorMatrices
+  }
+
+  /**
+   * Compute loglikelihood ratio
+   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+   **/
+  def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                         numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    val k11 = numInteractionsWithAandB
+    val k12 = numInteractionsWithA - numInteractionsWithAandB
+    val k21 = numInteractionsWithB - numInteractionsWithAandB
+    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+  }
+
+  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing:
Int,
+                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                        crossCooccurrence: Boolean = true) = {
+    drmBtA.mapBlock() {
+      case (keys, block) =>
+
+        val llrBlock = block.like()
+        val numInteractionsB: Vector = bcastNumInteractionsB
+        val numInteractionsA: Vector = bcastNumInteractionsA
+
+        for (index <- 0 until keys.size) {
+
+          val thingB = keys(index)
+
+          // PriorityQueue to select the top-k items
+          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(index, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val cooccurrences = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // Compute loglikelihood ratio
+              val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+                cooccurrences.toLong, numUsers)
+              val candidate = thingA -> llrRatio
+
+              // Enqueue item with score, if belonging to the top-k
+              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+                topItemsPerThing.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+                topItemsPerThing.dequeue()
+                topItemsPerThing.enqueue(candidate)
+              }
+            }
+          }
+
+          // Add top-k interesting items to the output matrix
+          topItemsPerThing.dequeueAll.foreach {
+            case (otherThing, llrScore) =>
+              llrBlock(index, otherThing) = llrScore
+          }
+        }
+
+        keys -> llrBlock
+    }
+  }
+
+  /**
+   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * additionally binarizes input matrix, as we're only interested in knowing whether interactions happened or not
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each user
+        for (userIndex <- 0 until keys.size) {
+
+          val interactionsOfUser = block(userIndex, ::)
+
+          val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
+
+          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
+
+          interactionsOfUser.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing)
/ numInteractionsWithThing
+
+            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate))
{
+              // We ignore the original interaction value and create a binary 0-1 matrix
+              // as we only consider whether interactions happened or did not happen
+              downsampledBlock(userIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 3a03e58..7a1fb2d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -40,16 +40,37 @@ object SparkEngine extends DistributedEngine {
     val n = drm.ncol
 
     drm.rdd
-        // Throw away keys
-        .map(_._2)
-        // Fold() doesn't work with kryo still. So work around it.
-        .mapPartitions(iter => {
+      // Throw away keys
+      .map(_._2)
+      // Fold() doesn't work with kryo still. So work around it.
+      .mapPartitions(iter => {
       val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
       Iterator(acc)
     })
-        // Since we preallocated new accumulator vector per partition, this must not cause any side
-        // effects now.
-        .reduce(_ += _)
+      // Since we preallocated new accumulator vector per partition, this must not cause any side
+      // effects now.
+      .reduce(_ += _)
+  }
+
+  def numNonZeroElementsPerColumn[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val n = drm.ncol
+
+    drm.rdd
+      // Throw away keys
+      .map(_._2)
+      // Fold() doesn't work with kryo still. So work around it.
+      .mapPartitions(iter => {
+      val acc = ((new DenseVector(n): Vector) /: iter){(acc, v) =>
+        v.nonZeroes().foreach { elem =>
+          if (elem.get() > 0) acc(elem.index) += 1
+        }
+        acc
+      }
+      Iterator(acc)
+    })
+      // Since we preallocated new accumulator vector per partition, this must not cause any side
+      // effects now.
+      .reduce(_ += _)
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index c4ef0d3..e9fd7ac 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -95,7 +95,9 @@ package object sparkbindings {
         do {
           val cp = r.readLine()
           if (cp == null)
-            throw new IllegalArgumentException("Unable to read output from \"mahout classpath\"")
+            throw new IllegalArgumentException(
+              "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?"
+            )
 
           val j = cp.split(File.pathSeparatorChar)
           if (j.size > 10) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
new file mode 100644
index 0000000..3c05a42
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.scalatest.FunSuite
+import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.scalabindings.MatrixOps
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math._
+import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+
+/* values 
+A =
+1	1	0	0	0
+0	0	1	1	0
+0	0	0	0	1
+1	0	0	1	0
+
+B =
+1	1	1	1	0
+1	1	1	1	0
+0	0	1	0	1
+1	1	0	1	0
+ */
+
+class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLocalContext
{
+
+  test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
+    val a = dense((1, 1, 0, 0, 0), (0, 0, 1, 1, 0), (0, 0, 0, 0, 1), (1, 0, 0, 1, 0))
+    val b = dense((1, 1, 1, 1, 0), (1, 1, 1, 1, 0), (0, 0, 1, 0, 1), (1, 1, 0, 1, 0))
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    // correct cooccurrence with LLR
+    val matrixLLRCoocAtAControl = dense(
+      (0.0, 1.7260924347106847, 0, 0, 0),
+      (1.7260924347106847, 0, 0, 0, 0),
+      (0, 0, 0, 1.7260924347106847, 0),
+      (0, 0, 1.7260924347106847, 0, 0),
+      (0, 0, 0, 0, 0)
+    )
+
+    // correct cross-cooccurrence with LLR
+    val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (0, 0, 0, 0, 4.498681156950466)
+    )
+
+    //self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("cooccurrence [A'A], [B'A] double data using LLR") {
+    val a = dense((100000.0D, 1.0D, 0.0D, 0.0D, 0.0D), (0.0D, 0.0D, 10.0D, 1.0D, 0.0D), (0.0D,
0.0D, 0.0D, 0.0D, 1000.0D), (1.0D, 0.0D, 0.0D, 10.0D, 0.0D))
+    val b = dense((10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D), (10.0D, 1.0D, 10000000.0D, 10.0D,
0.0D), (0.0D, 0.0D, 1000.0D, 0.0D, 100.0D), (100.0D, 1.0D, 0.0D, 100000.0D, 0.0D))
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    // correct cooccurrence with LLR
+    val matrixLLRCoocAtAControl = dense(
+      (0.0, 1.7260924347106847, 0, 0, 0),
+      (1.7260924347106847, 0, 0, 0, 0),
+      (0, 0, 0, 1.7260924347106847, 0),
+      (0, 0, 1.7260924347106847, 0, 0),
+      (0, 0, 0, 0, 0)
+    )
+
+    // correct cross-cooccurrence with LLR
+    val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (0, 0, 0, 0, 4.498681156950466)
+    )
+
+    //self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("cooccurrence [A'A], [B'A] integer data using LLR") {
+    val a = dense((1000, 10, 0, 0, 0), (0, 0, 10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0,
0, 1000, 0))
+    val b = dense((100, 1000, 10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0,
100), (10, 100, 0, 1000, 0))
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    // correct cooccurrence with LLR
+    val matrixLLRCoocAtAControl = dense(
+      (0.0, 1.7260924347106847, 0, 0, 0),
+      (1.7260924347106847, 0, 0, 0, 0),
+      (0, 0, 0, 1.7260924347106847, 0),
+      (0, 0, 1.7260924347106847, 0, 0),
+      (0, 0, 0, 0, 0)
+    )
+
+    // correct cross-cooccurrence with LLR
+    val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+      (0, 0, 0, 0, 4.498681156950466)
+    )
+
+    //self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    //var cp = drmSelfCooc(0).checkpoint()
+    //cp.writeDRM("/tmp/cooc-spark/")//to get values written
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("LLR calc") {
+    val numInteractionsWithAandB = 10L
+    val numInteractionsWithA = 100L
+    val numInteractionsWithB = 200L
+    val numInteractions = 10000l
+
+    val llr = CooccurrenceAnalysis.loglikelihoodRatio(numInteractionsWithA, numInteractionsWithB,
numInteractionsWithAandB, numInteractions)
+
+    assert(llr == 17.19462327013025)
+  }
+
+  test("downsampling by number per row") {
+    val a = dense((1, 1, 1, 1, 0),
+      (1, 1, 1, 1, 1),
+      (0, 0, 0, 0, 1),
+      (1, 1, 0, 1, 0)
+    )
+    val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
+
+    val downSampledDrm = CooccurrenceAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
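+    //count non-zero values, should be == 8 (NOTE(review): 13 in the input; presumably the 5-interaction row is dropped whole since maxNumInteractions = 4 — confirm)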
+    var numValues = 0
+    val m = downSampledDrm.collect
+    val it = m.iterator()
+    while (it.hasNext) {
+      val v = it.next().vector()
+      val nonZeroIt = v.nonZeroes().iterator()
+      while (nonZeroIt.hasNext) {
+        numValues += 1
+        nonZeroIt.next()
+      }
+    }
+
+    assert(numValues == 8) //Don't change the random seed or this may fail.
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 6152426..30a602b 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -463,4 +463,15 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     drmA.colMeans() should equal (inCoreA.colMeans())
   }
 
+  test("numNonZeroElementsPerColumn") {
+    val inCoreA = dense(
+      (0, 2),
+      (3, 4),
+      (0, 30)
+    )
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    drmA.numNonZeroElementsPerColumn() should equal (inCoreA.numNonZeroElementsPerColumn())
+  }
+
 }


Mime
View raw message