spark-commits mailing list archives

From ma...@apache.org
Subject [28/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
Date Sun, 01 Sep 2013 22:00:34 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
deleted file mode 100644
index cc1285d..0000000
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ /dev/null
@@ -1,703 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.nio.ByteBuffer
-import java.util.{Date, HashMap => JHashMap}
-import java.text.SimpleDateFormat
-
-import scala.collection.{mutable, Map}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.SparkHadoopWriter
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
-
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat,
-    RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil}
-import org.apache.hadoop.security.UserGroupInformation
-
-import spark.partial.BoundedDouble
-import spark.partial.PartialResult
-import spark.rdd._
-import spark.SparkContext._
-import spark.Partitioner._
-
-/**
- * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
- * Import `spark.SparkContext._` at the top of your program to use these functions.
- */
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
-  extends Logging
-  with SparkHadoopMapReduceUtil
-  with Serializable {
-
-  /**
-   * Generic function to combine the elements for each key using a custom set of aggregation
-   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
-   * Note that V and C can be different -- for example, one might group an RDD of type
-   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
-   *
-   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
-   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
-   * - `mergeCombiners`, to combine two C's into a single one.
-   *
-   * In addition, users can control the partitioning of the output RDD, and whether to perform
-   * map-side aggregation (if a mapper can produce multiple items with the same key).
-   */
-  def combineByKey[C](createCombiner: V => C,
-      mergeValue: (C, V) => C,
-      mergeCombiners: (C, C) => C,
-      partitioner: Partitioner,
-      mapSideCombine: Boolean = true,
-      serializerClass: String = null): RDD[(K, C)] = {
-    if (getKeyClass().isArray) {
-      if (mapSideCombine) {
-        throw new SparkException("Cannot use map-side combining with array keys.")
-      }
-      if (partitioner.isInstanceOf[HashPartitioner]) {
-        throw new SparkException("Default partitioner cannot partition array keys.")
-      }
-    }
-    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    if (self.partitioner == Some(partitioner)) {
-      self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
-    } else if (mapSideCombine) {
-      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
-      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
-        .setSerializer(serializerClass)
-      partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
-    } else {
-      // Don't apply map-side combiner.
-      // A sanity check to make sure mergeCombiners is not defined.
-      assert(mergeCombiners == null)
-      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
-      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
-    }
-  }
-
-  /**
-   * Simplified version of combineByKey that hash-partitions the output RDD.
-   */
-  def combineByKey[C](createCombiner: V => C,
-      mergeValue: (C, V) => C,
-      mergeCombiners: (C, C) => C,
-      numPartitions: Int): RDD[(K, C)] = {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication).
-   */
-  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
-    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
-    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
-    val zeroArray = new Array[Byte](zeroBuffer.limit)
-    zeroBuffer.get(zeroArray)
-
-    // When deserializing, use a lazy val to create just one instance of the serializer per task
-    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
-    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
-
-    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
-  }
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication).
-   */
-  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
-    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
-  }
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication).
-   */
-  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
-    foldByKey(zeroValue, defaultPartitioner(self))(func)
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
-   */
-  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
-    combineByKey[V]((v: V) => v, func, func, partitioner)
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
-   */
-  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
-
-    if (getKeyClass().isArray) {
-      throw new SparkException("reduceByKeyLocally() does not support array keys")
-    }
-
-    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
-      val map = new JHashMap[K, V]
-      iter.foreach { case (k, v) =>
-        val old = map.get(k)
-        map.put(k, if (old == null) v else func(old, v))
-      }
-      Iterator(map)
-    }
-
-    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
-      m2.foreach { case (k, v) =>
-        val old = m1.get(k)
-        m1.put(k, if (old == null) v else func(old, v))
-      }
-      m1
-    }
-
-    self.mapPartitions(reducePartition).reduce(mergeMaps)
-  }
-
-  /** Alias for reduceByKeyLocally */
-  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
-
-  /** Count the number of elements for each key, and return the result to the master as a Map. */
-  def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
-
-  /**
-   * (Experimental) Approximate version of countByKey that can return a partial result if it does
-   * not finish within a timeout.
-   */
-  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
-      : PartialResult[Map[K, BoundedDouble]] = {
-    self.map(_._1).countByValueApprox(timeout, confidence)
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
-   */
-  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
-    reduceByKey(new HashPartitioner(numPartitions), func)
-  }
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Allows controlling the
-   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
-   */
-  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
-    // groupByKey shouldn't use map side combine because map side combine does not
-    // reduce the amount of data shuffled and requires all map side data be inserted
-    // into a hash table, leading to more objects in the old gen.
-    def createCombiner(v: V) = ArrayBuffer(v)
-    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
-    bufs.asInstanceOf[RDD[(K, Seq[V])]]
-  }
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD into `numPartitions` partitions.
-   */
-  def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
-    groupByKey(new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a copy of the RDD partitioned using the specified partitioner.
-   */
-  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
-    if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
-      throw new SparkException("Default partitioner cannot partition array keys.")
-    }
-    new ShuffledRDD[K, V, (K, V)](self, partitioner)
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
-   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
-   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
-   */
-  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
-    }
-  }
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
-   * partition the output RDD.
-   */
-  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (ws.isEmpty) {
-        vs.iterator.map(v => (v, None))
-      } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
-      }
-    }
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
-   * partition the output RDD.
-   */
-  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
-      : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (vs.isEmpty) {
-        ws.iterator.map(w => (None, w))
-      } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
-      }
-    }
-  }
-
-  /**
-   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
-   * existing partitioner/parallelism level.
-   */
-  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
-      : RDD[(K, C)] = {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
-   * parallelism level.
-   */
-  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
-    reduceByKey(defaultPartitioner(self), func)
-  }
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with the existing partitioner/parallelism level.
-   */
-  def groupByKey(): RDD[(K, Seq[V])] = {
-    groupByKey(defaultPartitioner(self))
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
-   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
-   * (k, v2) is in `other`. Performs a hash join across the cluster.
-   */
-  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
-    join(other, defaultPartitioner(self, other))
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
-   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
-   * (k, v2) is in `other`. Performs a hash join across the cluster.
-   */
-  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
-    join(other, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
-   * using the existing partitioner/parallelism level.
-   */
-  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
-    leftOuterJoin(other, defaultPartitioner(self, other))
-  }
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
-   * into `numPartitions` partitions.
-   */
-  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
-    leftOuterJoin(other, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
-   * RDD using the existing partitioner/parallelism level.
-   */
-  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
-    rightOuterJoin(other, defaultPartitioner(self, other))
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
-   * RDD into the given number of partitions.
-   */
-  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
-    rightOuterJoin(other, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Return the key-value pairs in this RDD to the master as a Map.
-   */
-  def collectAsMap(): Map[K, V] = {
-    val data = self.toArray()
-    val map = new mutable.HashMap[K, V]
-    map.sizeHint(data.length)
-    data.foreach { case (k, v) => map.put(k, v) }
-    map
-  }
-
-  /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
-   */
-  def mapValues[U](f: V => U): RDD[(K, U)] = {
-    val cleanF = self.context.clean(f)
-    new MappedValuesRDD(self, cleanF)
-  }
-
-  /**
-   * Pass each value in the key-value pair RDD through a flatMap function without changing the
-   * keys; this also retains the original RDD's partitioning.
-   */
-  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
-    val cleanF = self.context.clean(f)
-    new FlatMappedValuesRDD(self, cleanF)
-  }
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
-    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
-      throw new SparkException("Default partitioner cannot partition array keys.")
-    }
-    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
-    prfs.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
-    }
-  }
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
-      throw new SparkException("Default partitioner cannot partition array keys.")
-    }
-    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
-    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
-    prfs.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
-    }
-  }
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
-    cogroup(other, defaultPartitioner(self, other))
-  }
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
-  }
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
-    cogroup(other, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    cogroup(other1, other2, new HashPartitioner(numPartitions))
-  }
-
-  /** Alias for cogroup. */
-  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
-    cogroup(other, defaultPartitioner(self, other))
-  }
-
-  /** Alias for cogroup. */
-  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
-  }
-
-  /**
-   * Return an RDD with the pairs from `this` whose keys are not in `other`.
-   *
-   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
-   * RDD will be no larger than this one.
-   */
-  def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
-    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
-
-  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
-    subtractByKey(other, new HashPartitioner(numPartitions))
-
-  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
-    new SubtractedRDD[K, V, W](self, other, p)
-
-  /**
-   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
-   * RDD has a known partitioner by only searching the partition that the key maps to.
-   */
-  def lookup(key: K): Seq[V] = {
-    self.partitioner match {
-      case Some(p) =>
-        val index = p.getPartition(key)
-        def process(it: Iterator[(K, V)]): Seq[V] = {
-          val buf = new ArrayBuffer[V]
-          for ((k, v) <- it if k == key) {
-            buf += v
-          }
-          buf
-        }
-        val res = self.context.runJob(self, process _, Array(index), false)
-        res(0)
-      case None =>
-        self.filter(_._1 == key).map(_._2).collect()
-    }
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD.
-   */
-  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD. Compress the result with the
-   * supplied codec.
-   */
-  def saveAsHadoopFile[F <: OutputFormat[K, V]](
-      path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
-   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
-   */
-  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
-    saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
-   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
-   */
-  def saveAsNewAPIHadoopFile(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration) {
-    val job = new NewAPIHadoopJob(conf)
-    job.setOutputKeyClass(keyClass)
-    job.setOutputValueClass(valueClass)
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
-    NewFileOutputFormat.setOutputPath(job, new Path(path))
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = self.id
-    def writeShard(context: spark.TaskContext, iter: Iterator[(K,V)]): Int = {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = outputFormatClass.newInstance
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next
-        writer.write(k, v)
-      }
-      writer.close(hadoopContext)
-      committer.commitTask(hadoopContext)
-      return 1
-    }
-    val jobFormat = outputFormatClass.newInstance
-    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
-     * however we're only going to use this local OutputCommitter for
-     * setupJob/commitJob, so we just use a dummy "map" task.
-     */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-    jobCommitter.setupJob(jobTaskContext)
-    val count = self.context.runJob(self, writeShard _).sum
-    jobCommitter.commitJob(jobTaskContext)
-    jobCommitter.cleanupJob(jobTaskContext)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
-   */
-  def saveAsHadoopFile(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      codec: Class[_ <: CompressionCodec]) {
-    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
-      new JobConf(self.context.hadoopConfiguration), Some(codec))
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD.
-   */
-  def saveAsHadoopFile(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
-      codec: Option[Class[_ <: CompressionCodec]] = None) {
-    conf.setOutputKeyClass(keyClass)
-    conf.setOutputValueClass(valueClass)
-    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
-    conf.set("mapred.output.format.class", outputFormatClass.getName)
-    for (c <- codec) {
-      conf.setCompressMapOutput(true)
-      conf.set("mapred.output.compress", "true")
-      conf.setMapOutputCompressorClass(c)
-      conf.set("mapred.output.compression.codec", c.getCanonicalName)
-      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
-    }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
-    saveAsHadoopDataset(conf)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
-   * that storage system. The JobConf should set an OutputFormat and any output paths required
-   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
-   * MapReduce job.
-   */
-  def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatClass = conf.getOutputFormat
-    val keyClass = conf.getOutputKeyClass
-    val valueClass = conf.getOutputValueClass
-    if (outputFormatClass == null) {
-      throw new SparkException("Output format class not set")
-    }
-    if (keyClass == null) {
-      throw new SparkException("Output key class not set")
-    }
-    if (valueClass == null) {
-      throw new SparkException("Output value class not set")
-    }
-
-    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
-
-    val writer = new SparkHadoopWriter(conf)
-    writer.preSetup()
-
-    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-
-      writer.setup(context.stageId, context.splitId, attemptNumber)
-      writer.open()
-
-      var count = 0
-      while(iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
-      }
-
-      writer.close()
-      writer.commit()
-    }
-
-    self.context.runJob(self, writeToFile _)
-    writer.commitJob()
-    writer.cleanup()
-  }
-
-  /**
-   * Return an RDD with the keys of each tuple.
-   */
-  def keys: RDD[K] = self.map(_._1)
-
-  /**
-   * Return an RDD with the values of each tuple.
-   */
-  def values: RDD[V] = self.map(_._2)
-
-  private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
-
-  private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
-}
-
-
-private[spark] object Manifests {
-  val seqSeqManifest = classManifest[Seq[Seq[_]]]
-}
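
As a point of reference for the API being relocated above, here is a minimal usage sketch of PairRDDFunctions against the old `spark` package. The object name, master string and sample data below are illustrative assumptions, not part of this commit.

import spark.SparkContext
import spark.SparkContext._  // implicit conversion from RDD[(K, V)] to PairRDDFunctions

object PairRDDFunctionsSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PairRDDFunctionsSketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // reduceByKey merges values per key, combining map-side before the shuffle.
    val sums = pairs.reduceByKey(_ + _)

    // combineByKey builds a per-key (sum, count) accumulator, then a per-key average.
    val avgs = pairs.combineByKey(
      (v: Int) => (v, 1),                                          // createCombiner
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // mergeValue
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners
    ).mapValues { case (sum, count) => sum.toDouble / count }

    sums.collect().foreach(println)  // (a,4), (b,2)
    avgs.collect().foreach(println)  // (a,2.0), (b,2.0)
    sc.stop()
  }
}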

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Partition.scala b/core/src/main/scala/spark/Partition.scala
deleted file mode 100644
index 2a4edce..0000000
--- a/core/src/main/scala/spark/Partition.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-/**
- * A partition of an RDD.
- */
-trait Partition extends Serializable {
-  /**
-   * Get the split's index within its parent RDD
-   */
-  def index: Int
-  
-  // A better default implementation of HashCode
-  override def hashCode(): Int = index
-}
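
A rough sketch of how the Partition contract above is typically implemented by a custom RDD; the class and field names below are hypothetical, not part of this commit.

import spark.Partition

// Hypothetical partition type for a custom RDD that splits a numeric key range.
class RangeSlicePartition(val index: Int, val start: Long, val end: Long) extends Partition {
  // hashCode() comes from the trait and returns `index`; keep equals consistent with it.
  override def equals(other: Any): Boolean = other match {
    case p: RangeSlicePartition => p.index == index
    case _ => false
  }
}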

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
deleted file mode 100644
index 65da823..0000000
--- a/core/src/main/scala/spark/Partitioner.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-/**
- * An object that defines how the elements in a key-value pair RDD are partitioned by key.
- * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
- */
-abstract class Partitioner extends Serializable {
-  def numPartitions: Int
-  def getPartition(key: Any): Int
-}
-
-object Partitioner {
-  /**
-   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
-   *
-   * If any of the RDDs already has a partitioner, choose that one.
-   *
-   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
-   * spark.default.parallelism is set, then we'll use the value from SparkContext
-   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
-   *
-   * Unless spark.default.parallelism is set, the number of partitions will be the
-   * same as the number of partitions in the largest upstream RDD, as this should
-   * be least likely to cause out-of-memory errors.
-   *
-   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
-   */
-  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
-    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
-    for (r <- bySize if r.partitioner != None) {
-      return r.partitioner.get
-    }
-    if (System.getProperty("spark.default.parallelism") != null) {
-      return new HashPartitioner(rdd.context.defaultParallelism)
-    } else {
-      return new HashPartitioner(bySize.head.partitions.size)
-    }
-  }
-}
-
-/**
- * A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
- *
- * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
- * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
- * produce an unexpected or incorrect result.
- */
-class HashPartitioner(partitions: Int) extends Partitioner {
-  def numPartitions = partitions
-
-  def getPartition(key: Any): Int = key match {
-    case null => 0
-    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
-  }
-  
-  override def equals(other: Any): Boolean = other match {
-    case h: HashPartitioner =>
-      h.numPartitions == numPartitions
-    case _ =>
-      false
-  }
-}
-
-/**
- * A [[spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
- * Determines the ranges by sampling the RDD passed in.
- */
-class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
-    partitions: Int,
-    @transient rdd: RDD[_ <: Product2[K,V]],
-    private val ascending: Boolean = true) 
-  extends Partitioner {
-
-  // An array of upper bounds for the first (partitions - 1) partitions
-  private val rangeBounds: Array[K] = {
-    if (partitions == 1) {
-      Array()
-    } else {
-      val rddSize = rdd.count()
-      val maxSampleSize = partitions * 20.0
-      val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
-      if (rddSample.length == 0) {
-        Array()
-      } else {
-        val bounds = new Array[K](partitions - 1)
-        for (i <- 0 until partitions - 1) {
-          val index = (rddSample.length - 1) * (i + 1) / partitions
-          bounds(i) = rddSample(index)
-        }
-        bounds
-      }
-    }
-  }
-
-  def numPartitions = partitions
-
-  def getPartition(key: Any): Int = {
-    // TODO: Use a binary search here if number of partitions is large
-    val k = key.asInstanceOf[K]
-    var partition = 0
-    while (partition < rangeBounds.length && k > rangeBounds(partition)) {
-      partition += 1
-    }
-    if (ascending) {
-      partition
-    } else {
-      rangeBounds.length - partition
-    }
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case r: RangePartitioner[_,_] =>
-      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
-    case _ =>
-      false
-  }
-}
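
For context, a short sketch of plugging the partitioners above into a pair RDD, plus a hypothetical custom Partitioner. FirstCharPartitioner and the local SparkContext below are illustrative assumptions, not part of this commit.

import spark.SparkContext
import spark.SparkContext._
import spark.{HashPartitioner, Partitioner}

// Hypothetical partitioner that routes string keys by their first character.
class FirstCharPartitioner(override val numPartitions: Int) extends Partitioner {
  private def mod(h: Int): Int = ((h % numPartitions) + numPartitions) % numPartitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case s: String if s.nonEmpty => mod(s.charAt(0).toInt)
    case other => mod(other.hashCode)
  }

  override def equals(other: Any): Boolean = other match {
    case p: FirstCharPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
}

object PartitionerSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PartitionerSketch")
    val words = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("avocado", 3)))

    // Hash partitioning, the same default that defaultPartitioner falls back to.
    val hashed = words.partitionBy(new HashPartitioner(4))

    // Custom partitioning: keys sharing a first letter end up in the same partition.
    val byLetter = words.partitionBy(new FirstCharPartitioner(4))

    println(hashed.partitions.size + " / " + byLetter.partitions.size)  // 4 / 4
    sc.stop()
  }
}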

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
deleted file mode 100644
index 25a6951..0000000
--- a/core/src/main/scala/spark/RDD.scala
+++ /dev/null
@@ -1,957 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util.Random
-
-import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.TextOutputFormat
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-import spark.Partitioner._
-import spark.api.java.JavaRDD
-import spark.partial.BoundedDouble
-import spark.partial.CountEvaluator
-import spark.partial.GroupedCountEvaluator
-import spark.partial.PartialResult
-import spark.rdd.CoalescedRDD
-import spark.rdd.CartesianRDD
-import spark.rdd.FilteredRDD
-import spark.rdd.FlatMappedRDD
-import spark.rdd.GlommedRDD
-import spark.rdd.MappedRDD
-import spark.rdd.MapPartitionsRDD
-import spark.rdd.MapPartitionsWithIndexRDD
-import spark.rdd.PipedRDD
-import spark.rdd.SampledRDD
-import spark.rdd.ShuffledRDD
-import spark.rdd.UnionRDD
-import spark.rdd.ZippedRDD
-import spark.rdd.ZippedPartitionsRDD2
-import spark.rdd.ZippedPartitionsRDD3
-import spark.rdd.ZippedPartitionsRDD4
-import spark.storage.StorageLevel
-import spark.util.BoundedPriorityQueue
-
-import SparkContext._
-
-/**
- * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
- * partitioned collection of elements that can be operated on in parallel. This class contains the
- * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
- * [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such
- * as `groupByKey` and `join`; [[spark.DoubleRDDFunctions]] contains operations available only on
- * RDDs of Doubles; and [[spark.SequenceFileRDDFunctions]] contains operations available on RDDs
- * that can be saved as SequenceFiles. These operations are automatically available on any RDD of
- * the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you
- * `import spark.SparkContext._`.
- *
- * Internally, each RDD is characterized by five main properties:
- *
- *  - A list of partitions
- *  - A function for computing each split
- *  - A list of dependencies on other RDDs
- *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
- *    an HDFS file)
- *
- * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
- * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
- * reading data from a new storage system) by overriding these functions. Please refer to the
- * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
- * on RDD internals.
- */
-abstract class RDD[T: ClassManifest](
-    @transient private var sc: SparkContext,
-    @transient private var deps: Seq[Dependency[_]]
-  ) extends Serializable with Logging {
-
-  /** Construct an RDD with just a one-to-one dependency on one parent */
-  def this(@transient oneParent: RDD[_]) =
-    this(oneParent.context , List(new OneToOneDependency(oneParent)))
-
-  // =======================================================================
-  // Methods that should be implemented by subclasses of RDD
-  // =======================================================================
-
-  /** Implemented by subclasses to compute a given partition. */
-  def compute(split: Partition, context: TaskContext): Iterator[T]
-
-  /**
-   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
-   * be called once, so it is safe to implement a time-consuming computation in it.
-   */
-  protected def getPartitions: Array[Partition]
-
-  /**
-   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
-   * be called once, so it is safe to implement a time-consuming computation in it.
-   */
-  protected def getDependencies: Seq[Dependency[_]] = deps
-
-  /** Optionally overridden by subclasses to specify placement preferences. */
-  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
-
-  /** Optionally overridden by subclasses to specify how they are partitioned. */
-  val partitioner: Option[Partitioner] = None
-
-  // =======================================================================
-  // Methods and fields available on all RDDs
-  // =======================================================================
-
-  /** The SparkContext that created this RDD. */
-  def sparkContext: SparkContext = sc
-
-  /** A unique ID for this RDD (within its SparkContext). */
-  val id: Int = sc.newRddId()
-
-  /** A friendly name for this RDD */
-  var name: String = null
-
-  /** Assign a name to this RDD */
-  def setName(_name: String) = {
-    name = _name
-    this
-  }
-
-  /** User-defined generator of this RDD*/
-  var generator = Utils.getCallSiteInfo.firstUserClass
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    generator = _generator
-  }
-
-  /**
-   * Set this RDD's storage level to persist its values across operations after the first time
-   * it is computed. This can only be used to assign a new storage level if the RDD does not
-   * have a storage level set yet.
-   */
-  def persist(newLevel: StorageLevel): RDD[T] = {
-    // TODO: Handle changes of StorageLevel
-    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
-      throw new UnsupportedOperationException(
-        "Cannot change storage level of an RDD after it was already assigned a level")
-    }
-    storageLevel = newLevel
-    // Register the RDD with the SparkContext
-    sc.persistentRdds(id) = this
-    this
-  }
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): RDD[T] = persist()
-
-  /**
-   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
-   *
-   * @param blocking Whether to block until all blocks are deleted.
-   * @return This RDD.
-   */
-  def unpersist(blocking: Boolean = true): RDD[T] = {
-    logInfo("Removing RDD " + id + " from persistence list")
-    sc.env.blockManager.master.removeRdd(id, blocking)
-    sc.persistentRdds.remove(id)
-    storageLevel = StorageLevel.NONE
-    this
-  }
-
-  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
-  def getStorageLevel = storageLevel
-
-  // Our dependencies and partitions are obtained by calling the subclass's methods below, and
-  // will be overwritten when we're checkpointed.
-  private var dependencies_ : Seq[Dependency[_]] = null
-  @transient private var partitions_ : Array[Partition] = null
-
-  /** An Option holding our checkpoint RDD, if we are checkpointed */
-  private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
-
-  /**
-   * Get the list of dependencies of this RDD, taking into account whether the
-   * RDD is checkpointed or not.
-   */
-  final def dependencies: Seq[Dependency[_]] = {
-    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
-      if (dependencies_ == null) {
-        dependencies_ = getDependencies
-      }
-      dependencies_
-    }
-  }
-
-  /**
-   * Get the array of partitions of this RDD, taking into account whether the
-   * RDD is checkpointed or not.
-   */
-  final def partitions: Array[Partition] = {
-    checkpointRDD.map(_.partitions).getOrElse {
-      if (partitions_ == null) {
-        partitions_ = getPartitions
-      }
-      partitions_
-    }
-  }
-
-  /**
-   * Get the preferred locations of a partition (as hostnames), taking into account whether the
-   * RDD is checkpointed.
-   */
-  final def preferredLocations(split: Partition): Seq[String] = {
-    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
-      getPreferredLocations(split)
-    }
-  }
-
-  /**
-   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
-   * This should ''not'' be called by users directly, but is available for implementors of custom
-   * subclasses of RDD.
-   */
-  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
-    if (storageLevel != StorageLevel.NONE) {
-      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
-    } else {
-      computeOrReadCheckpoint(split, context)
-    }
-  }
-
-  /**
-   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
-   */
-  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
-    if (isCheckpointed) {
-      firstParent[T].iterator(split, context)
-    } else {
-      compute(split, context)
-    }
-  }
-
-  // Transformations (return a new RDD)
-
-  /**
-   * Return a new RDD by applying a function to all elements of this RDD.
-   */
-  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
-
-  /**
-   *  Return a new RDD by first applying a function to all elements of this
-   *  RDD, and then flattening the results.
-   */
-  def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
-    new FlatMappedRDD(this, sc.clean(f))
-
-  /**
-   * Return a new RDD containing only the elements that satisfy a predicate.
-   */
-  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
-
-  /**
-   * Return a new RDD containing the distinct elements in this RDD.
-   */
-  def distinct(numPartitions: Int): RDD[T] =
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
-
-  def distinct(): RDD[T] = distinct(partitions.size)
-
-  /**
-   * Return a new RDD that is reduced into `numPartitions` partitions.
-   */
-  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
-    if (shuffle) {
-      // include a shuffle step so that our upstream tasks are still distributed
-      new CoalescedRDD(
-        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
-        new HashPartitioner(numPartitions)),
-        numPartitions).keys
-    } else {
-      new CoalescedRDD(this, numPartitions)
-    }
-  }
-
-  /**
-   * Return a sampled subset of this RDD.
-   */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
-    new SampledRDD(this, withReplacement, fraction, seed)
-
-  def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
-    var fraction = 0.0
-    var total = 0
-    val multiplier = 3.0
-    val initialCount = this.count()
-    var maxSelected = 0
-
-    if (num < 0) {
-      throw new IllegalArgumentException("Negative number of elements requested")
-    }
-
-    if (initialCount > Integer.MAX_VALUE - 1) {
-      maxSelected = Integer.MAX_VALUE - 1
-    } else {
-      maxSelected = initialCount.toInt
-    }
-
-    if (num > initialCount && !withReplacement) {
-      total = maxSelected
-      fraction = multiplier * (maxSelected + 1) / initialCount
-    } else {
-      fraction = multiplier * (num + 1) / initialCount
-      total = num
-    }
-
-    val rand = new Random(seed)
-    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
-
-    // If the first sample didn't turn out large enough, keep trying to take samples;
-    // this shouldn't happen often because we use a big multiplier for the initial size
-    while (samples.length < total) {
-      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
-    }
-
-    Utils.randomizeInPlace(samples, rand).take(total)
-  }
-
-  /**
-   * Return the union of this RDD and another one. Any identical elements will appear multiple
-   * times (use `.distinct()` to eliminate them).
-   */
-  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
-
-  /**
-   * Return the union of this RDD and another one. Any identical elements will appear multiple
-   * times (use `.distinct()` to eliminate them).
-   */
-  def ++(other: RDD[T]): RDD[T] = this.union(other)
-
-  /**
-   * Return an RDD created by coalescing all elements within each partition into an array.
-   */
-  def glom(): RDD[Array[T]] = new GlommedRDD(this)
-
-  /**
-   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
-   * elements (a, b) where a is in `this` and b is in `other`.
-   */
-  def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
-
-  /**
-   * Return an RDD of grouped items.
-   */
-  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
-    groupBy[K](f, defaultPartitioner(this))
-
-  /**
-   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
-   * mapping to that key.
-   */
-  def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
-    groupBy(f, new HashPartitioner(numPartitions))
-
-  /**
-   * Return an RDD of grouped items.
-   */
-  def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
-    val cleanF = sc.clean(f)
-    this.map(t => (cleanF(t), t)).groupByKey(p)
-  }
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: String): RDD[String] = new PipedRDD(this, command)
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: String, env: Map[String, String]): RDD[String] =
-    new PipedRDD(this, command, env)
-
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   * The print behavior can be customized by providing two functions.
-   *
-   * @param command command to run in forked process.
-   * @param env environment variables to set.
-   * @param printPipeContext Before piping elements, this function is called as an opportunity
-   *                         to pipe context data. The print line function (like out.println) will
-   *                         be passed as printPipeContext's parameter.
-   * @param printRDDElement Use this function to customize how to pipe elements. This function
-   *                        will be called with each RDD element as the 1st parameter, and the
-   *                        print line function (like out.println()) as the 2nd parameter.
-   *                        An example of piping the RDD data of groupBy() in a streaming way,
-   *                        instead of constructing a huge String to concatenate all the elements:
-   *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
-   *                          for (e <- record._2){f(e)}
-   * @return the result RDD
-   */
-  def pipe(
-      command: Seq[String],
-      env: Map[String, String] = Map(),
-      printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
-    new PipedRDD(this, command, env,
-      if (printPipeContext ne null) sc.clean(printPipeContext) else null,
-      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD.
-   */
-  def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
-    preservesPartitioning: Boolean = false): RDD[U] =
-    new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
-   * of the original partition.
-   */
-  def mapPartitionsWithIndex[U: ClassManifest](
-    f: (Int, Iterator[T]) => Iterator[U],
-    preservesPartitioning: Boolean = false): RDD[U] =
-    new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
-   * of the original partition.
-   */
-  @deprecated("use mapPartitionsWithIndex", "0.7.0")
-  def mapPartitionsWithSplit[U: ClassManifest](
-    f: (Int, Iterator[T]) => Iterator[U],
-    preservesPartitioning: Boolean = false): RDD[U] =
-    new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
-
-  /**
-   * Maps f over this RDD, where f takes an additional parameter of type A.  This
-   * additional parameter is produced by constructA, which is called in each
-   * partition with the index of that partition.
-   */
-  def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
-    (f:(T, A) => U): RDD[U] = {
-      def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
-        val a = constructA(index)
-        iter.map(t => f(t, a))
-      }
-    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
-  }
-
-  /**
-   * FlatMaps f over this RDD, where f takes an additional parameter of type A.  This
-   * additional parameter is produced by constructA, which is called in each
-   * partition with the index of that partition.
-   */
-  def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
-    (f:(T, A) => Seq[U]): RDD[U] = {
-      def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
-        val a = constructA(index)
-        iter.flatMap(t => f(t, a))
-      }
-    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
-  }
-
-  /**
-   * Applies f to each element of this RDD, where f takes an additional parameter of type A.
-   * This additional parameter is produced by constructA, which is called in each
-   * partition with the index of that partition.
-   */
-  def foreachWith[A: ClassManifest](constructA: Int => A)
-    (f:(T, A) => Unit) {
-      def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
-        val a = constructA(index)
-        iter.map(t => {f(t, a); t})
-      }
-    (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
-  }
-
-  /**
-   * Filters this RDD with p, where p takes an additional parameter of type A.  This
-   * additional parameter is produced by constructA, which is called in each
-   * partition with the index of that partition.
-   */
-  def filterWith[A: ClassManifest](constructA: Int => A)
-    (p:(T, A) => Boolean): RDD[T] = {
-      def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
-        val a = constructA(index)
-        iter.filter(t => p(t, a))
-      }
-    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
-  }
-
-  /**
-   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
-   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
-   * partitions* and the *same number of elements in each partition* (e.g. one was made through
-   * a map on the other).
-   */
-  def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
-
-  /**
-   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
-   * applying a function to the zipped partitions. Assumes that all the RDDs have the
-   * *same number of partitions*, but does *not* require them to have the same number
-   * of elements in each partition.
-   */
-  def zipPartitions[B: ClassManifest, V: ClassManifest]
-      (rdd2: RDD[B])
-      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
-
-  def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
-      (rdd2: RDD[B], rdd3: RDD[C])
-      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
-
-  def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
-      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
-      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
-
-
-  // Actions (launch a job to return a value to the user program)
-
-  /**
-   * Applies a function f to all elements of this RDD.
-   */
-  def foreach(f: T => Unit) {
-    val cleanF = sc.clean(f)
-    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
-  }
-
-  /**
-   * Applies a function f to each partition of this RDD.
-   */
-  def foreachPartition(f: Iterator[T] => Unit) {
-    val cleanF = sc.clean(f)
-    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
-  }
-
-  /**
-   * Return an array that contains all of the elements in this RDD.
-   */
-  def collect(): Array[T] = {
-    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
-    Array.concat(results: _*)
-  }
-
-  /**
-   * Return an array that contains all of the elements in this RDD.
-   */
-  def toArray(): Array[T] = collect()
-
-  /**
-   * Return an RDD that contains all matching values by applying `f`.
-   */
-  def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
-    filter(f.isDefinedAt).map(f)
-  }
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   *
-   * Uses `this` RDD's partitioner and partition count, because even if `other` is huge, the
-   * resulting RDD will be no larger than this one.
-   */
-  def subtract(other: RDD[T]): RDD[T] =
-    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
-    subtract(other, new HashPartitioner(numPartitions))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
-    if (partitioner == Some(p)) {
-      // Our partitioner knows how to handle T (which, since we have a partitioner, is
-      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
-      val p2 = new Partitioner() {
-        override def numPartitions = p.numPartitions
-        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
-      }
-      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
-      // anyway, and the result of calling .keys will not have a partitioner set, even
-      // though the SubtractedRDD will already be partitioned by the real keys (i.e. p),
-      // thanks to p2's de-tupled partitioning.
-      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
-    } else {
-      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
-    }
-  }
-
-  /**
-   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
-   */
-  def reduce(f: (T, T) => T): T = {
-    val cleanF = sc.clean(f)
-    val reducePartition: Iterator[T] => Option[T] = iter => {
-      if (iter.hasNext) {
-        Some(iter.reduceLeft(cleanF))
-      } else {
-        None
-      }
-    }
-    var jobResult: Option[T] = None
-    val mergeResult = (index: Int, taskResult: Option[T]) => {
-      if (taskResult != None) {
-        jobResult = jobResult match {
-          case Some(value) => Some(f(value, taskResult.get))
-          case None => taskResult
-        }
-      }
-    }
-    sc.runJob(this, reducePartition, mergeResult)
-    // Get the final result out of our Option, or throw an exception if the RDD was empty
-    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
-  }
-
-  /**
-   * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
-   * modify t1 and return it as its result value to avoid object allocation; however, it should not
-   * modify t2.
-   */
-  def fold(zeroValue: T)(op: (T, T) => T): T = {
-    // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
-    val cleanOp = sc.clean(op)
-    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
-    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
-    sc.runJob(this, foldPartition, mergeResult)
-    jobResult
-  }
-
-  /**
-   * Aggregate the elements of each partition, and then the results for all the partitions, using
-   * given combine functions and a neutral "zero value". This function can return a different result
-   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into a U
-   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
-   * allowed to modify and return their first argument instead of creating a new U to avoid memory
-   * allocation.
-   */
-  def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
-    // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
-    val cleanSeqOp = sc.clean(seqOp)
-    val cleanCombOp = sc.clean(combOp)
-    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
-    sc.runJob(this, aggregatePartition, mergeResult)
-    jobResult
-  }
-
-  /**
-   * Return the number of elements in the RDD.
-   */
-  def count(): Long = {
-    sc.runJob(this, (iter: Iterator[T]) => {
-      var result = 0L
-      while (iter.hasNext) {
-        result += 1L
-        iter.next()
-      }
-      result
-    }).sum
-  }
-
-  /**
-   * (Experimental) Approximate version of count() that returns a potentially incomplete result
-   * within a timeout, even if not all tasks have finished.
-   */
-  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
-    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
-      var result = 0L
-      while (iter.hasNext) {
-        result += 1L
-        iter.next()
-      }
-      result
-    }
-    val evaluator = new CountEvaluator(partitions.size, confidence)
-    sc.runApproximateJob(this, countElements, evaluator, timeout)
-  }
-
-  /**
-   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
-   * combine step happens locally on the master, equivalent to running a single reduce task.
-   */
-  def countByValue(): Map[T, Long] = {
-    if (elementClassManifest.erasure.isArray) {
-      throw new SparkException("countByValue() does not support arrays")
-    }
-    // TODO: This should perhaps be distributed by default.
-    def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
-      val map = new OLMap[T]
-      while (iter.hasNext) {
-        val v = iter.next()
-        map.put(v, map.getLong(v) + 1L)
-      }
-      Iterator(map)
-    }
-    def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
-      val iter = m2.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
-      }
-      return m1
-    }
-    val myResult = mapPartitions(countPartition).reduce(mergeMaps)
-    myResult.asInstanceOf[java.util.Map[T, Long]]   // Will be wrapped as a Scala mutable Map
-  }
-
-  /**
-   * (Experimental) Approximate version of countByValue().
-   */
-  def countByValueApprox(
-      timeout: Long,
-      confidence: Double = 0.95
-      ): PartialResult[Map[T, BoundedDouble]] = {
-    if (elementClassManifest.erasure.isArray) {
-      throw new SparkException("countByValueApprox() does not support arrays")
-    }
-    val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
-      val map = new OLMap[T]
-      while (iter.hasNext) {
-        val v = iter.next()
-        map.put(v, map.getLong(v) + 1L)
-      }
-      map
-    }
-    val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
-    sc.runApproximateJob(this, countPartition, evaluator, timeout)
-  }
-
-  /**
-   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
-   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
-   * whole RDD instead.
-   */
-  def take(num: Int): Array[T] = {
-    if (num == 0) {
-      return new Array[T](0)
-    }
-    val buf = new ArrayBuffer[T]
-    var p = 0
-    while (buf.size < num && p < partitions.size) {
-      val left = num - buf.size
-      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
-      buf ++= res(0)
-      if (buf.size == num)
-        return buf.toArray
-      p += 1
-    }
-    return buf.toArray
-  }
-
-  /**
-   * Return the first element in this RDD.
-   */
-  def first(): T = take(1) match {
-    case Array(t) => t
-    case _ => throw new UnsupportedOperationException("empty collection")
-  }
-
-  /**
-   * Returns the top K elements from this RDD as defined by
-   * the specified implicit Ordering[T].
-   * @param num the number of top elements to return
-   * @param ord the implicit ordering for T
-   * @return an array of top elements
-   */
-  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
-    mapPartitions { items =>
-      val queue = new BoundedPriorityQueue[T](num)
-      queue ++= items
-      Iterator.single(queue)
-    }.reduce { (queue1, queue2) =>
-      queue1 ++= queue2
-      queue1
-    }.toArray.sorted(ord.reverse)
-  }
-
-  /**
-   * Returns the first K elements from this RDD as defined by
-   * the specified implicit Ordering[T] and maintains the
-   * ordering.
-   * @param num the number of top elements to return
-   * @param ord the implicit ordering for T
-   * @return an array of top elements
-   */
-  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
-
-  /**
-   * Save this RDD as a text file, using string representations of elements.
-   */
-  def saveAsTextFile(path: String) {
-    this.map(x => (NullWritable.get(), new Text(x.toString)))
-      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
-  }
-
-  /**
-   * Save this RDD as a compressed text file, using string representations of elements.
-   */
-  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
-    this.map(x => (NullWritable.get(), new Text(x.toString)))
-      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
-  }
-
-  /**
-   * Save this RDD as a SequenceFile of serialized objects.
-   */
-  def saveAsObjectFile(path: String) {
-    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
-      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
-      .saveAsSequenceFile(path)
-  }
-
-  /**
-   * Creates tuples of the elements in this RDD by applying `f`.
-   */
-  def keyBy[K](f: T => K): RDD[(K, T)] = {
-    map(x => (f(x), x))
-  }
-
-  /** A private method for tests, to look at the contents of each partition */
-  private[spark] def collectPartitions(): Array[Array[T]] = {
-    sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
-  }
-
-  /**
-   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
-   * directory set with SparkContext.setCheckpointDir() and all references to its parent
-   * RDDs will be removed. This function must be called before any job has been
-   * executed on this RDD. It is strongly recommended that this RDD is persisted in
-   * memory; otherwise, saving it to a file will require recomputation.
-   */
-  def checkpoint() {
-    if (context.checkpointDir.isEmpty) {
-      throw new Exception("Checkpoint directory has not been set in the SparkContext")
-    } else if (checkpointData.isEmpty) {
-      checkpointData = Some(new RDDCheckpointData(this))
-      checkpointData.get.markForCheckpoint()
-    }
-  }
-
-  /**
-   * Return whether this RDD has been checkpointed or not
-   */
-  def isCheckpointed: Boolean = {
-    checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
-
-  /**
-   * Gets the name of the file to which this RDD was checkpointed
-   */
-  def getCheckpointFile: Option[String] = {
-    checkpointData.flatMap(_.getCheckpointFile)
-  }
-
-  // =======================================================================
-  // Other internal methods and fields
-  // =======================================================================
-
-  private var storageLevel: StorageLevel = StorageLevel.NONE
-
-  /** Records the call site of the user code that created this RDD. */
-  private[spark] val origin = Utils.formatSparkCallSite
-
-  private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
-
-  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
-
-  /** Returns the first parent RDD */
-  protected[spark] def firstParent[U: ClassManifest] = {
-    dependencies.head.rdd.asInstanceOf[RDD[U]]
-  }
-
-  /** The [[spark.SparkContext]] that this RDD was created on. */
-  def context = sc
-
-  // Avoid handling doCheckpoint multiple times to prevent excessive recursion
-  private var doCheckpointCalled = false
-
-  /**
-   * Performs the checkpointing of this RDD by saving its contents. It is called by the DAGScheduler
-   * after a job using this RDD has completed (therefore the RDD has been materialized and
-   * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
-   */
-  private[spark] def doCheckpoint() {
-    if (!doCheckpointCalled) {
-      doCheckpointCalled = true
-      if (checkpointData.isDefined) {
-        checkpointData.get.doCheckpoint()
-      } else {
-        dependencies.foreach(_.rdd.doCheckpoint())
-      }
-    }
-  }
-
-  /**
-   * Changes the dependencies of this RDD from its original parents to a new RDD (`checkpointRDD`)
-   * created from the checkpoint file, and forgets its old dependencies and partitions.
-   */
-  private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
-    clearDependencies()
-    partitions_ = null
-    deps = null    // Forget the constructor argument for dependencies too
-  }
-
-  /**
-   * Clears the dependencies of this RDD. This method must ensure that all references
-   * to the original parent RDDs are removed to enable the parent RDDs to be garbage
-   * collected. Subclasses of RDD may override this method for implementing their own cleaning
-   * logic. See [[spark.rdd.UnionRDD]] for an example.
-   */
-  protected def clearDependencies() {
-    dependencies_ = null
-  }
-
-  /** A description of this RDD and its recursive dependencies for debugging. */
-  def toDebugString: String = {
-    def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
-      Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
-        rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + "  "))
-    }
-    debugString(this).mkString("\n")
-  }
-
-  override def toString: String = "%s%s[%d] at %s".format(
-    Option(name).map(_ + " ").getOrElse(""),
-    getClass.getSimpleName,
-    id,
-    origin)
-
-  def toJavaRDD() : JavaRDD[T] = {
-    new JavaRDD(this)(elementClassManifest)
-  }
-
-}
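
A minimal usage sketch of the public RDD methods removed above (still under the old `spark` package,
prior to the org.apache.spark rename). The local master, application name, checkpoint directory and
sample data below are assumptions for illustration, not part of this commit.

    import spark.SparkContext
    import spark.SparkContext._

    object RDDApiSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "RDDApiSketch")   // assumed local master
        sc.setCheckpointDir("/tmp/rdd-checkpoints")          // assumed directory

        val nums = sc.parallelize(1 to 100, 4)

        // mapPartitionsWithIndex: tag every element with the partition that produced it
        val tagged = nums.mapPartitionsWithIndex((idx, iter) => iter.map(n => (idx, n)))

        // zip: positional pairing; requires identical partitioning and per-partition sizes
        val zipped = nums.zip(nums.map(n => n * n))

        // aggregate: fold into a different result type, here (sum, count) for an average
        val (sum, count) = nums.aggregate((0, 0))(
          (acc, n) => (acc._1 + n, acc._2 + 1),
          (a, b) => (a._1 + b._1, a._2 + b._2))

        // top / takeOrdered: largest and smallest elements under the implicit Ordering[Int]
        val largest = nums.top(3)
        val smallest = nums.takeOrdered(3)

        // checkpoint() must precede the first job on the RDD; the next action materializes it
        tagged.checkpoint()
        tagged.count()

        println("avg=" + (sum.toDouble / count) +
          " top=" + largest.mkString(",") +
          " bottom=" + smallest.mkString(",") +
          " zippedFirst=" + zipped.first() +
          " checkpointed=" + tagged.isCheckpointed)
        sc.stop()
      }
    }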

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
deleted file mode 100644
index b615f82..0000000
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-import rdd.{CheckpointRDD, CoalescedRDD}
-import scheduler.{ResultTask, ShuffleMapTask}
-
-/**
- * Enumeration to manage state transitions of an RDD through checkpointing
- * [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
- */
-private[spark] object CheckpointState extends Enumeration {
-  type CheckpointState = Value
-  val Initialized, MarkedForCheckpoint, CheckpointingInProgress, Checkpointed = Value
-}
-
-/**
- * This class contains all the information related to RDD checkpointing. Each instance of this class
- * is associated with an RDD. It manages the process of checkpointing the associated RDD, as well as
- * the post-checkpoint state, by providing the updated partitions, iterator and preferred locations
- * of the checkpointed RDD.
- */
-private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
-  extends Logging with Serializable {
-
-  import CheckpointState._
-
-  // The checkpoint state of the associated RDD.
-  var cpState = Initialized
-
-  // The file to which the associated RDD has been checkpointed
-  @transient var cpFile: Option[String] = None
-
-  // The CheckpointRDD created from the checkpoint file, that is, the new parent of the associated RDD.
-  var cpRDD: Option[RDD[T]] = None
-
-  // Mark the RDD for checkpointing
-  def markForCheckpoint() {
-    RDDCheckpointData.synchronized {
-      if (cpState == Initialized) cpState = MarkedForCheckpoint
-    }
-  }
-
-  // Is the RDD already checkpointed
-  def isCheckpointed: Boolean = {
-    RDDCheckpointData.synchronized { cpState == Checkpointed }
-  }
-
-  // Get the file to which this RDD was checkpointed, as an Option
-  def getCheckpointFile: Option[String] = {
-    RDDCheckpointData.synchronized { cpFile }
-  }
-
-  // Do the checkpointing of the RDD. Called after the first job using that RDD is over.
-  def doCheckpoint() {
-    // If it is marked for checkpointing AND checkpointing is not already in progress,
-    // then set it to be in progress, else return
-    RDDCheckpointData.synchronized {
-      if (cpState == MarkedForCheckpoint) {
-        cpState = CheckpointingInProgress
-      } else {
-        return
-      }
-    }
-
-    // Create the output path for the checkpoint
-    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
-    val fs = path.getFileSystem(new Configuration())
-    if (!fs.mkdirs(path)) {
-      throw new SparkException("Failed to create checkpoint path " + path)
-    }
-
-    // Save to file, and reload it as an RDD
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
-    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
-
-    // Change the dependencies and partitions of the RDD
-    RDDCheckpointData.synchronized {
-      cpFile = Some(path.toString)
-      cpRDD = Some(newRDD)
-      rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
-      cpState = Checkpointed
-      RDDCheckpointData.clearTaskCaches()
-      logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
-    }
-  }
-
-  // Get preferred location of a split after checkpointing
-  def getPreferredLocations(split: Partition): Seq[String] = {
-    RDDCheckpointData.synchronized {
-      cpRDD.get.preferredLocations(split)
-    }
-  }
-
-  def getPartitions: Array[Partition] = {
-    RDDCheckpointData.synchronized {
-      cpRDD.get.partitions
-    }
-  }
-
-  def checkpointRDD: Option[RDD[T]] = {
-    RDDCheckpointData.synchronized {
-      cpRDD
-    }
-  }
-}
-
-private[spark] object RDDCheckpointData {
-  def clearTaskCaches() {
-    ShuffleMapTask.clearCache()
-    ResultTask.clearCache()
-  }
-}
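
For context, a sketch of how the state machine above is exercised from user code: RDD.checkpoint()
moves the data into MarkedForCheckpoint, and the DAGScheduler's call to doCheckpoint() after the
next job performs the actual write and flips the state to Checkpointed. The master URL and
checkpoint directory are assumed example values.

    import spark.SparkContext

    object CheckpointStateSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "CheckpointStateSketch")   // assumed local master
        sc.setCheckpointDir("/tmp/rdd-checkpoints")                   // assumed directory

        val data = sc.parallelize(1 to 1000, 8).map(_ * 2)
        data.checkpoint()                  // Initialized -> MarkedForCheckpoint
        println(data.getCheckpointFile)    // None: nothing has been written yet
        data.count()                       // job completes; DAGScheduler then calls doCheckpoint()
        println(data.isCheckpointed)       // true: CheckpointingInProgress -> Checkpointed
        println(data.getCheckpointFile)    // Some(<checkpoint dir>/rdd-<id>)
        sc.stop()
      }
    }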

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
deleted file mode 100644
index 9f30b7f..0000000
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.EOFException
-import java.net.URL
-import java.io.ObjectInputStream
-import java.util.concurrent.atomic.AtomicLong
-import java.util.HashSet
-import java.util.Random
-import java.util.Date
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.Map
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.TextOutputFormat
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.OutputCommitter
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.Text
-
-import spark.SparkContext._
-
-/**
- * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
- * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
- * we need more implicit parameters to convert our keys and values to Writable.
- *
- * Users should import `spark.SparkContext._` at the top of their program to use these functions.
- */
-class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
-    self: RDD[(K, V)])
-  extends Logging
-  with Serializable {
-
-  private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
-    val c = {
-      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
-        classManifest[T].erasure
-      } else {
-        // We get the type of the Writable class by looking at the apply method which converts
-        // from T to Writable. Since we have two apply methods we filter out the one which
-        // is not of the form "java.lang.Object apply(java.lang.Object)"
-        implicitly[T => Writable].getClass.getDeclaredMethods().filter(
-            m => m.getReturnType().toString != "class java.lang.Object" &&
-                 m.getName() == "apply")(0).getReturnType
-
-      }
-       // TODO: use something like WritableConverter to avoid reflection
-    }
-    c.asInstanceOf[Class[_ <: Writable]]
-  }
-
-  /**
-   * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
- * and value types. If the key or value types are already Writable, then we use their classes directly;
-   * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc,
-   * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
-   * file system.
-   */
-  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
-    def anyToWritable[U <% Writable](u: U): Writable = u
-
-    val keyClass = getWritableClass[K]
-    val valueClass = getWritableClass[V]
-    val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
-    val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-
-    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
-    val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
-    val jobConf = new JobConf(self.context.hadoopConfiguration)
-    if (!convertKey && !convertValue) {
-      self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
-    } else if (!convertKey && convertValue) {
-      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
-    } else if (convertKey && !convertValue) {
-      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
-    } else if (convertKey && convertValue) {
-      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
-    }
-  }
-}
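
A usage sketch for the implicit conversion above, assuming `import spark.SparkContext._` is in
scope (it provides both the conversion to SequenceFileRDDFunctions and the primitive-to-Writable
conversions); the output paths and sample data are illustrative, not from this commit.

    import spark.SparkContext
    import spark.SparkContext._
    import org.apache.hadoop.io.compress.GzipCodec

    object SequenceFileSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SequenceFileSketch")   // assumed local master
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))

        // String keys become Text and Int values become IntWritable via the view bounds above
        pairs.saveAsSequenceFile("/tmp/pairs-seq")                           // assumed path

        // Compressed variant: the codec class goes in the optional second argument
        pairs.saveAsSequenceFile("/tmp/pairs-seq-gz", Some(classOf[GzipCodec]))

        sc.stop()
      }
    }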

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SerializableWritable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SerializableWritable.scala b/core/src/main/scala/spark/SerializableWritable.scala
deleted file mode 100644
index 936d8e6..0000000
--- a/core/src/main/scala/spark/SerializableWritable.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-
-import org.apache.hadoop.io.ObjectWritable
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.conf.Configuration
-
-class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
-  def value = t
-  override def toString = t.toString
-
-  private def writeObject(out: ObjectOutputStream) {
-    out.defaultWriteObject()
-    new ObjectWritable(t).write(out)
-  }
-
-  private def readObject(in: ObjectInputStream) {
-    in.defaultReadObject()
-    val ow = new ObjectWritable()
-    ow.setConf(new Configuration())
-    ow.readFields(in)
-    t = ow.get().asInstanceOf[T]
-  }
-}
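
A few REPL-style lines sketching what this wrapper is for: Hadoop Writable types (for example
JobConf, whose parent Configuration implements Writable) are not java.io.Serializable, so they
cannot be captured in closures or broadcast directly. SerializableWritable keeps a transient
reference locally and round-trips the value through ObjectWritable when the wrapper itself is
serialized. The configuration key and path below are assumed examples.

    import org.apache.hadoop.mapred.JobConf
    import spark.SerializableWritable

    val jobConf = new JobConf()
    jobConf.set("mapred.input.dir", "/tmp/input")   // assumed example setting

    // The wrapper, unlike the raw JobConf, survives Java serialization
    val wrapped = new SerializableWritable(jobConf)
    val restored: JobConf = wrapped.value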

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala
deleted file mode 100644
index a6839cf..0000000
--- a/core/src/main/scala/spark/ShuffleFetcher.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import spark.executor.TaskMetrics
-import spark.serializer.Serializer
-
-
-private[spark] abstract class ShuffleFetcher {
-
-  /**
-   * Fetch the shuffle outputs for a given ShuffleDependency.
-   * @return An iterator over the elements of the fetched shuffle outputs.
-   */
-  def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
-      serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[T]
-
-  /** Stop the fetcher */
-  def stop() {}
-}
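
A minimal sketch of the contract above: a stub fetcher that returns no shuffle output (the working
implementation in the codebase at this point is BlockStoreShuffleFetcher). This only illustrates
the shape of the interface; a real fetcher would pull the map outputs registered for
(shuffleId, reduceId) and record the read in the supplied TaskMetrics.

    package spark

    import spark.executor.TaskMetrics
    import spark.serializer.Serializer

    private[spark] class NoOpShuffleFetcher extends ShuffleFetcher {
      override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
          serializer: Serializer): Iterator[T] = {
        Iterator.empty   // a real fetcher streams the deserialized blocks for (shuffleId, reduceId)
      }
    }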

