spark-reviews mailing list archives

From: mengxr <...@git.apache.org>
Subject: [GitHub] spark pull request: [SPARK-1485][MLLIB] Implement Butterfly AllRed...
Date: Wed, 23 Apr 2014 21:10:09 GMT
GitHub user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/506#discussion_r11924002
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala ---
    @@ -44,6 +44,54 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
           new SlidingRDD[T](self, windowSize)
         }
       }
    +
    +  /**
    +   * Returns an RDD with the specified slice of partitions.
    +   */
    +  def slicePartitions(slice: Seq[Int]): RDD[T] = {
    +    new PartitionSlicingRDD(self, slice)
    +  }
    +
    +  /**
    +   * Computes the all-reduced RDD of the parent RDD, which has the same number of partitions and
    +   * locality information as its parent RDD. Each partition contains only one record, which is the
    +   * same as calling `RDD#reduce` on its parent RDD.
    +   *
    +   * @param f reducer
    +   * @return all-reduced RDD
    +   */
    +  def allReduce(f: (T, T) => T): RDD[T] = {
    +    val numPartitions = self.partitions.size
    +    require(numPartitions > 0, "Parent RDD does not have any partitions.")
    +    val nextPowerOfTwo = {
    +      var i = 0
    +      while ((numPartitions >> i) > 0) {
    +        i += 1
    +      }
    +      1 << i
    +    }
    +    var butterfly = self.mapPartitions( (iter) =>
    +      Iterator(iter.reduce(f)),
    +      preservesPartitioning = true
    +    ).cache()
    +
    +    if (nextPowerOfTwo > numPartitions) {
    +      val padding = self.context.parallelize(Seq.empty[T], nextPowerOfTwo - numPartitions)
    +      butterfly = butterfly.union(padding)
    +    }
    +
    +    var offset = nextPowerOfTwo >> 1
    +    while (offset > 0) {
    +      butterfly = new ButterflyReducedRDD[T](butterfly, f, offset).cache()
    --- End diff --
    
    Actually, I thought about doing that. I prefer lazy transformations, given that old cached
    RDDs will be cleared from memory to make room for new ones. But I am not sure whether that
    cleaning is reliable.
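    
    For illustration, here is a minimal sketch of the eager alternative, assuming the same
    context as the diff above (`self`, `f`, `nextPowerOfTwo`, and `ButterflyReducedRDD`): each
    butterfly stage is forced with `count()` and the previous stage is unpersisted explicitly,
    instead of relying on old cached RDDs being cleaned up automatically. This is just a sketch
    of that pattern, not the PR's code.
    
        // Sketch only, not the PR's implementation: materialize each butterfly
        // stage eagerly and drop the previous stage from the cache explicitly.
        var eager = self.mapPartitions(iter => Iterator(iter.reduce(f)),
          preservesPartitioning = true).cache()
        var step = nextPowerOfTwo >> 1
        while (step > 0) {
          val next = new ButterflyReducedRDD[T](eager, f, step).cache()
          next.count()                       // force evaluation of this stage
          eager.unpersist(blocking = false)  // explicit cleanup instead of relying on the cleaner
          eager = next
          step >>= 1
        }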

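    For reference, a small usage sketch of the proposed `allReduce`, assuming an existing
    SparkContext `sc`; per the doc comment above, every partition of the result should hold a
    single record equal to `data.reduce(_ + _)`:
    
        import org.apache.spark.mllib.rdd.RDDFunctions._  // implicit conversion to RDDFunctions
        
        val data = sc.parallelize(1 to 8, 4)  // 4 partitions, sum = 36
        val all  = data.allReduce(_ + _)      // each partition should contain the single record 36
    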

