spark-commits mailing list archives

From ma...@apache.org
Subject [36/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
Date Sun, 01 Sep 2013 21:59:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
new file mode 100644
index 0000000..1683050
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+
+/**
+ * A variant of the MapPartitionsRDD that passes the partition index into the
+ * closure. This can be used to generate or collect partition-specific
+ * information such as the number of tuples in a partition.
+ */
+private[spark]
+class MapPartitionsWithIndexRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: (Int, Iterator[T]) => Iterator[U],
+    preservesPartitioning: Boolean
+  ) extends RDD[U](prev) {
+
+  override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  override val partitioner = if (preservesPartitioning) prev.partitioner else None
+
+  override def compute(split: Partition, context: TaskContext) =
+    f(split.index, firstParent[T].iterator(split, context))
+}
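
A minimal usage sketch for the MapPartitionsWithIndexRDD above (illustrative, not
part of the commit). It assumes a live SparkContext named `sc`; the
RDD.mapPartitionsWithIndex helper is the usual way to reach this class:

// Count the number of tuples in each partition, tagging each count with the
// partition index that the closure receives.
val data = sc.parallelize(1 to 100, 4)
val countsPerPartition = data.mapPartitionsWithIndex { (index, iter) =>
  Iterator((index, iter.size))
}
countsPerPartition.collect().foreach(println)   // e.g. (0,25), (1,25), (2,25), (3,25)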

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
new file mode 100644
index 0000000..26d4806
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+private[spark]
+class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
+  extends RDD[U](prev) {
+
+  override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  override def compute(split: Partition, context: TaskContext) =
+    firstParent[T].iterator(split, context).map(f)
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
new file mode 100644
index 0000000..a405e9a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+
+import org.apache.spark.{TaskContext, Partition, RDD}
+
+private[spark]
+class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
+  extends RDD[(K, U)](prev) {
+
+  override def getPartitions = firstParent[Product2[K, U]].partitions
+
+  override val partitioner = firstParent[Product2[K, U]].partitioner
+
+  override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
+    firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k, v) => (k, f(v)) }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
new file mode 100644
index 0000000..114b504
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
+
+
+private[spark]
+class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+  extends Partition {
+
+  val serializableHadoopSplit = new SerializableWritable(rawSplit)
+
+  override def hashCode(): Int = (41 * (41 + rddId) + index)
+}
+
+class NewHadoopRDD[K, V](
+    sc : SparkContext,
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    @transient conf: Configuration)
+  extends RDD[(K, V)](sc, Nil)
+  with SparkHadoopMapReduceUtil
+  with Logging {
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  // private val serializableConf = new SerializableWritable(conf)
+
+  private val jobtrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  @transient private val jobId = new JobID(jobtrackerId, id)
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    if (inputFormat.isInstanceOf[Configurable]) {
+      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
+    val jobContext = newJobContext(conf, jobId)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
+    val split = theSplit.asInstanceOf[NewHadoopPartition]
+    logInfo("Input split: " + split.serializableHadoopSplit)
+    val conf = confBroadcast.value.value
+    val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
+    val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+    val format = inputFormatClass.newInstance
+    if (format.isInstanceOf[Configurable]) {
+      format.asInstanceOf[Configurable].setConf(conf)
+    }
+    val reader = format.createRecordReader(
+      split.serializableHadoopSplit.value, hadoopAttemptContext)
+    reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+
+    // Register an on-task-completion callback to close the input stream.
+    context.addOnCompleteCallback(() => close())
+
+    var havePair = false
+    var finished = false
+
+    override def hasNext: Boolean = {
+      if (!finished && !havePair) {
+        finished = !reader.nextKeyValue
+        havePair = !finished
+      }
+      !finished
+    }
+
+    override def next: (K, V) = {
+      if (!hasNext) {
+        throw new java.util.NoSuchElementException("End of stream")
+      }
+      havePair = false
+      return (reader.getCurrentKey, reader.getCurrentValue)
+    }
+
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val theSplit = split.asInstanceOf[NewHadoopPartition]
+    theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
+  }
+
+  def getConf: Configuration = confBroadcast.value.value
+}
+
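
A hedged usage sketch for NewHadoopRDD (illustrative, not part of the commit). It
assumes a live SparkContext named `sc` and a placeholder HDFS path;
SparkContext.newAPIHadoopFile is the usual way to build this RDD from the "new"
(org.apache.hadoop.mapreduce) input format API:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// The path below is a placeholder; replace it with a real input location.
val lines = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://namenode:8020/path/to/input")
lines.map(pair => pair._2.toString).take(5).foreach(println)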

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
new file mode 100644
index 0000000..4c3df0e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RangePartitioner, Logging, RDD}
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
+ * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to use these
+ * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ */
+class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
+                          V: ClassManifest,
+                          P <: Product2[K, V] : ClassManifest](
+    self: RDD[P])
+  extends Logging with Serializable {
+
+  /**
+   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+   * order of the keys).
+   */
+  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
+    val part = new RangePartitioner(numPartitions, self, ascending)
+    val shuffled = new ShuffledRDD[K, V, P](self, part)
+    shuffled.mapPartitions(iter => {
+      val buf = iter.toArray
+      if (ascending) {
+        buf.sortWith((x, y) => x._1 < y._1).iterator
+      } else {
+        buf.sortWith((x, y) => x._1 > y._1).iterator
+      }
+    }, preservesPartitioning = true)
+  }
+}
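
A minimal sketch of sortByKey from the class above (illustrative, not part of the
commit), assuming a live SparkContext named `sc` and the implicit conversions from
`org.apache.spark.SparkContext._`:

import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
// Ascending by default; the key type only needs an Ordered view, which String has.
pairs.sortByKey().collect().foreach(println)   // (a,1), (b,2), (c,3)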

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
new file mode 100644
index 0000000..8db3611
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable.NumericRange
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.Map
+import org.apache.spark._
+import java.io._
+import scala.Serializable
+
+private[spark] class ParallelCollectionPartition[T: ClassManifest](
+    var rddId: Long,
+    var slice: Int,
+    var values: Seq[T])
+    extends Partition with Serializable {
+
+  def iterator: Iterator[T] = values.iterator
+
+  override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
+
+  override def equals(other: Any): Boolean = other match {
+    case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId && this.slice == that.slice)
+    case _ => false
+  }
+
+  override def index: Int = slice
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream): Unit = {
+
+    val sfactory = SparkEnv.get.serializer
+
+    // Treat the Java serializer with its default behavior rather than going through custom
+    // serialization, to avoid writing a separate serialization header.
+
+    sfactory match {
+      case js: JavaSerializer => out.defaultWriteObject()
+      case _ =>
+        out.writeLong(rddId)
+        out.writeInt(slice)
+
+        val ser = sfactory.newInstance()
+        Utils.serializeViaNestedStream(out, ser)(_.writeObject(values))
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {
+
+    val sfactory = SparkEnv.get.serializer
+    sfactory match {
+      case js: JavaSerializer => in.defaultReadObject()
+      case _ =>
+        rddId = in.readLong()
+        slice = in.readInt()
+
+        val ser = sfactory.newInstance()
+        Utils.deserializeViaNestedStream(in, ser)(ds => values = ds.readObject())
+    }
+  }
+}
+
+private[spark] class ParallelCollectionRDD[T: ClassManifest](
+    @transient sc: SparkContext,
+    @transient data: Seq[T],
+    numSlices: Int,
+    locationPrefs: Map[Int, Seq[String]])
+    extends RDD[T](sc, Nil) {
+  // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
+  // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
+  // instead.
+  // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.
+
+  override def getPartitions: Array[Partition] = {
+    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
+    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
+  }
+
+  override def compute(s: Partition, context: TaskContext) =
+    s.asInstanceOf[ParallelCollectionPartition[T]].iterator
+
+  override def getPreferredLocations(s: Partition): Seq[String] = {
+    locationPrefs.getOrElse(s.index, Nil)
+  }
+}
+
+private object ParallelCollectionRDD {
+  /**
+   * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
+   * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
+   * it efficient to run Spark over RDDs representing large sets of numbers.
+   */
+  def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
+    if (numSlices < 1) {
+      throw new IllegalArgumentException("Positive number of slices required")
+    }
+    seq match {
+      case r: Range.Inclusive => {
+        val sign = if (r.step < 0) {
+          -1
+        } else {
+          1
+        }
+        slice(new Range(
+          r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
+      }
+      case r: Range => {
+        (0 until numSlices).map(i => {
+          val start = ((i * r.length.toLong) / numSlices).toInt
+          val end = (((i + 1) * r.length.toLong) / numSlices).toInt
+          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+        }).asInstanceOf[Seq[Seq[T]]]
+      }
+      case nr: NumericRange[_] => {
+        // For ranges of Long, Double, BigInteger, etc
+        val slices = new ArrayBuffer[Seq[T]](numSlices)
+        val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
+        var r = nr
+        for (i <- 0 until numSlices) {
+          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
+          r = r.drop(sliceSize)
+        }
+        slices
+      }
+      case _ => {
+        val array = seq.toArray // To prevent O(n^2) operations for List etc
+        (0 until numSlices).map(i => {
+          val start = ((i * array.length.toLong) / numSlices).toInt
+          val end = (((i + 1) * array.length.toLong) / numSlices).toInt
+          array.slice(start, end).toSeq
+        })
+      }
+    }
+  }
+}
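
A small illustration of the slicing behaviour described above (illustrative, not
part of the commit), assuming a live SparkContext named `sc`. sc.parallelize is
backed by ParallelCollectionRDD, and a Range input is kept as sub-Ranges rather
than being materialized:

val r = sc.parallelize(1 to 10, 3)
// glom() gathers each partition into an array so the slice boundaries are visible.
r.glom().collect().foreach(a => println(a.mkString(",")))
// prints roughly: 1,2,3 then 4,5,6 then 7,8,9,10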

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
new file mode 100644
index 0000000..8e79a5c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{NarrowDependency, RDD, SparkEnv, Partition, TaskContext}
+
+
+class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition) extends Partition {
+  override val index = idx
+}
+
+
+/**
+ * Represents a dependency between the PartitionPruningRDD and its parent. In this
+ * case, the child RDD contains a subset of the parent's partitions.
+ */
+class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+  extends NarrowDependency[T](rdd) {
+
+  @transient
+  val partitions: Array[Partition] = rdd.partitions.zipWithIndex
+    .filter(s => partitionFilterFunc(s._2))
+    .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
+
+  override def getParents(partitionId: Int) = List(partitions(partitionId).index)
+}
+
+
+/**
+ * An RDD used to prune RDD partitions so we can avoid launching tasks on
+ * all partitions. An example use case: if we know the RDD is partitioned by range,
+ * and the execution DAG has a filter on the key, we can avoid launching tasks
+ * on partitions that don't contain the range covering the key.
+ */
+class PartitionPruningRDD[T: ClassManifest](
+    @transient prev: RDD[T],
+    @transient partitionFilterFunc: Int => Boolean)
+  extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
+
+  override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(
+    split.asInstanceOf[PartitionPruningRDDPartition].parentSplit, context)
+
+  override protected def getPartitions: Array[Partition] =
+    getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
+}
+
+
+object PartitionPruningRDD {
+
+  /**
+   * Create a PartitionPruningRDD. This function can be used to create the PartitionPruningRDD
+   * when its type T is not known at compile time.
+   */
+  def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) = {
+    new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassManifest)
+  }
+}
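
A hedged usage sketch for PartitionPruningRDD (illustrative, not part of the
commit), assuming a live SparkContext named `sc`:

val parent = sc.parallelize(1 to 100, 10)
// Keep only partition 0, so a subsequent action launches a single task.
val pruned = PartitionPruningRDD.create(parent, partitionIndex => partitionIndex == 0)
println(pruned.partitions.size)          // 1
println(pruned.collect().mkString(","))  // elements of the first partition only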

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
new file mode 100644
index 0000000..98498d5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.PrintWriter
+import java.util.StringTokenizer
+
+import scala.collection.Map
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
+import org.apache.spark.{RDD, SparkEnv, Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
+
+
+/**
+ * An RDD that pipes the contents of each parent partition through an external command
+ * (printing them one per line) and returns the output as a collection of strings.
+ */
+class PipedRDD[T: ClassManifest](
+    prev: RDD[T],
+    command: Seq[String],
+    envVars: Map[String, String],
+    printPipeContext: (String => Unit) => Unit,
+    printRDDElement: (T, String => Unit) => Unit)
+  extends RDD[String](prev) {
+
+  // Similar to Runtime.exec(), if we are given a single string, split it into words
+  // using a standard StringTokenizer (i.e. by spaces)
+  def this(
+      prev: RDD[T],
+      command: String,
+      envVars: Map[String, String] = Map(),
+      printPipeContext: (String => Unit) => Unit = null,
+      printRDDElement: (T, String => Unit) => Unit = null) =
+    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
+
+
+  override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
+    val pb = new ProcessBuilder(command)
+    // Add the environmental variables to the process.
+    val currentEnvVars = pb.environment()
+    envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) }
+
+    val proc = pb.start()
+    val env = SparkEnv.get
+
+    // Start a thread to print the process's stderr to ours
+    new Thread("stderr reader for " + command) {
+      override def run() {
+        for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
+          System.err.println(line)
+        }
+      }
+    }.start()
+
+    // Start a thread to feed the process input from our parent's iterator
+    new Thread("stdin writer for " + command) {
+      override def run() {
+        SparkEnv.set(env)
+        val out = new PrintWriter(proc.getOutputStream)
+
+        // Print the pipe context first, if one was provided
+        if (printPipeContext != null) {
+          printPipeContext(out.println(_))
+        }
+        for (elem <- firstParent[T].iterator(split, context)) {
+          if (printRDDElement != null) {
+            printRDDElement(elem, out.println(_))
+          } else {
+            out.println(elem)
+          }
+        }
+        out.close()
+      }
+    }.start()
+
+    // Return an iterator that reads lines from the process's stdout
+    val lines = Source.fromInputStream(proc.getInputStream).getLines
+    return new Iterator[String] {
+      def next() = lines.next()
+      def hasNext = {
+        if (lines.hasNext) {
+          true
+        } else {
+          val exitStatus = proc.waitFor()
+          if (exitStatus != 0) {
+            throw new Exception("Subprocess exited with status " + exitStatus)
+          }
+          false
+        }
+      }
+    }
+  }
+}
+
+object PipedRDD {
+  // Split a string into words using a standard StringTokenizer
+  def tokenize(command: String): Seq[String] = {
+    val buf = new ArrayBuffer[String]
+    val tok = new StringTokenizer(command)
+    while(tok.hasMoreElements) {
+      buf += tok.nextToken()
+    }
+    buf
+  }
+}
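
A minimal sketch for PipedRDD (illustrative, not part of the commit). RDD.pipe is
the usual entry point; this assumes a live SparkContext named `sc` and a Unix-like
environment with `grep` on the PATH:

val words = sc.parallelize(Seq("spark", "hadoop", "scala", "shark"))
// Each element is printed on its own line to the subprocess's stdin, and each line
// of the subprocess's stdout becomes an element of the result.
val matched = words.pipe("grep -i s")
matched.collect().foreach(println)   // spark, scala, shark (hadoop is filtered out)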

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
new file mode 100644
index 0000000..1e8d89e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.util.Random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+private[spark]
+class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable {
+  override val index: Int = prev.index
+}
+
+class SampledRDD[T: ClassManifest](
+    prev: RDD[T],
+    withReplacement: Boolean, 
+    frac: Double,
+    seed: Int)
+  extends RDD[T](prev) {
+
+  override def getPartitions: Array[Partition] = {
+    val rg = new Random(seed)
+    firstParent[T].partitions.map(x => new SampledRDDPartition(x, rg.nextInt))
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] =
+    firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDPartition].prev)
+
+  override def compute(splitIn: Partition, context: TaskContext): Iterator[T] = {
+    val split = splitIn.asInstanceOf[SampledRDDPartition]
+    if (withReplacement) {
+      // For large datasets, the expected number of occurrences of each element in a sample with
+      // replacement is Poisson(frac). We use that to get a count for each element.
+      val poisson = new Poisson(frac, new DRand(split.seed))
+      firstParent[T].iterator(split.prev, context).flatMap { element =>
+        val count = poisson.nextInt()
+        if (count == 0) {
+          Iterator.empty  // Avoid object allocation when we return 0 items, which is quite often
+        } else {
+          Iterator.fill(count)(element)
+        }
+      }
+    } else { // Sampling without replacement
+      val rand = new Random(split.seed)
+      firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac))
+    }
+  }
+}
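
A minimal sketch for SampledRDD (illustrative, not part of the commit). RDD.sample
is the usual entry point; the sketch assumes a live SparkContext named `sc`:

val data = sc.parallelize(1 to 1000)
// Arguments are (withReplacement, fraction, seed). Sampling without replacement
// uses the uniform filter above; sampling with replacement uses the Poisson path.
val sampled = data.sample(false, 0.1, 42)
println(sampled.count())   // roughly 100; the exact value depends on the seed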

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
new file mode 100644
index 0000000..f0e9ab8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{Dependency, Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
+
+
+private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
+  override val index = idx
+  override def hashCode(): Int = idx
+}
+
+/**
+ * The resulting RDD from a shuffle (e.g. repartitioning of data).
+ * @param prev the parent RDD.
+ * @param part the partitioner used to partition the RDD
+ * @tparam K the key class.
+ * @tparam V the value class.
+ */
+class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
+    @transient var prev: RDD[P],
+    part: Partitioner)
+  extends RDD[P](prev.context, Nil) {
+
+  private var serializerClass: String = null
+
+  def setSerializer(cls: String): ShuffledRDD[K, V, P] = {
+    serializerClass = cls
+    this
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    List(new ShuffleDependency(prev, part, serializerClass))
+  }
+
+  override val partitioner = Some(part)
+
+  override def getPartitions: Array[Partition] = {
+    Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[P] = {
+    val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
+    SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics,
+      SparkEnv.get.serializerManager.get(serializerClass))
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
+}
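
A hedged sketch for ShuffledRDD (illustrative, not part of the commit), constructing
it directly with a HashPartitioner; normally it is reached through pair-RDD
operations such as partitionBy. Assumes a live SparkContext named `sc`:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")), 4)
// Repartition by key hash into 2 partitions; the third type parameter is the
// concrete pair type stored in the RDD.
val shuffled = new ShuffledRDD[Int, String, (Int, String)](pairs, new HashPartitioner(2))
println(shuffled.partitions.size)   // 2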

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
new file mode 100644
index 0000000..7369dfa
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.util.{HashMap => JHashMap}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.RDD
+import org.apache.spark.Partitioner
+import org.apache.spark.Dependency
+import org.apache.spark.TaskContext
+import org.apache.spark.Partition
+import org.apache.spark.SparkEnv
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.OneToOneDependency
+
+
+/**
+ * An optimized version of cogroup for set difference/subtraction.
+ *
+ * It is possible to implement this operation with just `cogroup`, but
+ * that is less efficient because all of the entries from `rdd2`, for
+ * both matching and non-matching values in `rdd1`, are kept in the
+ * JHashMap until the end.
+ *
+ * With this implementation, only the entries from `rdd1` are kept in memory,
+ * and the entries from `rdd2` are essentially streamed, as we only need to
+ * touch each one once to decide whether the value needs to be removed.
+ *
+ * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
+ * you can use `rdd1`'s partitioner/partition size and not worry about running
+ * out of memory because of the size of `rdd2`.
+ */
+private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
+    @transient var rdd1: RDD[_ <: Product2[K, V]],
+    @transient var rdd2: RDD[_ <: Product2[K, W]],
+    part: Partitioner)
+  extends RDD[(K, V)](rdd1.context, Nil) {
+
+  private var serializerClass: String = null
+
+  def setSerializer(cls: String): SubtractedRDD[K, V, W] = {
+    serializerClass = cls
+    this
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(rdd1, rdd2).map { rdd =>
+      if (rdd.partitioner == Some(part)) {
+        logDebug("Adding one-to-one dependency with " + rdd)
+        new OneToOneDependency(rdd)
+      } else {
+        logDebug("Adding shuffle dependency with " + rdd)
+        new ShuffleDependency(rdd, part, serializerClass)
+      }
+    }
+  }
+
+  override def getPartitions: Array[Partition] = {
+    val array = new Array[Partition](part.numPartitions)
+    for (i <- 0 until array.size) {
+      // Each CoGroupPartition will depend on rdd1 and rdd2
+      array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
+        dependencies(j) match {
+          case s: ShuffleDependency[_, _] =>
+            new ShuffleCoGroupSplitDep(s.shuffleId)
+          case _ =>
+            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+        }
+      }.toArray)
+    }
+    array
+  }
+
+  override val partitioner = Some(part)
+
+  override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val partition = p.asInstanceOf[CoGroupPartition]
+    val serializer = SparkEnv.get.serializerManager.get(serializerClass)
+    val map = new JHashMap[K, ArrayBuffer[V]]
+    def getSeq(k: K): ArrayBuffer[V] = {
+      val seq = map.get(k)
+      if (seq != null) {
+        seq
+      } else {
+        val seq = new ArrayBuffer[V]()
+        map.put(k, seq)
+        seq
+      }
+    }
+    def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit) = dep match {
+      case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
+        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
+      }
+      case ShuffleCoGroupSplitDep(shuffleId) => {
+        val iter = SparkEnv.get.shuffleFetcher.fetch[Product2[K, V]](shuffleId, partition.index,
+          context.taskMetrics, serializer)
+        iter.foreach(op)
+      }
+    }
+    // the first dep is rdd1; add all values to the map
+    integrate(partition.deps(0), t => getSeq(t._1) += t._2)
+    // the second dep is rdd2; remove all of its keys
+    integrate(partition.deps(1), t => map.remove(t._1))
+    map.iterator.map { t =>  t._2.iterator.map { (t._1, _) } }.flatten
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdd1 = null
+    rdd2 = null
+  }
+
+}
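
A minimal sketch for SubtractedRDD (illustrative, not part of the commit). The
pair-RDD subtractByKey operation is the usual entry point; the sketch assumes a
live SparkContext named `sc` and the implicits from `org.apache.spark.SparkContext._`:

import org.apache.spark.SparkContext._

val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("b", 99)))
// Removes every key of rdd1 that also appears in rdd2, keeping ("a",1) and ("c",3).
println(rdd1.subtractByKey(rdd2).collect().toSeq)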

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
new file mode 100644
index 0000000..fd02476
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.{Dependency, RangeDependency, RDD, SparkContext, Partition, TaskContext}
+import java.io.{ObjectOutputStream, IOException}
+
+private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int)
+  extends Partition {
+
+  var split: Partition = rdd.partitions(splitIndex)
+
+  def iterator(context: TaskContext) = rdd.iterator(split, context)
+
+  def preferredLocations() = rdd.preferredLocations(split)
+
+  override val index: Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    // Update the reference to parent split at the time of task serialization
+    split = rdd.partitions(splitIndex)
+    oos.defaultWriteObject()
+  }
+}
+
+class UnionRDD[T: ClassManifest](
+    sc: SparkContext,
+    @transient var rdds: Seq[RDD[T]])
+  extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies
+
+  override def getPartitions: Array[Partition] = {
+    val array = new Array[Partition](rdds.map(_.partitions.size).sum)
+    var pos = 0
+    for (rdd <- rdds; split <- rdd.partitions) {
+      array(pos) = new UnionPartition(pos, rdd, split.index)
+      pos += 1
+    }
+    array
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    val deps = new ArrayBuffer[Dependency[_]]
+    var pos = 0
+    for (rdd <- rdds) {
+      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
+      pos += rdd.partitions.size
+    }
+    deps
+  }
+
+  override def compute(s: Partition, context: TaskContext): Iterator[T] =
+    s.asInstanceOf[UnionPartition[T]].iterator(context)
+
+  override def getPreferredLocations(s: Partition): Seq[String] =
+    s.asInstanceOf[UnionPartition[T]].preferredLocations()
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
new file mode 100644
index 0000000..5ae1db3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import java.io.{ObjectOutputStream, IOException}
+
+private[spark] class ZippedPartitionsPartition(
+    idx: Int,
+    @transient rdds: Seq[RDD[_]])
+  extends Partition {
+
+  override val index: Int = idx
+  var partitionValues = rdds.map(rdd => rdd.partitions(idx))
+  def partitions = partitionValues
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    // Update the reference to parent split at the time of task serialization
+    partitionValues = rdds.map(rdd => rdd.partitions(idx))
+    oos.defaultWriteObject()
+  }
+}
+
+abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
+    sc: SparkContext,
+    var rdds: Seq[RDD[_]])
+  extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
+
+  override def getPartitions: Array[Partition] = {
+    val sizes = rdds.map(x => x.partitions.size)
+    if (!sizes.forall(x => x == sizes(0))) {
+      throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
+    }
+    val array = new Array[Partition](sizes(0))
+    for (i <- 0 until sizes(0)) {
+      array(i) = new ZippedPartitionsPartition(i, rdds)
+    }
+    array
+  }
+
+  override def getPreferredLocations(s: Partition): Seq[String] = {
+    val parts = s.asInstanceOf[ZippedPartitionsPartition].partitions
+    val prefs = rdds.zip(parts).map { case (rdd, p) => rdd.preferredLocations(p) }
+    // Check whether there are any hosts that match all RDDs; otherwise return the union
+    val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
+    if (!exactMatchLocations.isEmpty) {
+      exactMatchLocations
+    } else {
+      prefs.flatten.distinct
+    }
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdds = null
+  }
+}
+
+class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
+    sc: SparkContext,
+    f: (Iterator[A], Iterator[B]) => Iterator[V],
+    var rdd1: RDD[A],
+    var rdd2: RDD[B])
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
+
+  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+    f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdd1 = null
+    rdd2 = null
+  }
+}
+
+class ZippedPartitionsRDD3
+  [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
+    sc: SparkContext,
+    f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
+    var rdd1: RDD[A],
+    var rdd2: RDD[B],
+    var rdd3: RDD[C])
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
+
+  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+    f(rdd1.iterator(partitions(0), context),
+      rdd2.iterator(partitions(1), context),
+      rdd3.iterator(partitions(2), context))
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdd1 = null
+    rdd2 = null
+    rdd3 = null
+  }
+}
+
+class ZippedPartitionsRDD4
+  [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
+    sc: SparkContext,
+    f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
+    var rdd1: RDD[A],
+    var rdd2: RDD[B],
+    var rdd3: RDD[C],
+    var rdd4: RDD[D])
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
+
+  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+    f(rdd1.iterator(partitions(0), context),
+      rdd2.iterator(partitions(1), context),
+      rdd3.iterator(partitions(2), context),
+      rdd4.iterator(partitions(3), context))
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdd1 = null
+    rdd2 = null
+    rdd3 = null
+    rdd4 = null
+  }
+}
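
A hedged sketch for ZippedPartitionsRDD2 (illustrative, not part of the commit),
constructing it directly; the RDD.zipPartitions helpers are the usual entry point.
Both inputs must have the same number of partitions. Assumes a live SparkContext
named `sc`:

val nums  = sc.parallelize(1 to 4, 2)
val words = sc.parallelize(Seq("a", "b", "c", "d"), 2)
// Combine the two partition iterators element by element into strings like "a1".
val zipped = new ZippedPartitionsRDD2[Int, String, String](
  sc,
  (is: Iterator[Int], ss: Iterator[String]) => ss.zip(is).map { case (s, i) => s + i },
  nums,
  words)
println(zipped.collect().mkString(","))   // a1,b2,c3,d4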

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
new file mode 100644
index 0000000..3bd00d2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import java.io.{ObjectOutputStream, IOException}
+
+
+private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
+    idx: Int,
+    @transient rdd1: RDD[T],
+    @transient rdd2: RDD[U]
+  ) extends Partition {
+
+  var partition1 = rdd1.partitions(idx)
+  var partition2 = rdd2.partitions(idx)
+  override val index: Int = idx
+
+  def partitions = (partition1, partition2)
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    // Update the reference to parent partition at the time of task serialization
+    partition1 = rdd1.partitions(idx)
+    partition2 = rdd2.partitions(idx)
+    oos.defaultWriteObject()
+  }
+}
+
+class ZippedRDD[T: ClassManifest, U: ClassManifest](
+    sc: SparkContext,
+    var rdd1: RDD[T],
+    var rdd2: RDD[U])
+  extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) {
+
+  override def getPartitions: Array[Partition] = {
+    if (rdd1.partitions.size != rdd2.partitions.size) {
+      throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
+    }
+    val array = new Array[Partition](rdd1.partitions.size)
+    for (i <- 0 until rdd1.partitions.size) {
+      array(i) = new ZippedPartition(i, rdd1, rdd2)
+    }
+    array
+  }
+
+  override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+    rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context))
+  }
+
+  override def getPreferredLocations(s: Partition): Seq[String] = {
+    val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+    val pref1 = rdd1.preferredLocations(partition1)
+    val pref2 = rdd2.preferredLocations(partition2)
+    // Check whether there are any hosts that match both RDDs; otherwise return the union
+    val exactMatchLocations = pref1.intersect(pref2)
+    if (!exactMatchLocations.isEmpty) {
+      exactMatchLocations
+    } else {
+      (pref1 ++ pref2).distinct
+    }
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdd1 = null
+    rdd2 = null
+  }
+}
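
A minimal sketch for ZippedRDD (illustrative, not part of the commit). RDD.zip is
the usual entry point; both RDDs must have the same number of partitions (and the
same number of elements per partition). Assumes a live SparkContext named `sc`:

val keys   = sc.parallelize(Seq("a", "b", "c"), 2)
val values = sc.parallelize(Seq(1, 2, 3), 2)
println(keys.zip(values).collect().toSeq)   // (a,1), (b,2), (c,3)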

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
new file mode 100644
index 0000000..0b04607
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.TaskContext
+
+import java.util.Properties
+
+/**
+ * Tracks information about an active job in the DAGScheduler.
+ */
+private[spark] class ActiveJob(
+    val jobId: Int,
+    val finalStage: Stage,
+    val func: (TaskContext, Iterator[_]) => _,
+    val partitions: Array[Int],
+    val callSite: String,
+    val listener: JobListener,
+    val properties: Properties) {
+
+  val numPartitions = partitions.length
+  val finished = Array.fill[Boolean](numPartitions)(false)
+  var numFinished = 0
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
new file mode 100644
index 0000000..5ac700b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.NotSerializableException
+import java.util.Properties
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
+import org.apache.spark.scheduler.cluster.TaskInfo
+import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
+import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+
+/**
+ * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
+ * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
+ * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
+ * TaskScheduler implementation that runs them on the cluster.
+ *
+ * In addition to coming up with a DAG of stages, this class also determines the preferred
+ * locations to run each task on, based on the current cache status, and passes these to the
+ * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
+ * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
+ * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
+ * a small number of times before cancelling the whole stage.
+ *
+ * THREADING: This class runs all its logic in a single thread executing the run() method, to which
+ * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
+ * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
+ * should be private.
+ */
+private[spark]
+class DAGScheduler(
+    taskSched: TaskScheduler,
+    mapOutputTracker: MapOutputTracker,
+    blockManagerMaster: BlockManagerMaster,
+    env: SparkEnv)
+  extends TaskSchedulerListener with Logging {
+
+  def this(taskSched: TaskScheduler) {
+    this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get)
+  }
+  taskSched.setListener(this)
+
+  // Called by the TaskScheduler to report that a task has started.
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+    eventQueue.put(BeginEvent(task, taskInfo))
+  }
+
+  // Called by TaskScheduler to report task completions or failures.
+  override def taskEnded(
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Map[Long, Any],
+      taskInfo: TaskInfo,
+      taskMetrics: TaskMetrics) {
+    eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
+  }
+
+  // Called by TaskScheduler when an executor fails.
+  override def executorLost(execId: String) {
+    eventQueue.put(ExecutorLost(execId))
+  }
+
+  // Called by the TaskScheduler when a new executor is gained on a host.
+  override def executorGained(execId: String, host: String) {
+    eventQueue.put(ExecutorGained(execId, host))
+  }
+
+  // Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
+  override def taskSetFailed(taskSet: TaskSet, reason: String) {
+    eventQueue.put(TaskSetFailed(taskSet, reason))
+  }
+
+  // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
+  // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
+  // as more failure events come in
+  val RESUBMIT_TIMEOUT = 50L
+
+  // The time, in millis, to wake up between polls of the completion queue in order to potentially
+  // resubmit failed stages
+  val POLL_TIMEOUT = 10L
+
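+  // All external events (job submissions, task completions, executor changes, shutdown) are
+  // funneled through this queue and consumed by the single scheduler thread in run().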
+  private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
+
+  val nextJobId = new AtomicInteger(0)
+
+  val nextStageId = new AtomicInteger(0)
+
+  val stageIdToStage = new TimeStampedHashMap[Int, Stage]
+
+  val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
+
+  private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
+
+  private val listenerBus = new SparkListenerBus()
+
+  // Contains the locations that each RDD's partitions are cached on
+  private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
+
+  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
+  // every task. When we detect a node failing, we note the current epoch number and failed
+  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
+  //
+  // TODO: Garbage collect information about failure epochs when we know there are no more
+  //       stray messages to detect.
+  val failedEpoch = new HashMap[String, Long]
+
+  val idToActiveJob = new HashMap[Int, ActiveJob]
+
+  val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
+  val running = new HashSet[Stage] // Stages we are running right now
+  val failed = new HashSet[Stage]  // Stages that must be resubmitted due to fetch failures
+  val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage
+  var lastFetchFailureTime: Long = 0  // Used to wait a bit to avoid repeated resubmits
+
+  val activeJobs = new HashSet[ActiveJob]
+  val resultStageToJob = new HashMap[Stage, ActiveJob]
+
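+  // Periodically evicts old entries from the TimeStampedHashMaps above (see cleanup() below).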
+  val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
+
+  // Start a thread to run the DAGScheduler event loop
+  def start() {
+    new Thread("DAGScheduler") {
+      setDaemon(true)
+      override def run() {
+        DAGScheduler.this.run()
+      }
+    }.start()
+  }
+
+  def addSparkListener(listener: SparkListener) {
+    listenerBus.addListener(listener)
+  }
+
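+  // Returns, for each partition of the given RDD, the locations where its block
+  // ("rdd_<rddId>_<partitionIndex>") is cached, memoizing the answer in cacheLocs.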
+  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
+    if (!cacheLocs.contains(rdd.id)) {
+      val blockIds = rdd.partitions.indices.map(index => "rdd_%d_%d".format(rdd.id, index)).toArray
+      val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
+      cacheLocs(rdd.id) = blockIds.map { id =>
+        locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
+      }
+    }
+    cacheLocs(rdd.id)
+  }
+
+  private def clearCacheLocs() {
+    cacheLocs.clear()
+  }
+
+  /**
+   * Get or create a shuffle map stage for the given shuffle dependency's map side.
+   * The jobId value passed in is used only if the stage doesn't already exist; an existing
+   * stage keeps its original (lower) jobId, since jobIds always increase across jobs.
+   */
+  private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
+    shuffleToMapStage.get(shuffleDep.shuffleId) match {
+      case Some(stage) => stage
+      case None =>
+        val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId)
+        shuffleToMapStage(shuffleDep.shuffleId) = stage
+        stage
+    }
+  }
+
+  /**
+   * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
+   * as a result stage for the final RDD used directly in an action. The stage will also be
+   * associated with the provided jobId.
+   */
+  private def newStage(
+      rdd: RDD[_],
+      shuffleDep: Option[ShuffleDependency[_,_]],
+      jobId: Int,
+      callSite: Option[String] = None)
+    : Stage =
+  {
+    if (shuffleDep.isDefined) {
+      // Kind of ugly: need to register RDDs with the cache and map output tracker here
+      // since we can't do it in the RDD constructor because # of partitions is unknown
+      logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+      mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
+    }
+    val id = nextStageId.getAndIncrement()
+    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
+    stageIdToStage(id) = stage
+    stageToInfos(stage) = StageInfo(stage)
+    stage
+  }
+
+  /**
+   * Get or create the list of parent stages for a given RDD. The stages will be assigned the
+   * provided jobId if they haven't already been created with a lower jobId.
+   */
+  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
+    val parents = new HashSet[Stage]
+    val visited = new HashSet[RDD[_]]
+    def visit(r: RDD[_]) {
+      if (!visited(r)) {
+        visited += r
+        // Kind of ugly: calling getShuffleMapStage here also registers the shuffle with the
+        // map output tracker, since we can't do it in the RDD constructor because the number
+        // of partitions is unknown there.
+        for (dep <- r.dependencies) {
+          dep match {
+            case shufDep: ShuffleDependency[_,_] =>
+              parents += getShuffleMapStage(shufDep, jobId)
+            case _ =>
+              visit(dep.rdd)
+          }
+        }
+      }
+    }
+    visit(rdd)
+    parents.toList
+  }
+
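+  /**
+   * Return the ancestor shuffle map stages of the given stage whose output is still missing and
+   * must therefore be computed first. The traversal stops at RDDs whose partitions are all cached.
+   */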
+  private def getMissingParentStages(stage: Stage): List[Stage] = {
+    val missing = new HashSet[Stage]
+    val visited = new HashSet[RDD[_]]
+    def visit(rdd: RDD[_]) {
+      if (!visited(rdd)) {
+        visited += rdd
+        if (getCacheLocs(rdd).contains(Nil)) {
+          for (dep <- rdd.dependencies) {
+            dep match {
+              case shufDep: ShuffleDependency[_,_] =>
+                val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+                if (!mapStage.isAvailable) {
+                  missing += mapStage
+                }
+              case narrowDep: NarrowDependency[_] =>
+                visit(narrowDep.rdd)
+            }
+          }
+        }
+      }
+    }
+    visit(stage.rdd)
+    missing.toList
+  }
+
+  /**
+   * Returns (and does not submit) a JobSubmitted event suitable to run a given job, and a
+   * JobWaiter whose getResult() method will return the result of the job when it is complete.
+   *
+   * The job is assumed to have at least one partition; zero partition jobs should be handled
+   * without a JobSubmitted event.
+   */
+  private[scheduler] def prepareJob[T, U: ClassManifest](
+      finalRdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      partitions: Seq[Int],
+      callSite: String,
+      allowLocal: Boolean,
+      resultHandler: (Int, U) => Unit,
+      properties: Properties = null)
+    : (JobSubmitted, JobWaiter[U]) =
+  {
+    assert(partitions.size > 0)
+    val waiter = new JobWaiter(partitions.size, resultHandler)
+    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
+    val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
+      properties)
+    (toSubmit, waiter)
+  }
+
+  def runJob[T, U: ClassManifest](
+      finalRdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      partitions: Seq[Int],
+      callSite: String,
+      allowLocal: Boolean,
+      resultHandler: (Int, U) => Unit,
+      properties: Properties = null)
+  {
+    if (partitions.size == 0) {
+      return
+    }
+
+    // Check to make sure we are not launching a task on a partition that does not exist.
+    val maxPartitions = finalRdd.partitions.length
+    partitions.find(p => p >= maxPartitions).foreach { p =>
+      throw new IllegalArgumentException(
+        "Attempting to access a non-existent partition: " + p + ". " +
+        "Total number of partitions: " + maxPartitions)
+    }
+
+    val (toSubmit: JobSubmitted, waiter: JobWaiter[_]) = prepareJob(
+        finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
+    eventQueue.put(toSubmit)
+    waiter.awaitResult() match {
+      case JobSucceeded => {}
+      case JobFailed(exception: Exception, _) =>
+        logInfo("Failed to run " + callSite)
+        throw exception
+    }
+  }
+
+  def runApproximateJob[T, U, R](
+      rdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      evaluator: ApproximateEvaluator[U, R],
+      callSite: String,
+      timeout: Long,
+      properties: Properties = null)
+    : PartialResult[R] =
+  {
+    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
+    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
+    val partitions = (0 until rdd.partitions.size).toArray
+    eventQueue.put(JobSubmitted(rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
+    listener.awaitResult()    // Will throw an exception if the job fails
+  }
+
+  /**
+   * Process one event retrieved from the event queue.
+   * Returns true if we should stop the event loop.
+   */
+  private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
+    event match {
+      case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
+        val jobId = nextJobId.getAndIncrement()
+        val finalStage = newStage(finalRDD, None, jobId, Some(callSite))
+        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
+        clearCacheLocs()
+        logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
+                " output partitions (allowLocal=" + allowLocal + ")")
+        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
+        logInfo("Parents of final stage: " + finalStage.parents)
+        logInfo("Missing parents: " + getMissingParentStages(finalStage))
+        if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+          // Compute very short actions like first() or take() with no parent stages locally.
+          runLocally(job)
+        } else {
+          listenerBus.post(SparkListenerJobStart(job, properties))
+          idToActiveJob(jobId) = job
+          activeJobs += job
+          resultStageToJob(finalStage) = job
+          submitStage(finalStage)
+        }
+
+      case ExecutorGained(execId, host) =>
+        handleExecutorGained(execId, host)
+
+      case ExecutorLost(execId) =>
+        handleExecutorLost(execId)
+
+      case begin: BeginEvent =>
+        listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo))
+
+      case completion: CompletionEvent =>
+        listenerBus.post(SparkListenerTaskEnd(
+          completion.task, completion.reason, completion.taskInfo, completion.taskMetrics))
+        handleTaskCompletion(completion)
+
+      case TaskSetFailed(taskSet, reason) =>
+        abortStage(stageIdToStage(taskSet.stageId), reason)
+
+      case StopDAGScheduler =>
+        // Cancel any active jobs
+        for (job <- activeJobs) {
+          val error = new SparkException("Job cancelled because SparkContext was shut down")
+          job.listener.jobFailed(error)
+          listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, None)))
+        }
+        return true
+    }
+    false
+  }
+
+  /**
+   * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
+   * the last fetch failure.
+   */
+  private[scheduler] def resubmitFailedStages() {
+    logInfo("Resubmitting failed stages")
+    clearCacheLocs()
+    val failed2 = failed.toArray
+    failed.clear()
+    for (stage <- failed2.sortBy(_.jobId)) {
+      submitStage(stage)
+    }
+  }
+
+  /**
+   * Check for waiting or failed stages which are now eligible for resubmission.
+   * Ordinarily run on every iteration of the event loop.
+   */
+  private[scheduler] def submitWaitingStages() {
+    // TODO: We might want to run this less often, when we are sure that something has become
+    // runnable that wasn't before.
+    logTrace("Checking for newly runnable parent stages")
+    logTrace("running: " + running)
+    logTrace("waiting: " + waiting)
+    logTrace("failed: " + failed)
+    val waiting2 = waiting.toArray
+    waiting.clear()
+    for (stage <- waiting2.sortBy(_.jobId)) {
+      submitStage(stage)
+    }
+  }
+
+
+  /**
+   * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+   * events and responds by launching tasks. This runs in a dedicated thread and receives events
+   * via the eventQueue.
+   */
+  private def run() {
+    SparkEnv.set(env)
+
+    while (true) {
+      val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)
+      if (event != null) {
+        logDebug("Got event of type " + event.getClass.getName)
+      }
+      this.synchronized { // needed in case other threads make calls into methods of this class
+        if (event != null) {
+          if (processEvent(event)) {
+            return
+          }
+        }
+
+        val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
+        // Periodically resubmit failed stages if some map output fetches have failed and we have
+        // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
+        // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
+        // the same time, so we want to make sure we've identified all the reduce tasks that depend
+        // on the failed node.
+        if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
+          resubmitFailedStages()
+        } else {
+          submitWaitingStages()
+        }
+      }
+    }
+  }
+
+  /**
+   * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
+   * We run the operation in a separate thread just in case it takes a bunch of time, so that we
+   * don't block the DAGScheduler event loop or other concurrent jobs.
+   */
+  protected def runLocally(job: ActiveJob) {
+    logInfo("Computing the requested partition locally")
+    new Thread("Local computation of job " + job.jobId) {
+      override def run() {
+        runLocallyWithinThread(job)
+      }
+    }.start()
+  }
+
+  // Broken out for easier testing in DAGSchedulerSuite.
+  protected def runLocallyWithinThread(job: ActiveJob) {
+    try {
+      SparkEnv.set(env)
+      val rdd = job.finalStage.rdd
+      val split = rdd.partitions(job.partitions(0))
+      val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
+      try {
+        val result = job.func(taskContext, rdd.iterator(split, taskContext))
+        job.listener.taskSucceeded(0, result)
+      } finally {
+        taskContext.executeOnCompleteCallbacks()
+      }
+    } catch {
+      case e: Exception =>
+        job.listener.jobFailed(e)
+    }
+  }
+
+  /** Submits stage, but first recursively submits any missing parents. */
+  private def submitStage(stage: Stage) {
+    logDebug("submitStage(" + stage + ")")
+    if (!waiting(stage) && !running(stage) && !failed(stage)) {
+      val missing = getMissingParentStages(stage).sortBy(_.id)
+      logDebug("missing: " + missing)
+      if (missing == Nil) {
+        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
+        submitMissingTasks(stage)
+        running += stage
+      } else {
+        for (parent <- missing) {
+          submitStage(parent)
+        }
+        waiting += stage
+      }
+    }
+  }
+
+  /** Called when a stage's parents are available and we can now run its tasks. */
+  private def submitMissingTasks(stage: Stage) {
+    logDebug("submitMissingTasks(" + stage + ")")
+    // Get our pending tasks and remember them in our pendingTasks entry
+    val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
+    myPending.clear()
+    var tasks = ArrayBuffer[Task[_]]()
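+    // A shuffle map stage gets one ShuffleMapTask per partition that is still missing its map
+    // output; a result stage gets one ResultTask per partition its job has not yet finished.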
+    if (stage.isShuffleMap) {
+      for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
+        val locs = getPreferredLocs(stage.rdd, p)
+        tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
+      }
+    } else {
+      // This is a final stage; figure out its job's missing partitions
+      val job = resultStageToJob(stage)
+      for (id <- 0 until job.numPartitions if !job.finished(id)) {
+        val partition = job.partitions(id)
+        val locs = getPreferredLocs(stage.rdd, partition)
+        tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
+      }
+    }
+    // Post the stage-submitted event before the serialization check below, so that if a
+    // NotSerializableException aborts the stage, "StageSubmitted" is still posted before "JobEnd".
+    val properties = idToActiveJob(stage.jobId).properties
+    listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
+
+    if (tasks.size > 0) {
+      // Preemptively serialize a task to make sure it can be serialized. We are catching this
+      // exception here because it would be fairly hard to catch the non-serializable exception
+      // down the road, where we have several different implementations for local scheduler and
+      // cluster schedulers.
+      try {
+        SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
+      } catch {
+        case e: NotSerializableException =>
+          abortStage(stage, e.toString)
+          running -= stage
+          return
+      }
+
+      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
+      myPending ++= tasks
+      logDebug("New pending tasks: " + myPending)
+      taskSched.submitTasks(
+        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
+      if (stage.submissionTime.isEmpty) {
+        stage.submissionTime = Some(System.currentTimeMillis())
+      }
+    } else {
+      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
+        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
+      running -= stage
+    }
+  }
+
+  /**
+   * Responds to a task finishing. This is called inside the event loop so it assumes that it can
+   * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
+   */
+  private def handleTaskCompletion(event: CompletionEvent) {
+    val task = event.task
+    val stage = stageIdToStage(task.stageId)
+
+    def markStageAsFinished(stage: Stage) = {
+      val serviceTime = stage.submissionTime match {
+        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+        case _ => "Unkown"
+      }
+      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
+      stage.completionTime = Some(System.currentTimeMillis)
+      listenerBus.post(StageCompleted(stageToInfos(stage)))
+      running -= stage
+    }
+    event.reason match {
+      case Success =>
+        logInfo("Completed " + task)
+        if (event.accumUpdates != null) {
+          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+        }
+        pendingTasks(stage) -= task
+        stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
+        task match {
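+          // A ResultTask computed a partition of the final stage for some job: feed the result
+          // to that job's listener, and finish the job once all of its partitions are done.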
+          case rt: ResultTask[_, _] =>
+            resultStageToJob.get(stage) match {
+              case Some(job) =>
+                if (!job.finished(rt.outputId)) {
+                  job.finished(rt.outputId) = true
+                  job.numFinished += 1
+                  // If the whole job has finished, remove it
+                  if (job.numFinished == job.numPartitions) {
+                    idToActiveJob -= stage.jobId
+                    activeJobs -= job
+                    resultStageToJob -= stage
+                    markStageAsFinished(stage)
+                    listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
+                  }
+                  job.listener.taskSucceeded(rt.outputId, event.result)
+                }
+              case None =>
+                logInfo("Ignoring result from " + rt + " because its job has finished")
+            }
+
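+          // A ShuffleMapTask produced map output: record its location (ignoring possibly stale
+          // results from executors already marked as failed), and when no tasks remain pending,
+          // mark the stage finished and look for newly runnable downstream stages.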
+          case smt: ShuffleMapTask =>
+            val status = event.result.asInstanceOf[MapStatus]
+            val execId = status.location.executorId
+            logDebug("ShuffleMapTask finished on " + execId)
+            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
+              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
+            } else {
+              stage.addOutputLoc(smt.partition, status)
+            }
+            if (running.contains(stage) && pendingTasks(stage).isEmpty) {
+              markStageAsFinished(stage)
+              logInfo("looking for newly runnable stages")
+              logInfo("running: " + running)
+              logInfo("waiting: " + waiting)
+              logInfo("failed: " + failed)
+              if (stage.shuffleDep.isDefined) {
+                // We supply true to increment the epoch number here in case this is a
+                // recomputation of the map outputs. In that case, some nodes may have cached
+                // locations with holes (from when we detected the error) and will need the
+                // epoch incremented to refetch them.
+                // TODO: Only increment the epoch number if this is not the first time
+                //       we registered these map outputs.
+                mapOutputTracker.registerMapOutputs(
+                  stage.shuffleDep.get.shuffleId,
+                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
+                  changeEpoch = true)
+              }
+              clearCacheLocs()
+              if (stage.outputLocs.count(_ == Nil) != 0) {
+                // Some tasks had failed; let's resubmit this stage
+                // TODO: Lower-level scheduler should also deal with this
+                logInfo("Resubmitting " + stage + " (" + stage.name +
+                  ") because some of its tasks had failed: " +
+                  stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
+                submitStage(stage)
+              } else {
+                val newlyRunnable = new ArrayBuffer[Stage]
+                for (stage <- waiting) {
+                  logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
+                }
+                for (stage <- waiting if getMissingParentStages(stage) == Nil) {
+                  newlyRunnable += stage
+                }
+                waiting --= newlyRunnable
+                running ++= newlyRunnable
+                for (stage <- newlyRunnable.sortBy(_.id)) {
+                  logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
+                  submitMissingTasks(stage)
+                }
+              }
+            }
+        }
+
+      case Resubmitted =>
+        logInfo("Resubmitted " + task + ", so marking it as still running")
+        pendingTasks(stage) += task
+
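+      // A reduce task could not fetch map output: mark both the reducer's stage and the map
+      // stage that produced the missing output for resubmission; run() resubmits failed stages
+      // once RESUBMIT_TIMEOUT has elapsed since the last fetch failure.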
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+        // Mark the stage that the reducer was in as unrunnable
+        val failedStage = stageIdToStage(task.stageId)
+        running -= failedStage
+        failed += failedStage
+        // TODO: Cancel running tasks in the stage
+        logInfo("Marking " + failedStage + " (" + failedStage.name +
+          ") for resubmision due to a fetch failure")
+        // Mark the map whose fetch failed as broken in the map stage
+        val mapStage = shuffleToMapStage(shuffleId)
+        if (mapId != -1) {
+          mapStage.removeOutputLoc(mapId, bmAddress)
+          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+        }
+        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
+          "); marking it for resubmission")
+        failed += mapStage
+        // Remember that a fetch failed now; this is used to resubmit the broken
+        // stages later, after a small wait (to give other tasks the chance to fail)
+        lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
+        // TODO: mark the executor as failed only if there were lots of fetch failures on it
+        if (bmAddress != null) {
+          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
+        }
+
+      case ExceptionFailure(className, description, stackTrace, metrics) =>
+        // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
+
+      case other =>
+        // Unrecognized failure - abort all jobs depending on this stage
+        abortStage(stageIdToStage(task.stageId), task + " failed: " + other)
+    }
+  }
+
+  /**
+   * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
+   * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
+   *
+   * Optionally the epoch during which the failure was caught can be passed to avoid allowing
+   * stray fetch failures from possibly retriggering the detection of a node as lost.
+   */
+  private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+    val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
+    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
+      failedEpoch(execId) = currentEpoch
+      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+      blockManagerMaster.removeExecutor(execId)
+      // TODO: This will be really slow if we keep accumulating shuffle map stages
+      for ((shuffleId, stage) <- shuffleToMapStage) {
+        stage.removeOutputsOnExecutor(execId)
+        val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
+        mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
+      }
+      if (shuffleToMapStage.isEmpty) {
+        mapOutputTracker.incrementEpoch()
+      }
+      clearCacheLocs()
+    } else {
+      logDebug("Additional executor lost message for " + execId +
+               "(epoch " + currentEpoch + ")")
+    }
+  }
+
+  private def handleExecutorGained(execId: String, host: String) {
+    // If this executor was previously marked as lost, clear it from failedEpoch.
+    if (failedEpoch.contains(execId)) {
+      logInfo("Host gained which was in lost list earlier: " + host)
+      failedEpoch -= execId
+    }
+  }
+
+  /**
+   * Aborts all jobs depending on a particular Stage. This is called in response to a task set
+   * being cancelled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
+   */
+  private def abortStage(failedStage: Stage, reason: String) {
+    val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
+    failedStage.completionTime = Some(System.currentTimeMillis())
+    for (resultStage <- dependentStages) {
+      val job = resultStageToJob(resultStage)
+      val error = new SparkException("Job failed: " + reason)
+      job.listener.jobFailed(error)
+      listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
+      idToActiveJob -= resultStage.jobId
+      activeJobs -= job
+      resultStageToJob -= resultStage
+    }
+    if (dependentStages.isEmpty) {
+      logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
+    }
+  }
+
+  /**
+   * Return true if one of the given stage's ancestors is the target stage.
+   */
+  private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
+    if (stage == target) {
+      return true
+    }
+    val visitedRdds = new HashSet[RDD[_]]
+    val visitedStages = new HashSet[Stage]
+    def visit(rdd: RDD[_]) {
+      if (!visitedRdds(rdd)) {
+        visitedRdds += rdd
+        for (dep <- rdd.dependencies) {
+          dep match {
+            case shufDep: ShuffleDependency[_,_] =>
+              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+              if (!mapStage.isAvailable) {
+                visitedStages += mapStage
+                visit(mapStage.rdd)
+              }  // Otherwise there's no need to follow the dependency back
+            case narrowDep: NarrowDependency[_] =>
+              visit(narrowDep.rdd)
+          }
+        }
+      }
+    }
+    visit(stage.rdd)
+    visitedRdds.contains(target.rdd)
+  }
+
+  /**
+   * Get the preferred locations for a partition. Synchronized because it might be called from
+   * other threads.
+   * @param rdd the RDD whose partition is to be looked at
+   * @param partition the partition to look up locality information for
+   * @return list of task locations preferred by the partition
+   */
+  private[spark]
+  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
+    // If the partition is cached, return the cache locations
+    val cached = getCacheLocs(rdd)(partition)
+    if (cached.nonEmpty) {
+      return cached
+    }
+    // If the RDD has some placement preferences (as is the case for input RDDs), get those
+    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
+    if (rddPrefs.nonEmpty) {
+      return rddPrefs.map(host => TaskLocation(host))
+    }
+    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+    // that has any placement preferences. Ideally we would choose based on transfer sizes,
+    // but this will do for now.
+    rdd.dependencies.foreach(_ match {
+      case n: NarrowDependency[_] =>
+        for (inPart <- n.getParents(partition)) {
+          val locs = getPreferredLocs(n.rdd, inPart)
+          if (locs != Nil)
+            return locs
+        }
+      case _ =>
+    })
+    Nil
+  }
+
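+  /**
+   * Invoked periodically by the MetadataCleaner to drop bookkeeping entries older than
+   * cleanupTime from the TimeStampedHashMaps, so long-running applications don't leak memory.
+   */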
+  private def cleanup(cleanupTime: Long) {
+    var sizeBefore = stageIdToStage.size
+    stageIdToStage.clearOldValues(cleanupTime)
+    logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size)
+
+    sizeBefore = shuffleToMapStage.size
+    shuffleToMapStage.clearOldValues(cleanupTime)
+    logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
+
+    sizeBefore = pendingTasks.size
+    pendingTasks.clearOldValues(cleanupTime)
+    logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
+
+    sizeBefore = stageToInfos.size
+    stageToInfos.clearOldValues(cleanupTime)
+    logInfo("stageToInfos " + sizeBefore + " --> " + stageToInfos.size)
+  }
+
+  def stop() {
+    eventQueue.put(StopDAGScheduler)
+    metadataCleaner.cancel()
+    taskSched.stop()
+  }
+}

