spark-commits mailing list archives

From m...@apache.org
Subject spark git commit: [SPARK-5714][Mllib] Refactor initial step of LDA to remove redundant operations
Date Wed, 11 Feb 2015 05:51:25 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 63af90c3c -> ba3aa8fcb


[SPARK-5714][Mllib] Refactor initial step of LDA to remove redundant operations

The `initialState` of LDA performs several RDD operations that look redundant. This PR simplifies those operations.
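The shape of the change can be sketched in plain Python (a hypothetical analogue: dicts and lists stand in for RDDs and Breeze vectors; the real code operates on GraphX edges). The old path materialized an intermediate `(edge, gamma)` dataset and aggregated it once per endpoint with `aggregateByKey`/`brzAxpy`; the new path weights `gamma` by the token count up front, emits it for both endpoints in a single `flatMap`-style pass, and reduces by key:

```python
import random
from collections import defaultdict

K = 4  # number of topics (illustrative)
# Hypothetical (docId, termId, tokenCount) edges; doc and term id spaces are disjoint.
EDGES = [(0, 10, 3.0), (0, 11, 1.0), (1, 10, 2.0), (1, 11, 4.0)]

def random_gamma(rng, k):
    # Random soft topic assignment, normalized to sum to 1
    # (mirrors normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)).
    raw = [rng.random() for _ in range(k)]
    s = sum(raw)
    return [x / s for x in raw]

def old_style(seed):
    # Old path: build an intermediate (edge, gamma) collection, then
    # aggregate it twice, once per endpoint (two full passes).
    rng = random.Random(seed)
    edges_with_gamma = [(e, random_gamma(rng, K)) for e in EDGES]
    def create_vertices(send_to_where):
        sums = defaultdict(lambda: [0.0] * K)
        for (src, dst, cnt), gamma in edges_with_gamma:
            vid = send_to_where(src, dst)
            for i in range(K):                 # like brzAxpy(cnt, gamma, sum)
                sums[vid][i] += cnt * gamma[i]
        return dict(sums)
    doc_vertices = create_vertices(lambda s, d: s)
    term_vertices = create_vertices(lambda s, d: d)
    return {**doc_vertices, **term_vertices}

def new_style(seed):
    # New path: weight gamma by the token count once, emit it for both
    # endpoints in a single pass, and sum by vertex id (reduceByKey(_ + _)).
    rng = random.Random(seed)
    sums = defaultdict(lambda: [0.0] * K)
    for src, dst, cnt in EDGES:
        gamma = random_gamma(rng, K)
        weighted = [cnt * g for g in gamma]    # sum = gamma * edge.attr
        for vid in (src, dst):
            sums[vid] = [a + b for a, b in zip(sums[vid], weighted)]
    return dict(sums)
```

With the same seed both paths yield identical per-vertex topic counts, which is why the refactor is behavior-preserving while dropping the intermediate dataset and one of the two aggregation passes.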

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #4501 from viirya/sim_lda and squashes the following commits:

4870fe4 [Liang-Chi Hsieh] For comments.
9af1487 [Liang-Chi Hsieh] Refactor initial step of LDA to remove redundant operations.

(cherry picked from commit f86a89a2e081ee4593ce03398c2283fd77daac6e)
Signed-off-by: Xiangrui Meng <meng@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba3aa8fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba3aa8fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba3aa8fc

Branch: refs/heads/branch-1.3
Commit: ba3aa8fcb63968026f94dc389b4db419992fabf7
Parents: 63af90c
Author: Liang-Chi Hsieh <viirya@gmail.com>
Authored: Tue Feb 10 21:51:15 2015 -0800
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Tue Feb 10 21:51:22 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/mllib/clustering/LDA.scala | 37 +++++++-------------
 1 file changed, 13 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba3aa8fc/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index a1d3df0..5e17c8d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -450,34 +450,23 @@ private[clustering] object LDA {
 
     // Create vertices.
     // Initially, we use random soft assignments of tokens to topics (random gamma).
-    val edgesWithGamma: RDD[(Edge[TokenCount], TopicCounts)] =
-      edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
-        val random = new Random(partIndex + randomSeed)
-        partEdges.map { edge =>
-          // Create a random gamma_{wjk}
-          (edge, normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0))
+    def createVertices(): RDD[(VertexId, TopicCounts)] = {
+      val verticesTMP: RDD[(VertexId, TopicCounts)] =
+        edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
+          val random = new Random(partIndex + randomSeed)
+          partEdges.flatMap { edge =>
+            val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
+            val sum = gamma * edge.attr
+            Seq((edge.srcId, sum), (edge.dstId, sum))
+          }
         }
-      }
-    def createVertices(sendToWhere: Edge[TokenCount] => VertexId): RDD[(VertexId, TopicCounts)] = {
-      val verticesTMP: RDD[(VertexId, (TokenCount, TopicCounts))] =
-        edgesWithGamma.map { case (edge, gamma: TopicCounts) =>
-          (sendToWhere(edge), (edge.attr, gamma))
-        }
-      verticesTMP.aggregateByKey(BDV.zeros[Double](k))(
-        (sum, t) => {
-          brzAxpy(t._1, t._2, sum)
-          sum
-        },
-        (sum0, sum1) => {
-          sum0 += sum1
-        }
-      )
+      verticesTMP.reduceByKey(_ + _)
     }
-    val docVertices = createVertices(_.srcId)
-    val termVertices = createVertices(_.dstId)
+
+    val docTermVertices = createVertices()
 
     // Partition such that edges are grouped by document
-    val graph = Graph(docVertices ++ termVertices, edges)
+    val graph = Graph(docTermVertices, edges)
       .partitionBy(PartitionStrategy.EdgePartition1D)
 
     new EMOptimizer(graph, k, vocabSize, docConcentration, topicConcentration, checkpointInterval)



