spark-commits mailing list archives

From ma...@apache.org
Subject [45/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
Date Sun, 01 Sep 2013 22:00:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
new file mode 100644
index 0000000..0f422d9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io._
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import akka.actor._
+import akka.dispatch._
+import akka.pattern.ask
+import akka.remote._
+import akka.util.Duration
+
+
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+
+
+private[spark] sealed trait MapOutputTrackerMessage
+private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
+  extends MapOutputTrackerMessage
+private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
+
+private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
+  def receive = {
+    case GetMapOutputStatuses(shuffleId: Int, requester: String) =>
+      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester)
+      sender ! tracker.getSerializedLocations(shuffleId)
+
+    case StopMapOutputTracker =>
+      logInfo("MapOutputTrackerActor stopped!")
+      sender ! true
+      context.stop(self)
+  }
+}
+
+private[spark] class MapOutputTracker extends Logging {
+
+  private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+  
+  // Set to the MapOutputTrackerActor living on the driver
+  var trackerActor: ActorRef = _
+
+  private var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
+
+  // Incremented every time a fetch fails so that client nodes know to clear
+  // their cache of map output locations if this happens.
+  private var epoch: Long = 0
+  private val epochLock = new java.lang.Object
+
+  // Cache a serialized version of the output statuses for each shuffle to send them out faster
+  var cacheEpoch = epoch
+  private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
+
+  val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
+
+  // Send a message to the trackerActor and get its result within a default timeout, or
+  // throw a SparkException if this fails.
+  def askTracker(message: Any): Any = {
+    try {
+      val future = trackerActor.ask(message)(timeout)
+      return Await.result(future, timeout)
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error communicating with MapOutputTracker", e)
+    }
+  }
+
+  // Send a message to the trackerActor, expecting it to acknowledge with true.
+  def communicate(message: Any) {
+    if (askTracker(message) != true) {
+      throw new SparkException("Error reply received from MapOutputTracker")
+    }
+  }
+
+  def registerShuffle(shuffleId: Int, numMaps: Int) {
+    if (mapStatuses.putIfAbsent(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
+      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
+    }
+  }
+
+  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
+    var array = mapStatuses(shuffleId)
+    array.synchronized {
+      array(mapId) = status
+    }
+  }
+
+  def registerMapOutputs(
+      shuffleId: Int,
+      statuses: Array[MapStatus],
+      changeEpoch: Boolean = false) {
+    mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
+    if (changeEpoch) {
+      incrementEpoch()
+    }
+  }
+
+  def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) {
+    var arrayOpt = mapStatuses.get(shuffleId)
+    if (arrayOpt.isDefined && arrayOpt.get != null) {
+      var array = arrayOpt.get
+      array.synchronized {
+        if (array(mapId) != null && array(mapId).location == bmAddress) {
+          array(mapId) = null
+        }
+      }
+      incrementEpoch()
+    } else {
+      throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
+    }
+  }
+
+  // Remembers which map output locations are currently being fetched on a worker
+  private val fetching = new HashSet[Int]
+
+  // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
+  def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
+    val statuses = mapStatuses.get(shuffleId).orNull
+    if (statuses == null) {
+      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+      var fetchedStatuses: Array[MapStatus] = null
+      fetching.synchronized {
+        if (fetching.contains(shuffleId)) {
+          // Someone else is fetching it; wait for them to be done
+          while (fetching.contains(shuffleId)) {
+            try {
+              fetching.wait()
+            } catch {
+              case e: InterruptedException =>
+            }
+          }
+        }
+
+        // Either the fetch completed successfully while we waited, or someone else
+        // fetched the statuses between our initial get and the fetching.synchronized block.
+        fetchedStatuses = mapStatuses.get(shuffleId).orNull
+        if (fetchedStatuses == null) {
+          // We have to do the fetch, get others to wait for us.
+          fetching += shuffleId
+        }
+      }
+      
+      if (fetchedStatuses == null) {
+        // We won the race to fetch the output locs; do so
+        logInfo("Doing the fetch; tracker actor = " + trackerActor)
+        val hostPort = Utils.localHostPort()
+        // This try-finally prevents hangs due to timeouts:
+        try {
+          val fetchedBytes =
+            askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]]
+          fetchedStatuses = deserializeStatuses(fetchedBytes)
+          logInfo("Got the output locations")
+          mapStatuses.put(shuffleId, fetchedStatuses)
+        } finally {
+          fetching.synchronized {
+            fetching -= shuffleId
+            fetching.notifyAll()
+          }
+        }
+      }
+      if (fetchedStatuses != null) {
+        fetchedStatuses.synchronized {
+          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
+        }
+      }
+      else {
+        throw new FetchFailedException(null, shuffleId, -1, reduceId,
+          new Exception("Missing all output locations for shuffle " + shuffleId))
+      }
+    } else {
+      statuses.synchronized {
+        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
+      }
+    }
+  }
+
+  private def cleanup(cleanupTime: Long) {
+    mapStatuses.clearOldValues(cleanupTime)
+    cachedSerializedStatuses.clearOldValues(cleanupTime)
+  }
+
+  def stop() {
+    communicate(StopMapOutputTracker)
+    mapStatuses.clear()
+    metadataCleaner.cancel()
+    trackerActor = null
+  }
+
+  // Called on master to increment the epoch number
+  def incrementEpoch() {
+    epochLock.synchronized {
+      epoch += 1
+      logDebug("Increasing epoch to " + epoch)
+    }
+  }
+
+  // Called on master or workers to get current epoch number
+  def getEpoch: Long = {
+    epochLock.synchronized {
+      return epoch
+    }
+  }
+
+  // Called on workers to update the epoch number, potentially clearing old outputs
+  // because of a fetch failure. (Each worker task calls this with the latest epoch
+  // number on the master at the time it was created.)
+  def updateEpoch(newEpoch: Long) {
+    epochLock.synchronized {
+      if (newEpoch > epoch) {
+        logInfo("Updating epoch to " + newEpoch + " and clearing cache")
+        // mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
+        mapStatuses.clear()
+        epoch = newEpoch
+      }
+    }
+  }
+
+  def getSerializedLocations(shuffleId: Int): Array[Byte] = {
+    var statuses: Array[MapStatus] = null
+    var epochGotten: Long = -1
+    epochLock.synchronized {
+      if (epoch > cacheEpoch) {
+        cachedSerializedStatuses.clear()
+        cacheEpoch = epoch
+      }
+      cachedSerializedStatuses.get(shuffleId) match {
+        case Some(bytes) =>
+          return bytes
+        case None =>
+          statuses = mapStatuses(shuffleId)
+          epochGotten = epoch
+      }
+    }
+    // If we got here, we failed to find the serialized locations in the cache, so we pulled
+    // out a snapshot of the locations as "statuses"; serialize and return that.
+    val bytes = serializeStatuses(statuses)
+    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
+    // Add them into the table only if the epoch hasn't changed while we were working
+    epochLock.synchronized {
+      if (epoch == epochGotten) {
+        cachedSerializedStatuses(shuffleId) = bytes
+      }
+    }
+    return bytes
+  }
+
+  // Serialize an array of map output locations into an efficient byte format so that we can send
+  // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
+  // generally be pretty compressible because many map outputs will be on the same hostname.
+  private def serializeStatuses(statuses: Array[MapStatus]): Array[Byte] = {
+    val out = new ByteArrayOutputStream
+    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+    // Since statuses can be modified in parallel, sync on it
+    statuses.synchronized {
+      objOut.writeObject(statuses)
+    }
+    objOut.close()
+    out.toByteArray
+  }
+
+  // Opposite of serializeStatuses.
+  def deserializeStatuses(bytes: Array[Byte]): Array[MapStatus] = {
+    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
+    objIn.readObject().
+      // The filter below (commented out) would drop nulls from the statuses. It is unclear why
+      // nulls occur (possibly missing locations), but they cause an NPE downstream on the worker.
+      asInstanceOf[Array[MapStatus]] // .filter( _ != null )
+  }
+}
+
+private[spark] object MapOutputTracker {
+  private val LOG_BASE = 1.1
+
+  // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
+  // any of the statuses is null (indicating a missing location due to a failed mapper),
+  // throw a FetchFailedException.
+  private def convertMapStatuses(
+        shuffleId: Int,
+        reduceId: Int,
+        statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+    assert (statuses != null)
+    statuses.map {
+      status => 
+        if (status == null) {
+          throw new FetchFailedException(null, shuffleId, -1, reduceId,
+            new Exception("Missing an output location for shuffle " + shuffleId))
+        } else {
+          (status.location, decompressSize(status.compressedSizes(reduceId)))
+        }
+    }
+  }
+
+  /**
+   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
+   * We do this by encoding the log base 1.1 of the size as an integer, which can support
+   * sizes up to 35 GB with at most 10% error.
+   */
+  def compressSize(size: Long): Byte = {
+    if (size == 0) {
+      0
+    } else if (size <= 1L) {
+      1
+    } else {
+      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
+    }
+  }
+
+  /**
+   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
+   */
+  def decompressSize(compressedSize: Byte): Long = {
+    if (compressedSize == 0) {
+      0
+    } else {
+      math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
+    }
+  }
+}
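
For reference, a minimal standalone sketch of the size-encoding scheme used by
compressSize/decompressSize above; the object and method names below are illustrative
and not part of this commit. It shows how a map output size of roughly 1 MB is stored
in a single byte and recovered to within the documented 10% error.

object CompressedSizeDemo {
  private val LogBase = 1.1

  // Same idea as MapOutputTracker.compressSize: 0 stays 0, anything up to one byte
  // becomes 1, and larger sizes are stored as ceil(log_1.1(size)), capped at 255.
  def compress(size: Long): Byte = {
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size) / math.log(LogBase)).toInt).toByte
  }

  // Reverse operation: raise 1.1 to the stored exponent (read back as an unsigned byte).
  def decompress(compressed: Byte): Long = {
    if (compressed == 0) 0 else math.pow(LogBase, compressed & 0xFF).toLong
  }

  def main(args: Array[String]) {
    val original = 1000000L                 // ~1 MB of map output
    val encoded = compress(original)        // exponent 145, stored in one byte
    val decoded = decompress(encoded)       // roughly 1.0e6, well within 10% of the original
    println(original + " -> exponent " + (encoded & 0xFF) + " -> " + decoded)
  }
}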

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
new file mode 100644
index 0000000..d046e7c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
@@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.nio.ByteBuffer
+import java.util.{Date, HashMap => JHashMap}
+import java.text.SimpleDateFormat
+
+import scala.collection.{mutable, Map}
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.mapred.FileOutputFormat
+import org.apache.hadoop.mapred.SparkHadoopWriter
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.OutputFormat
+
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat,
+    RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.partial.BoundedDouble
+import org.apache.spark.partial.PartialResult
+import org.apache.spark.rdd._
+import org.apache.spark.SparkContext._
+import org.apache.spark.Partitioner._
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+ */
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
+  extends Logging
+  with SparkHadoopMapReduceUtil
+  with Serializable {
+
+  /**
+   * Generic function to combine the elements for each key using a custom set of aggregation
+   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
+   * Note that V and C can be different -- for example, one might group an RDD of type
+   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
+   *
+   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
+   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
+   * - `mergeCombiners`, to combine two C's into a single one.
+   *
+   * In addition, users can control the partitioning of the output RDD, and whether to perform
+   * map-side aggregation (if a mapper can produce multiple items with the same key).
+   */
+  def combineByKey[C](createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true,
+      serializerClass: String = null): RDD[(K, C)] = {
+    if (getKeyClass().isArray) {
+      if (mapSideCombine) {
+        throw new SparkException("Cannot use map-side combining with array keys.")
+      }
+      if (partitioner.isInstanceOf[HashPartitioner]) {
+        throw new SparkException("Default partitioner cannot partition array keys.")
+      }
+    }
+    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    if (self.partitioner == Some(partitioner)) {
+      self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+    } else if (mapSideCombine) {
+      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
+        .setSerializer(serializerClass)
+      partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
+    } else {
+      // Don't apply map-side combiner.
+      // A sanity check to make sure mergeCombiners is not defined.
+      assert(mergeCombiners == null)
+      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
+      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+    }
+  }
+
+  /**
+   * Simplified version of combineByKey that hash-partitions the output RDD.
+   */
+  def combineByKey[C](createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      numPartitions: Int): RDD[(K, C)] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication).
+   */
+  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
+    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
+    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroArray = new Array[Byte](zeroBuffer.limit)
+    zeroBuffer.get(zeroArray)
+
+    // When deserializing, use a lazy val to create just one instance of the serializer per task
+    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+
+    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
+  }
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication).
+   */
+  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
+    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
+  }
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication).
+   */
+  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
+    foldByKey(zeroValue, defaultPartitioner(self))(func)
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce.
+   */
+  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
+    combineByKey[V]((v: V) => v, func, func, partitioner)
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function, but return the results
+   * immediately to the master as a Map. This will also perform the merging locally on each mapper
+   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+   */
+  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
+
+    if (getKeyClass().isArray) {
+      throw new SparkException("reduceByKeyLocally() does not support array keys")
+    }
+
+    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+      val map = new JHashMap[K, V]
+      iter.foreach { case (k, v) =>
+        val old = map.get(k)
+        map.put(k, if (old == null) v else func(old, v))
+      }
+      Iterator(map)
+    }
+
+    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+      m2.foreach { case (k, v) =>
+        val old = m1.get(k)
+        m1.put(k, if (old == null) v else func(old, v))
+      }
+      m1
+    }
+
+    self.mapPartitions(reducePartition).reduce(mergeMaps)
+  }
+
+  /** Alias for reduceByKeyLocally */
+  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
+
+  /** Count the number of elements for each key, and return the result to the master as a Map. */
+  def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
+
+  /**
+   * (Experimental) Approximate version of countByKey that can return a partial result if it does
+   * not finish within a timeout.
+   */
+  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
+      : PartialResult[Map[K, BoundedDouble]] = {
+    self.map(_._1).countByValueApprox(timeout, confidence)
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   */
+  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
+    reduceByKey(new HashPartitioner(numPartitions), func)
+  }
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Allows controlling the
+   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+   */
+  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+    // groupByKey shouldn't use map side combine because map side combine does not
+    // reduce the amount of data shuffled and requires all map side data be inserted
+    // into a hash table, leading to more objects in the old gen.
+    def createCombiner(v: V) = ArrayBuffer(v)
+    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+    val bufs = combineByKey[ArrayBuffer[V]](
+      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
+    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+  }
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+   * resulting RDD into `numPartitions` partitions.
+   */
+  def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
+    groupByKey(new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a copy of the RDD partitioned using the specified partitioner.
+   */
+  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+    if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+      throw new SparkException("Default partitioner cannot partition array keys.")
+    }
+    new ShuffledRDD[K, V, (K, V)](self, partitioner)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
+   */
+  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+    }
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
+   * partition the output RDD.
+   */
+  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (ws.isEmpty) {
+        vs.iterator.map(v => (v, None))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+      }
+    }
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
+   * partition the output RDD.
+   */
+  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Option[V], W))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (vs.isEmpty) {
+        ws.iterator.map(w => (None, w))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+      }
+    }
+  }
+
+  /**
+   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
+   * existing partitioner/parallelism level.
+   */
+  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
+      : RDD[(K, C)] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * parallelism level.
+   */
+  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+    reduceByKey(defaultPartitioner(self), func)
+  }
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+   * resulting RDD with the existing partitioner/parallelism level.
+   */
+  def groupByKey(): RDD[(K, Seq[V])] = {
+    groupByKey(defaultPartitioner(self))
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Performs a hash join across the cluster.
+   */
+  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+    join(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Performs a hash join across the cluster.
+   */
+  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
+    join(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+   * using the existing partitioner/parallelism level.
+   */
+  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
+    leftOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+   * into `numPartitions` partitions.
+   */
+  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
+    leftOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
+    rightOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
+    rightOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Return the key-value pairs in this RDD to the master as a Map.
+   */
+  def collectAsMap(): Map[K, V] = {
+    val data = self.toArray()
+    val map = new mutable.HashMap[K, V]
+    map.sizeHint(data.length)
+    data.foreach { case (k, v) => map.put(k, v) }
+    map
+  }
+
+  /**
+   * Pass each value in the key-value pair RDD through a map function without changing the keys;
+   * this also retains the original RDD's partitioning.
+   */
+  def mapValues[U](f: V => U): RDD[(K, U)] = {
+    val cleanF = self.context.clean(f)
+    new MappedValuesRDD(self, cleanF)
+  }
+
+  /**
+   * Pass each value in the key-value pair RDD through a flatMap function without changing the
+   * keys; this also retains the original RDD's partitioning.
+   */
+  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
+    val cleanF = self.context.clean(f)
+    new FlatMappedValuesRDD(self, cleanF)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+      throw new SparkException("Default partitioner cannot partition array keys.")
+    }
+    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
+    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+    prfs.mapValues { case Seq(vs, ws) =>
+      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+    }
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+      throw new SparkException("Default partitioner cannot partition array keys.")
+    }
+    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
+    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+    prfs.mapValues { case Seq(vs, w1s, w2s) =>
+      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+    }
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, new HashPartitioner(numPartitions))
+  }
+
+  /** Alias for cogroup. */
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner(self, other))
+  }
+
+  /** Alias for cogroup. */
+  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+  }
+
+  /**
+   * Return an RDD with the pairs from `this` whose keys are not in `other`.
+   *
+   * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be no larger than `this`.
+   */
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
+    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
+
+  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
+    subtractByKey(other, new HashPartitioner(numPartitions))
+
+  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
+    new SubtractedRDD[K, V, W](self, other, p)
+
+  /**
+   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
+   * RDD has a known partitioner by only searching the partition that the key maps to.
+   */
+  def lookup(key: K): Seq[V] = {
+    self.partitioner match {
+      case Some(p) =>
+        val index = p.getPartition(key)
+        def process(it: Iterator[(K, V)]): Seq[V] = {
+          val buf = new ArrayBuffer[V]
+          for ((k, v) <- it if k == key) {
+            buf += v
+          }
+          buf
+        }
+        val res = self.context.runJob(self, process _, Array(index), false)
+        res(0)
+      case None =>
+        self.filter(_._1 == key).map(_._2).collect()
+    }
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD.
+   */
+  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
+    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress the result with the
+   * supplied codec.
+   */
+  def saveAsHadoopFile[F <: OutputFormat[K, V]](
+      path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
+    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+   */
+  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
+    saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+   */
+  def saveAsNewAPIHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration = self.context.hadoopConfiguration) {
+    val job = new NewAPIHadoopJob(conf)
+    job.setOutputKeyClass(keyClass)
+    job.setOutputValueClass(valueClass)
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    NewFileOutputFormat.setOutputPath(job, new Path(path))
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      /* "reduce task" <split #> <attempt # = spark task #> */
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = outputFormatClass.newInstance
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      while (iter.hasNext) {
+        val (k, v) = iter.next()
+        writer.write(k, v)
+      }
+      writer.close(hadoopContext)
+      committer.commitTask(hadoopContext)
+      return 1
+    }
+    val jobFormat = outputFormatClass.newInstance
+    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
+     * however we're only going to use this local OutputCommitter for
+     * setupJob/commitJob, so we just use a dummy "map" task.
+     */
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    val count = self.context.runJob(self, writeShard _).sum
+    jobCommitter.commitJob(jobTaskContext)
+    jobCommitter.cleanupJob(jobTaskContext)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
+   */
+  def saveAsHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      codec: Class[_ <: CompressionCodec]) {
+    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
+      new JobConf(self.context.hadoopConfiguration), Some(codec))
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD.
+   */
+  def saveAsHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      codec: Option[Class[_ <: CompressionCodec]] = None) {
+    conf.setOutputKeyClass(keyClass)
+    conf.setOutputValueClass(valueClass)
+    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
+    conf.set("mapred.output.format.class", outputFormatClass.getName)
+    for (c <- codec) {
+      conf.setCompressMapOutput(true)
+      conf.set("mapred.output.compress", "true")
+      conf.setMapOutputCompressorClass(c)
+      conf.set("mapred.output.compression.codec", c.getCanonicalName)
+      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+    }
+    conf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
+    saveAsHadoopDataset(conf)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
+   * that storage system. The JobConf should set an OutputFormat and any output paths required
+   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
+   * MapReduce job.
+   */
+  def saveAsHadoopDataset(conf: JobConf) {
+    val outputFormatClass = conf.getOutputFormat
+    val keyClass = conf.getOutputKeyClass
+    val valueClass = conf.getOutputValueClass
+    if (outputFormatClass == null) {
+      throw new SparkException("Output format class not set")
+    }
+    if (keyClass == null) {
+      throw new SparkException("Output key class not set")
+    }
+    if (valueClass == null) {
+      throw new SparkException("Output value class not set")
+    }
+
+    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+
+    val writer = new SparkHadoopWriter(conf)
+    writer.preSetup()
+
+    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+
+      writer.setup(context.stageId, context.splitId, attemptNumber)
+      writer.open()
+
+      var count = 0
+      while(iter.hasNext) {
+        val record = iter.next()
+        count += 1
+        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+      }
+
+      writer.close()
+      writer.commit()
+    }
+
+    self.context.runJob(self, writeToFile _)
+    writer.commitJob()
+    writer.cleanup()
+  }
+
+  /**
+   * Return an RDD with the keys of each tuple.
+   */
+  def keys: RDD[K] = self.map(_._1)
+
+  /**
+   * Return an RDD with the values of each tuple.
+   */
+  def values: RDD[V] = self.map(_._2)
+
+  private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
+
+  private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
+}
+
+
+private[spark] object Manifests {
+  val seqSeqManifest = classManifest[Seq[Seq[_]]]
+}
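
To illustrate the combineByKey contract documented above (createCombiner, mergeValue,
mergeCombiners), here is a small usage sketch that computes a per-key average. The
object name, the sample data, and the local SparkContext are assumptions for the
example only and do not appear in this commit.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // brings the implicit conversion to PairRDDFunctions into scope

object PerKeyAverage {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PerKeyAverage")
    val scores = sc.parallelize(Seq(("a", 3.0), ("b", 4.0), ("a", 5.0)))

    // The "combined type" C is a (sum, count) pair; the three functions follow the
    // contract described in the combineByKey scaladoc.
    val sumCounts = scores.combineByKey(
      (v: Double) => (v, 1),                                                      // createCombiner: V => C
      (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),                      // mergeValue: (C, V) => C
      (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2))   // mergeCombiners: (C, C) => C

    val averages = sumCounts.mapValues { case (sum, count) => sum / count }
    averages.collect().foreach(println)   // e.g. (a,4.0) and (b,4.0), in no particular order
    sc.stop()
  }
}

The same aggregation could be expressed with foldByKey or reduceByKey when the combined
type equals the value type; combineByKey is the general form the other two delegate to.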

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partition.scala b/core/src/main/scala/org/apache/spark/Partition.scala
new file mode 100644
index 0000000..87914a0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/Partition.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * A partition of an RDD.
+ */
+trait Partition extends Serializable {
+  /**
+   * Get the split's index within its parent RDD
+   */
+  def index: Int
+  
+  // A better default implementation of hashCode
+  override def hashCode(): Int = index
+}
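
A minimal sketch of how a custom RDD typically implements this trait: a partition just
records its index plus whatever per-split state that RDD needs. The class name and
fields below are hypothetical, not part of this commit.

// Hypothetical partition for an RDD that reads fixed-size chunks of a file.
class FileChunkPartition(val index: Int, val offset: Long, val length: Long)
  extends org.apache.spark.Partition {
  // `index` satisfies the trait's abstract member and drives the default hashCode;
  // offset and length are extra state this particular RDD would need in compute().
}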

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
new file mode 100644
index 0000000..4dce260
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * An object that defines how the elements in a key-value pair RDD are partitioned by key.
+ * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
+ */
+abstract class Partitioner extends Serializable {
+  def numPartitions: Int
+  def getPartition(key: Any): Int
+}
+
+object Partitioner {
+  /**
+   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
+   *
+   * If any of the RDDs already has a partitioner, choose that one.
+   *
+   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
+   * spark.default.parallelism is set, then we'll use the value from SparkContext
+   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
+   *
+   * Unless spark.default.parallelism is set, the number of partitions will be the
+   * same as the number of partitions in the largest upstream RDD, as this should
+   * be least likely to cause out-of-memory errors.
+   *
+   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
+   */
+  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
+    for (r <- bySize if r.partitioner != None) {
+      return r.partitioner.get
+    }
+    if (System.getProperty("spark.default.parallelism") != null) {
+      return new HashPartitioner(rdd.context.defaultParallelism)
+    } else {
+      return new HashPartitioner(bySize.head.partitions.size)
+    }
+  }
+}
+
+/**
+ * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
+ *
+ * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
+ * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
+ * produce an unexpected or incorrect result.
+ */
+class HashPartitioner(partitions: Int) extends Partitioner {
+  def numPartitions = partitions
+
+  def getPartition(key: Any): Int = key match {
+    case null => 0
+    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
+  }
+  
+  override def equals(other: Any): Boolean = other match {
+    case h: HashPartitioner =>
+      h.numPartitions == numPartitions
+    case _ =>
+      false
+  }
+}
+
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
+ * Determines the ranges by sampling the RDD passed in.
+ */
+class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
+    partitions: Int,
+    @transient rdd: RDD[_ <: Product2[K,V]],
+    private val ascending: Boolean = true) 
+  extends Partitioner {
+
+  // An array of upper bounds for the first (partitions - 1) partitions
+  private val rangeBounds: Array[K] = {
+    if (partitions == 1) {
+      Array()
+    } else {
+      val rddSize = rdd.count()
+      val maxSampleSize = partitions * 20.0
+      val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
+      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
+      if (rddSample.length == 0) {
+        Array()
+      } else {
+        val bounds = new Array[K](partitions - 1)
+        for (i <- 0 until partitions - 1) {
+          val index = (rddSample.length - 1) * (i + 1) / partitions
+          bounds(i) = rddSample(index)
+        }
+        bounds
+      }
+    }
+  }
+
+  def numPartitions = partitions
+
+  def getPartition(key: Any): Int = {
+    // TODO: Use a binary search here if number of partitions is large
+    val k = key.asInstanceOf[K]
+    var partition = 0
+    while (partition < rangeBounds.length && k > rangeBounds(partition)) {
+      partition += 1
+    }
+    if (ascending) {
+      partition
+    } else {
+      rangeBounds.length - partition
+    }
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case r: RangePartitioner[_,_] =>
+      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
+    case _ =>
+      false
+  }
+}
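
To make the Partitioner contract above concrete, here is a hedged sketch of a custom
partitioner that routes string keys by their first character. The class name is
hypothetical and the sketch is illustrative only; HashPartitioner or RangePartitioner
cover most real uses.

import org.apache.spark.Partitioner

class FirstLetterPartitioner(partitions: Int) extends Partitioner {
  def numPartitions = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case s: String if s.nonEmpty => s.charAt(0).toInt % numPartitions
    case other =>
      // Non-negative modulo, the same trick HashPartitioner uses via Spark's internal helper.
      val mod = other.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
  }

  // As with HashPartitioner, equality depends only on the partition count, so Spark can
  // recognize two RDDs partitioned this way as co-partitioned and avoid a shuffle.
  override def equals(other: Any): Boolean = other match {
    case p: FirstLetterPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
}

Such a partitioner would be passed to the partitionBy, groupByKey, reduceByKey, or cogroup
overloads in PairRDDFunctions that accept a Partitioner argument.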

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/RDD.scala b/core/src/main/scala/org/apache/spark/RDD.scala
new file mode 100644
index 0000000..0d1f07f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/RDD.scala
@@ -0,0 +1,957 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.Random
+
+import scala.collection.Map
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.TextOutputFormat
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+import org.apache.spark.Partitioner._
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.partial.BoundedDouble
+import org.apache.spark.partial.CountEvaluator
+import org.apache.spark.partial.GroupedCountEvaluator
+import org.apache.spark.partial.PartialResult
+import org.apache.spark.rdd.CoalescedRDD
+import org.apache.spark.rdd.CartesianRDD
+import org.apache.spark.rdd.FilteredRDD
+import org.apache.spark.rdd.FlatMappedRDD
+import org.apache.spark.rdd.GlommedRDD
+import org.apache.spark.rdd.MappedRDD
+import org.apache.spark.rdd.MapPartitionsRDD
+import org.apache.spark.rdd.MapPartitionsWithIndexRDD
+import org.apache.spark.rdd.PipedRDD
+import org.apache.spark.rdd.SampledRDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.rdd.ZippedRDD
+import org.apache.spark.rdd.ZippedPartitionsRDD2
+import org.apache.spark.rdd.ZippedPartitionsRDD3
+import org.apache.spark.rdd.ZippedPartitionsRDD4
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.BoundedPriorityQueue
+
+import SparkContext._
+
+/**
+ * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
+ * partitioned collection of elements that can be operated on in parallel. This class contains the
+ * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
+ * [[org.apache.spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such
+ * as `groupByKey` and `join`; [[org.apache.spark.DoubleRDDFunctions]] contains operations available only on
+ * RDDs of Doubles; and [[org.apache.spark.SequenceFileRDDFunctions]] contains operations available on RDDs
+ * that can be saved as SequenceFiles. These operations are automatically available on any RDD of
+ * the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you
+ * `import org.apache.spark.SparkContext._`.
+ *
+ * Internally, each RDD is characterized by five main properties:
+ *
+ *  - A list of partitions
+ *  - A function for computing each split
+ *  - A list of dependencies on other RDDs
+ *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
+ *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
+ *    an HDFS file)
+ *
+ * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
+ * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
+ * reading data from a new storage system) by overriding these functions. Please refer to the
+ * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
+ * on RDD internals.
+ */
+abstract class RDD[T: ClassManifest](
+    @transient private var sc: SparkContext,
+    @transient private var deps: Seq[Dependency[_]]
+  ) extends Serializable with Logging {
+
+  /** Construct an RDD with just a one-to-one dependency on one parent */
+  def this(@transient oneParent: RDD[_]) =
+    this(oneParent.context , List(new OneToOneDependency(oneParent)))
+
+  // =======================================================================
+  // Methods that should be implemented by subclasses of RDD
+  // =======================================================================
+
+  /** Implemented by subclasses to compute a given partition. */
+  def compute(split: Partition, context: TaskContext): Iterator[T]
+
+  /**
+   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
+   * be called once, so it is safe to implement a time-consuming computation in it.
+   */
+  protected def getPartitions: Array[Partition]
+
+  /**
+   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
+   * be called once, so it is safe to implement a time-consuming computation in it.
+   */
+  protected def getDependencies: Seq[Dependency[_]] = deps
+
+  /** Optionally overridden by subclasses to specify placement preferences. */
+  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
+
+  /** Optionally overridden by subclasses to specify how they are partitioned. */
+  val partitioner: Option[Partitioner] = None
+
+  // =======================================================================
+  // Methods and fields available on all RDDs
+  // =======================================================================
+
+  /** The SparkContext that created this RDD. */
+  def sparkContext: SparkContext = sc
+
+  /** A unique ID for this RDD (within its SparkContext). */
+  val id: Int = sc.newRddId()
+
+  /** A friendly name for this RDD */
+  var name: String = null
+
+  /** Assign a name to this RDD */
+  def setName(_name: String) = {
+    name = _name
+    this
+  }
+
+  /** User-defined generator of this RDD. */
+  var generator = Utils.getCallSiteInfo.firstUserClass
+
+  /** Reset the generator of this RDD. */
+  def setGenerator(_generator: String) = {
+    generator = _generator
+  }
+
+  /**
+   * Set this RDD's storage level to persist its values across operations after the first time
+   * it is computed. This can only be used to assign a new storage level if the RDD does not
+   * have a storage level set yet.
+   */
+  def persist(newLevel: StorageLevel): RDD[T] = {
+    // TODO: Handle changes of StorageLevel
+    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
+      throw new UnsupportedOperationException(
+        "Cannot change storage level of an RDD after it was already assigned a level")
+    }
+    storageLevel = newLevel
+    // Register the RDD with the SparkContext
+    sc.persistentRdds(id) = this
+    this
+  }
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def cache(): RDD[T] = persist()
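+
+  // Illustrative usage (a sketch, not part of the original patch; assumes `sc` is a SparkContext):
+  //   val nums = sc.parallelize(1 to 1000000).cache()   // same as persist(StorageLevel.MEMORY_ONLY)
+  //   nums.count(); nums.first()                        // the second action reads from the cache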
+
+  /**
+   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+   *
+   * @param blocking Whether to block until all blocks are deleted.
+   * @return This RDD.
+   */
+  def unpersist(blocking: Boolean = true): RDD[T] = {
+    logInfo("Removing RDD " + id + " from persistence list")
+    sc.env.blockManager.master.removeRdd(id, blocking)
+    sc.persistentRdds.remove(id)
+    storageLevel = StorageLevel.NONE
+    this
+  }
+
+  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
+  def getStorageLevel = storageLevel
+
+  // Our dependencies and partitions will be obtained by calling the subclass's methods below, and
+  // will be overwritten when we're checkpointed.
+  private var dependencies_ : Seq[Dependency[_]] = null
+  @transient private var partitions_ : Array[Partition] = null
+
+  /** An Option holding our checkpoint RDD, if we are checkpointed */
+  private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
+
+  /**
+   * Get the list of dependencies of this RDD, taking into account whether the
+   * RDD is checkpointed or not.
+   */
+  final def dependencies: Seq[Dependency[_]] = {
+    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
+      if (dependencies_ == null) {
+        dependencies_ = getDependencies
+      }
+      dependencies_
+    }
+  }
+
+  /**
+   * Get the array of partitions of this RDD, taking into account whether the
+   * RDD is checkpointed or not.
+   */
+  final def partitions: Array[Partition] = {
+    checkpointRDD.map(_.partitions).getOrElse {
+      if (partitions_ == null) {
+        partitions_ = getPartitions
+      }
+      partitions_
+    }
+  }
+
+  /**
+   * Get the preferred locations of a partition (as hostnames), taking into account whether the
+   * RDD is checkpointed.
+   */
+  final def preferredLocations(split: Partition): Seq[String] = {
+    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
+      getPreferredLocations(split)
+    }
+  }
+
+  /**
+   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
+   * This should ''not'' be called by users directly, but is available for implementors of custom
+   * subclasses of RDD.
+   */
+  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
+    if (storageLevel != StorageLevel.NONE) {
+      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
+    } else {
+      computeOrReadCheckpoint(split, context)
+    }
+  }
+
+  /**
+   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointed.
+   */
+  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
+    if (isCheckpointed) {
+      firstParent[T].iterator(split, context)
+    } else {
+      compute(split, context)
+    }
+  }
+
+  // Transformations (return a new RDD)
+
+  /**
+   * Return a new RDD by applying a function to all elements of this RDD.
+   */
+  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+
+  /**
+   *  Return a new RDD by first applying a function to all elements of this
+   *  RDD, and then flattening the results.
+   */
+  def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
+    new FlatMappedRDD(this, sc.clean(f))
+
+  /**
+   * Return a new RDD containing only the elements that satisfy a predicate.
+   */
+  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
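+
+  // Illustrative usage (a sketch, not part of the original patch; transformations are lazy and
+  // only run when an action such as collect() is called):
+  //   val words = sc.parallelize(Seq("a b", "c")).flatMap(_.split(" "))
+  //   words.map(_.toUpperCase).filter(_ != "A").collect()   // Array("B", "C")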
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(numPartitions: Int): RDD[T] =
+    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+
+  def distinct(): RDD[T] = distinct(partitions.size)
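+
+  // Illustrative usage (a sketch, not part of the original patch): distinct() is built on
+  // reduceByKey over (element, null) pairs, so it performs a shuffle.
+  //   sc.parallelize(Seq(1, 1, 2, 3, 3)).distinct().collect()   // e.g. Array(1, 2, 3), order not guaranteed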
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+    if (shuffle) {
+      // include a shuffle step so that our upstream tasks are still distributed
+      new CoalescedRDD(
+        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new HashPartitioner(numPartitions)),
+        numPartitions).keys
+    } else {
+      new CoalescedRDD(this, numPartitions)
+    }
+  }
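+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   rdd.coalesce(1)                    // merge partitions without a shuffle (narrow dependency)
+  //   rdd.coalesce(100, shuffle = true)  // add a shuffle step so upstream tasks stay distributed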
+
+  /**
+   * Return a sampled subset of this RDD.
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
+    new SampledRDD(this, withReplacement, fraction, seed)
+
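+  /**
+   * Return a fixed-size sampled subset of this RDD in an array (an action that returns the
+   * sample to the driver program).
+   */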
+  def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
+    var fraction = 0.0
+    var total = 0
+    val multiplier = 3.0
+    val initialCount = this.count()
+    var maxSelected = 0
+
+    if (num < 0) {
+      throw new IllegalArgumentException("Negative number of elements requested")
+    }
+
+    if (initialCount > Integer.MAX_VALUE - 1) {
+      maxSelected = Integer.MAX_VALUE - 1
+    } else {
+      maxSelected = initialCount.toInt
+    }
+
+    if (num > initialCount && !withReplacement) {
+      total = maxSelected
+      fraction = multiplier * (maxSelected + 1) / initialCount
+    } else {
+      fraction = multiplier * (num + 1) / initialCount
+      total = num
+    }
+
+    val rand = new Random(seed)
+    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+
+    // If the first sample didn't turn out large enough, keep trying to take samples;
+    // this shouldn't happen often because we use a big multiplier for the initial size.
+    while (samples.length < total) {
+      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+    }
+
+    Utils.randomizeInPlace(samples, rand).take(total)
+  }
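+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   rdd.sample(false, 0.1, 42)     // transformation: roughly 10% of the elements, without replacement
+  //   rdd.takeSample(false, 5, 42)   // action: exactly 5 elements collected to the driver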
+
+  /**
+   * Return the union of this RDD and another one. Any identical elements will appear multiple
+   * times (use `.distinct()` to eliminate them).
+   */
+  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
+
+  /**
+   * Return the union of this RDD and another one. Any identical elements will appear multiple
+   * times (use `.distinct()` to eliminate them).
+   */
+  def ++(other: RDD[T]): RDD[T] = this.union(other)
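+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   val a = sc.parallelize(Seq(1, 2)); val b = sc.parallelize(Seq(2, 3))
+  //   (a ++ b).collect()             // Array(1, 2, 2, 3); duplicates are kept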
+
+  /**
+   * Return an RDD created by coalescing all elements within each partition into an array.
+   */
+  def glom(): RDD[Array[T]] = new GlommedRDD(this)
+
+  /**
+   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
+   * elements (a, b) where a is in `this` and b is in `other`.
+   */
+  def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
+
+  /**
+   * Return an RDD of grouped items.
+   */
+  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+    groupBy[K](f, defaultPartitioner(this))
+
+  /**
+   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
+   * mapping to that key.
+   */
+  def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+    groupBy(f, new HashPartitioner(numPartitions))
+
+  /**
+   * Return an RDD of grouped items.
+   */
+  def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+    val cleanF = sc.clean(f)
+    this.map(t => (cleanF(t), t)).groupByKey(p)
+  }
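+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   sc.parallelize(1 to 6).groupBy(_ % 2).collect()
+  //   // e.g. Array((0, Seq(2, 4, 6)), (1, Seq(1, 3, 5))); group and element order are not guaranteed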
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: String): RDD[String] = new PipedRDD(this, command)
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: String, env: Map[String, String]): RDD[String] =
+    new PipedRDD(this, command, env)
+
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   * The print behavior can be customized by providing two functions.
+   *
+   * @param command command to run in forked process.
+   * @param env environment variables to set.
+   * @param printPipeContext Before piping elements, this function is called as an opportunity
+   *                         to pipe context data. The print-line function (like out.println) is
+   *                         passed as printPipeContext's parameter.
+   * @param printRDDElement Use this function to customize how to pipe elements. This function
+   *                        will be called with each RDD element as the 1st parameter, and the
+   *                        print-line function (like out.println()) as the 2nd parameter.
+   *                        For example, to pipe the RDD data of groupBy() in a streaming way,
+   *                        instead of constructing a huge String to concatenate all the elements:
+   *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
+   *                          for (e <- record._2){f(e)}
+   * @return the result RDD
+   */
+  def pipe(
+      command: Seq[String],
+      env: Map[String, String] = Map(),
+      printPipeContext: (String => Unit) => Unit = null,
+      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
+    new PipedRDD(this, command, env,
+      if (printPipeContext ne null) sc.clean(printPipeContext) else null,
+      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
+    preservesPartitioning: Boolean = false): RDD[U] =
+    new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def mapPartitionsWithIndex[U: ClassManifest](
+    f: (Int, Iterator[T]) => Iterator[U],
+    preservesPartitioning: Boolean = false): RDD[U] =
+    new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
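+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   rdd.mapPartitions(iter => Iterator(iter.size))                   // one count per partition
+  //   rdd.mapPartitionsWithIndex((i, iter) => iter.map(x => (i, x)))   // tag elements with their partition index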
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  @deprecated("use mapPartitionsWithIndex", "0.7.0")
+  def mapPartitionsWithSplit[U: ClassManifest](
+    f: (Int, Iterator[T]) => Iterator[U],
+    preservesPartitioning: Boolean = false): RDD[U] =
+    new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
+
+  /**
+   * Maps f over this RDD, where f takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+    (f:(T, A) => U): RDD[U] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+        val a = constructA(index)
+        iter.map(t => f(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+  }
+
+  /**
+   * FlatMaps f over this RDD, where f takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+    (f:(T, A) => Seq[U]): RDD[U] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+        val a = constructA(index)
+        iter.flatMap(t => f(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+  }
+
+  /**
+   * Applies f to each element of this RDD, where f takes an additional parameter of type A.
+   * This additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def foreachWith[A: ClassManifest](constructA: Int => A)
+    (f:(T, A) => Unit) {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+        val a = constructA(index)
+        iter.map(t => {f(t, a); t})
+      }
+    (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
+  }
+
+  /**
+   * Filters this RDD with p, where p takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def filterWith[A: ClassManifest](constructA: Int => A)
+    (p:(T, A) => Boolean): RDD[T] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+        val a = constructA(index)
+        iter.filter(t => p(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
+  }
+
+  /**
+   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
+   * partitions* and the *same number of elements in each partition* (e.g. one was made through
+   * a map on the other).
+   */
+  def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
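+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   val xs = sc.parallelize(1 to 3, 2); val ys = xs.map(_.toString)
+  //   xs.zip(ys).collect()           // Array((1,"1"), (2,"2"), (3,"3"))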
+
+  /**
+   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
+   * applying a function to the zipped partitions. Assumes that all the RDDs have the
+   * *same number of partitions*, but does *not* require them to have the same number
+   * of elements in each partition.
+   */
+  def zipPartitions[B: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B])
+      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+
+  def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B], rdd3: RDD[C])
+      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+
+  def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
+      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+
+
+  // Actions (launch a job to return a value to the user program)
+
+  /**
+   * Applies a function f to all elements of this RDD.
+   */
+  def foreach(f: T => Unit) {
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
+  }
+
+  /**
+   * Applies a function f to each partition of this RDD.
+   */
+  def foreachPartition(f: Iterator[T] => Unit) {
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
+  }
+
+  /**
+   * Return an array that contains all of the elements in this RDD.
+   */
+  def collect(): Array[T] = {
+    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
+    Array.concat(results: _*)
+  }
+
+  /**
+   * Return an array that contains all of the elements in this RDD.
+   */
+  def toArray(): Array[T] = collect()
+
+  /**
+   * Return an RDD that contains all matching values by applying `f`.
+   */
+  def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
+    filter(f.isDefinedAt).map(f)
+  }
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   *
+   * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be no larger than `this`.
+   */
+  def subtract(other: RDD[T]): RDD[T] =
+    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
+    subtract(other, new HashPartitioner(numPartitions))
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
+    if (partitioner == Some(p)) {
+      // Our partitioner knows how to handle T (which, since we have a partitioner, is
+      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
+      val p2 = new Partitioner() {
+        override def numPartitions = p.numPartitions
+        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
+      }
+      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
+      // anyway, and the result of calling .keys will not have a partitioner set, even
+      // though the SubtractedRDD will, thanks to p2's de-tupled partitioning, already
+      // be partitioned by the right/real keys (i.e. p).
+      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
+    } else {
+      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
+    }
+  }
+
+  /**
+   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
+   */
+  def reduce(f: (T, T) => T): T = {
+    val cleanF = sc.clean(f)
+    val reducePartition: Iterator[T] => Option[T] = iter => {
+      if (iter.hasNext) {
+        Some(iter.reduceLeft(cleanF))
+      } else {
+        None
+      }
+    }
+    var jobResult: Option[T] = None
+    val mergeResult = (index: Int, taskResult: Option[T]) => {
+      if (taskResult != None) {
+        jobResult = jobResult match {
+          case Some(value) => Some(f(value, taskResult.get))
+          case None => taskResult
+        }
+      }
+    }
+    sc.runJob(this, reducePartition, mergeResult)
+    // Get the final result out of our Option, or throw an exception if the RDD was empty
+    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+  }
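+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   sc.parallelize(1 to 100).reduce(_ + _)   // 5050; the operator must be commutative and associative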
+
+  /**
+   * Aggregate the elements of each partition, and then the results for all the partitions, using a
+   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+   * modify t1 and return it as its result value to avoid object allocation; however, it should not
+   * modify t2.
+   */
+  def fold(zeroValue: T)(op: (T, T) => T): T = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    val cleanOp = sc.clean(op)
+    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+    sc.runJob(this, foldPartition, mergeResult)
+    jobResult
+  }
+
+  /**
+   * Aggregate the elements of each partition, and then the results for all the partitions, using
+   * given combine functions and a neutral "zero value". This function can return a different result
+   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
+   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
+   * allowed to modify and return their first argument instead of creating a new U to avoid memory
+   * allocation.
+   */
+  def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    val cleanSeqOp = sc.clean(seqOp)
+    val cleanCombOp = sc.clean(combOp)
+    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+    sc.runJob(this, aggregatePartition, mergeResult)
+    jobResult
+  }
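+
+  // Illustrative usage (a sketch, not part of the original patch; assumes an RDD[Int]): compute
+  // (sum, count) in one pass so an average can be derived on the driver.
+  //   val (sum, cnt) = rdd.aggregate((0, 0))(
+  //     (acc, x) => (acc._1 + x, acc._2 + 1),    // seqOp: fold one element into the accumulator
+  //     (a, b) => (a._1 + b._1, a._2 + b._2))    // combOp: merge per-partition accumulators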
+
+  /**
+   * Return the number of elements in the RDD.
+   */
+  def count(): Long = {
+    sc.runJob(this, (iter: Iterator[T]) => {
+      var result = 0L
+      while (iter.hasNext) {
+        result += 1L
+        iter.next()
+      }
+      result
+    }).sum
+  }
+
+  /**
+   * (Experimental) Approximate version of count() that returns a potentially incomplete result
+   * within a timeout, even if not all tasks have finished.
+   */
+  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
+      var result = 0L
+      while (iter.hasNext) {
+        result += 1L
+        iter.next()
+      }
+      result
+    }
+    val evaluator = new CountEvaluator(partitions.size, confidence)
+    sc.runApproximateJob(this, countElements, evaluator, timeout)
+  }
+
+  /**
+   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
+   * combine step happens locally on the master, equivalent to running a single reduce task.
+   */
+  def countByValue(): Map[T, Long] = {
+    if (elementClassManifest.erasure.isArray) {
+      throw new SparkException("countByValue() does not support arrays")
+    }
+    // TODO: This should perhaps be distributed by default.
+    def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
+      val map = new OLMap[T]
+      while (iter.hasNext) {
+        val v = iter.next()
+        map.put(v, map.getLong(v) + 1L)
+      }
+      Iterator(map)
+    }
+    def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
+      val iter = m2.object2LongEntrySet.fastIterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
+      }
+      return m1
+    }
+    val myResult = mapPartitions(countPartition).reduce(mergeMaps)
+    myResult.asInstanceOf[java.util.Map[T, Long]]   // Will be wrapped as a Scala mutable Map
+  }
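+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   sc.parallelize(Seq("a", "b", "a")).countByValue()   // Map("a" -> 2, "b" -> 1), built on the driver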
+
+  /**
+   * (Experimental) Approximate version of countByValue().
+   */
+  def countByValueApprox(
+      timeout: Long,
+      confidence: Double = 0.95
+      ): PartialResult[Map[T, BoundedDouble]] = {
+    if (elementClassManifest.erasure.isArray) {
+      throw new SparkException("countByValueApprox() does not support arrays")
+    }
+    val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
+      val map = new OLMap[T]
+      while (iter.hasNext) {
+        val v = iter.next()
+        map.put(v, map.getLong(v) + 1L)
+      }
+      map
+    }
+    val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
+    sc.runApproximateJob(this, countPartition, evaluator, timeout)
+  }
+
+  /**
+   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
+   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
+   * whole RDD instead.
+   */
+  def take(num: Int): Array[T] = {
+    if (num == 0) {
+      return new Array[T](0)
+    }
+    val buf = new ArrayBuffer[T]
+    var p = 0
+    while (buf.size < num && p < partitions.size) {
+      val left = num - buf.size
+      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
+      buf ++= res(0)
+      if (buf.size == num)
+        return buf.toArray
+      p += 1
+    }
+    return buf.toArray
+  }
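+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   rdd.take(10)   // scans partitions one at a time, so small requests avoid a full job
+  //   rdd.first()    // equivalent to take(1)(0); throws on an empty RDD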
+
+  /**
+   * Return the first element in this RDD.
+   */
+  def first(): T = take(1) match {
+    case Array(t) => t
+    case _ => throw new UnsupportedOperationException("empty collection")
+  }
+
+  /**
+   * Returns the top K elements from this RDD as defined by
+   * the specified implicit Ordering[T].
+   * @param num the number of top elements to return
+   * @param ord the implicit ordering for T
+   * @return an array of top elements
+   */
+  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+    mapPartitions { items =>
+      val queue = new BoundedPriorityQueue[T](num)
+      queue ++= items
+      Iterator.single(queue)
+    }.reduce { (queue1, queue2) =>
+      queue1 ++= queue2
+      queue1
+    }.toArray.sorted(ord.reverse)
+  }
+
+  /**
+   * Returns the first K elements from this RDD as defined by
+   * the specified implicit Ordering[T] and maintains the
+   * ordering.
+   * @param num the number of top elements to return
+   * @param ord the implicit ordering for T
+   * @return an array of top elements
+   */
+  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
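+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   sc.parallelize(Seq(3, 1, 2)).top(2)           // Array(3, 2): largest elements, descending
+  //   sc.parallelize(Seq(3, 1, 2)).takeOrdered(2)   // Array(1, 2): smallest elements, ascending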
+
+  /**
+   * Save this RDD as a text file, using string representations of elements.
+   */
+  def saveAsTextFile(path: String) {
+    this.map(x => (NullWritable.get(), new Text(x.toString)))
+      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
+  }
+
+  /**
+   * Save this RDD as a compressed text file, using string representations of elements.
+   */
+  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
+    this.map(x => (NullWritable.get(), new Text(x.toString)))
+      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
+  }
+
+  /**
+   * Save this RDD as a SequenceFile of serialized objects.
+   */
+  def saveAsObjectFile(path: String) {
+    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
+      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
+      .saveAsSequenceFile(path)
+  }
+
+  /**
+   * Creates tuples of the elements in this RDD by applying `f`.
+   */
+  def keyBy[K](f: T => K): RDD[(K, T)] = {
+    map(x => (f(x), x))
+  }
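+
+  // Illustrative usage (a sketch, not part of the original patch):
+  //   sc.parallelize(Seq("apple", "fig")).keyBy(_.length).collect()   // Array((5,"apple"), (3,"fig"))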
+
+  /** A private method for tests, to look at the contents of each partition */
+  private[spark] def collectPartitions(): Array[Array[T]] = {
+    sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
+  }
+
+  /**
+   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
+   * directory set with SparkContext.setCheckpointDir() and all references to its parent
+   * RDDs will be removed. This function must be called before any job has been
+   * executed on this RDD. It is strongly recommended that this RDD be persisted in
+   * memory, since otherwise saving it to a file will require recomputation.
+   */
+  def checkpoint() {
+    if (context.checkpointDir.isEmpty) {
+      throw new Exception("Checkpoint directory has not been set in the SparkContext")
+    } else if (checkpointData.isEmpty) {
+      checkpointData = Some(new RDDCheckpointData(this))
+      checkpointData.get.markForCheckpoint()
+    }
+  }
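+
+  // Illustrative usage (a sketch, not part of the original patch; the checkpoint path is hypothetical):
+  //   sc.setCheckpointDir("/some/checkpoint/dir")
+  //   rdd.cache().checkpoint()   // persist first so checkpointing does not recompute the RDD
+  //   rdd.count()                // running a job materializes the RDD and writes the checkpoint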
+
+  /**
+   * Return whether this RDD has been checkpointed or not
+   */
+  def isCheckpointed: Boolean = {
+    checkpointData.map(_.isCheckpointed).getOrElse(false)
+  }
+
+  /**
+   * Gets the name of the file to which this RDD was checkpointed
+   */
+  def getCheckpointFile: Option[String] = {
+    checkpointData.flatMap(_.getCheckpointFile)
+  }
+
+  // =======================================================================
+  // Other internal methods and fields
+  // =======================================================================
+
+  private var storageLevel: StorageLevel = StorageLevel.NONE
+
+  /** Record user function generating this RDD. */
+  private[spark] val origin = Utils.formatSparkCallSite
+
+  private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
+
+  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
+
+  /** Returns the first parent RDD */
+  protected[spark] def firstParent[U: ClassManifest] = {
+    dependencies.head.rdd.asInstanceOf[RDD[U]]
+  }
+
+  /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
+  def context = sc
+
+  // Avoid handling doCheckpoint multiple times to prevent excessive recursion
+  private var doCheckpointCalled = false
+
+  /**
+   * Performs the checkpointing of this RDD by saving its contents. It is called by the DAGScheduler
+   * after a job using this RDD has completed (therefore the RDD has been materialized and
+   * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
+   */
+  private[spark] def doCheckpoint() {
+    if (!doCheckpointCalled) {
+      doCheckpointCalled = true
+      if (checkpointData.isDefined) {
+        checkpointData.get.doCheckpoint()
+      } else {
+        dependencies.foreach(_.rdd.doCheckpoint())
+      }
+    }
+  }
+
+  /**
+   * Changes the dependencies of this RDD from its original parents to a new RDD (`checkpointRDD`)
+   * created from the checkpoint file, and forgets its old dependencies and partitions.
+   */
+  private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
+    clearDependencies()
+    partitions_ = null
+    deps = null    // Forget the constructor argument for dependencies too
+  }
+
+  /**
+   * Clears the dependencies of this RDD. This method must ensure that all references
+   * to the original parent RDDs are removed to enable the parent RDDs to be garbage
+   * collected. Subclasses of RDD may override this method for implementing their own cleaning
+   * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
+   */
+  protected def clearDependencies() {
+    dependencies_ = null
+  }
+
+  /** A description of this RDD and its recursive dependencies for debugging. */
+  def toDebugString: String = {
+    def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
+      Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
+        rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + "  "))
+    }
+    debugString(this).mkString("\n")
+  }
+
+  override def toString: String = "%s%s[%d] at %s".format(
+    Option(name).map(_ + " ").getOrElse(""),
+    getClass.getSimpleName,
+    id,
+    origin)
+
+  def toJavaRDD() : JavaRDD[T] = {
+    new JavaRDD(this)(elementClassManifest)
+  }
+
+}

