spark-reviews mailing list archives

From sethah <...@git.apache.org>
Subject [GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Date Mon, 10 Oct 2016 18:02:25 GMT
Github user sethah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15342#discussion_r82653577
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
    @@ -258,149 +252,106 @@ class KMeans private (
             }
         }
         val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    -    logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds)
+
    -      " seconds.")
    +    logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
     
    -    val active = Array.fill(numRuns)(true)
    -    val costs = Array.fill(numRuns)(0.0)
    -
    -    var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
    +    var converged = false
    +    var cost = 0.0
         var iteration = 0
     
         val iterationStartTime = System.nanoTime()
     
    -    instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
    +    instr.foreach(_.logNumFeatures(centers.head.vector.size))
     
    -    // Execute iterations of Lloyd's algorithm until all runs have converged
    -    while (iteration < maxIterations && !activeRuns.isEmpty) {
    -      type WeightedPoint = (Vector, Long)
    -      def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
    -        axpy(1.0, x._1, y._1)
    -        (y._1, x._2 + y._2)
    -      }
    -
    -      val activeCenters = activeRuns.map(r => centers(r)).toArray
    -      val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
    -
    -      val bcActiveCenters = sc.broadcast(activeCenters)
    +    // Execute iterations of Lloyd's algorithm until converged
    +    while (iteration < maxIterations && !converged) {
    +      val costAccum = sc.doubleAccumulator
    +      val bcCenters = sc.broadcast(centers)
     
           // Find the sum and count of points mapping to each center
           val totalContribs = data.mapPartitions { points =>
    -        val thisActiveCenters = bcActiveCenters.value
    -        val runs = thisActiveCenters.length
    -        val k = thisActiveCenters(0).length
    -        val dims = thisActiveCenters(0)(0).vector.size
    +        val thisCenters = bcCenters.value
    +        val dims = thisCenters.head.vector.size
     
    -        val sums = Array.fill(runs, k)(Vectors.zeros(dims))
    -        val counts = Array.fill(runs, k)(0L)
    +        val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
    +        val counts = Array.fill(thisCenters.length)(0L)
     
             points.foreach { point =>
    -          (0 until runs).foreach { i =>
    -            val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
    -            costAccums(i).add(cost)
    -            val sum = sums(i)(bestCenter)
    -            axpy(1.0, point.vector, sum)
    -            counts(i)(bestCenter) += 1
    -          }
    +          val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
    +          costAccum.add(cost)
    +          val sum = sums(bestCenter)
    +          axpy(1.0, point.vector, sum)
    +          counts(bestCenter) += 1
             }
     
    -        val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
    -          ((i, j), (sums(i)(j), counts(i)(j)))
    -        }
    -        contribs.iterator
    -      }.reduceByKey(mergeContribs).collectAsMap()
    -
    -      bcActiveCenters.destroy(blocking = false)
    -
    -      // Update the cluster centers and costs for each active run
    -      for ((run, i) <- activeRuns.zipWithIndex) {
    -        var changed = false
    -        var j = 0
    -        while (j < k) {
    -          val (sum, count) = totalContribs((i, j))
    -          if (count != 0) {
    -            scal(1.0 / count, sum)
    -            val newCenter = new VectorWithNorm(sum)
    -            if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
    -              changed = true
    -            }
    -            centers(run)(j) = newCenter
    -          }
    -          j += 1
    -        }
    -        if (!changed) {
    -          active(run) = false
    -          logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
    +        counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
    +      }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
    +        axpy(1.0, sum2, sum1)
    +        (sum1, count1 + count2)
    +      }.collectAsMap()
    +
    +      bcCenters.destroy(blocking = false)
    +
    +      // Update the cluster centers and costs
    +      converged = true
    --- End diff --
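
    For reference, the refactor collapses the old per-run bookkeeping into a single Lloyd's
    step: assign each point to its nearest center inside mapPartitions, merge the per-partition
    (sum, count) contributions with reduceByKey, and recompute the centers on the driver. Below
    is a minimal, self-contained sketch of that step; the toy Array[Double] points, sqDist, and
    findClosestIndex are illustrative stand-ins, not code from the patch:

        import org.apache.spark.SparkContext
        import org.apache.spark.rdd.RDD

        object LloydStepSketch {

          // Squared Euclidean distance between two dense points.
          def sqDist(a: Array[Double], b: Array[Double]): Double = {
            var d = 0.0
            var i = 0
            while (i < a.length) { val t = a(i) - b(i); d += t * t; i += 1 }
            d
          }

          // Illustrative stand-in for KMeans.findClosest (index only, no cost).
          def findClosestIndex(centers: Array[Array[Double]], p: Array[Double]): Int =
            centers.indices.minBy(j => sqDist(centers(j), p))

          // One Lloyd's step: per-partition partial sums merged with reduceByKey.
          def lloydStep(
              sc: SparkContext,
              points: RDD[Array[Double]],
              centers: Array[Array[Double]]): Array[Array[Double]] = {
            val bcCenters = sc.broadcast(centers)

            val totalContribs = points.mapPartitions { iter =>
              val cs = bcCenters.value
              val dims = cs.head.length
              val sums = Array.fill(cs.length)(new Array[Double](dims))
              val counts = new Array[Long](cs.length)
              iter.foreach { p =>
                val j = findClosestIndex(cs, p)
                var d = 0
                while (d < dims) { sums(j)(d) += p(d); d += 1 }
                counts(j) += 1
              }
              // Emit only non-empty clusters, as the patch does.
              counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
            }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
              var d = 0
              while (d < sum1.length) { sum1(d) += sum2(d); d += 1 }
              (sum1, count1 + count2)
            }.collectAsMap()

            bcCenters.destroy()

            // New center = mean of its assigned points; an empty cluster keeps its old center.
            centers.indices.map { j =>
              totalContribs.get(j).map { case (sum, count) => sum.map(_ / count) }
                .getOrElse(centers(j))
            }.toArray
          }
        }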
    
    The logic is the same, yes, but it seems strange to initialize the flag to false, set it
    to true at the start of each iteration, and then flip it back to false when a center moves.
    Why not leave it false and set it to true once the convergence criterion is met? This is a
    trivial detail, so only change it if you want. I'm fine either way.
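
    To make the suggestion concrete, here is a minimal standalone sketch of the two shapes
    (plain Scala with a toy sqDist; updateAssumeConverged and updateTrackMovement are
    illustrative names, not code from the patch):

        object ConvergedFlagSketch {

          // Squared Euclidean distance over dense points.
          def sqDist(a: Array[Double], b: Array[Double]): Double =
            a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

          // Shape in the patch: start each iteration from "converged",
          // falsify as soon as any center moves more than epsilon.
          def updateAssumeConverged(
              centers: Array[Array[Double]],
              newCenters: Map[Int, Array[Double]],
              epsilon: Double): Boolean = {
            var converged = true
            newCenters.foreach { case (j, c) =>
              if (converged && sqDist(c, centers(j)) > epsilon * epsilon) {
                converged = false
              }
              centers(j) = c
            }
            converged
          }

          // Suggested shape: record whether anything moved, then derive
          // convergence once at the end.
          def updateTrackMovement(
              centers: Array[Array[Double]],
              newCenters: Map[Int, Array[Double]],
              epsilon: Double): Boolean = {
            var moved = false
            newCenters.foreach { case (j, c) =>
              if (!moved && sqDist(c, centers(j)) > epsilon * epsilon) {
                moved = true
              }
              centers(j) = c
            }
            !moved
          }
        }

    Both functions return the same answer for the same inputs; the second just reads as the
    positive question "did anything move this iteration?".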


