spark-commits mailing list archives

From pwend...@apache.org
Subject [07/50] [abbrv] git commit: Merge branch 'master' into scala-2.10
Date Sat, 14 Dec 2013 08:41:51 GMT
Merge branch 'master' into scala-2.10


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0f2e3c6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0f2e3c6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0f2e3c6e

Branch: refs/heads/master
Commit: 0f2e3c6e31d56c627ff81cdc93289a7c7cb2ec16
Parents: 5429d62 3d4ad84
Author: Raymond Liu <raymond.liu@intel.com>
Authored: Tue Nov 12 15:14:21 2013 +0800
Committer: Raymond Liu <raymond.liu@intel.com>
Committed: Wed Nov 13 16:55:11 2013 +0800

----------------------------------------------------------------------
 README.md                                       |    2 +-
 bin/compute-classpath.sh                        |   22 +-
 bin/slaves.sh                                   |   19 +-
 bin/spark-daemon.sh                             |   21 +-
 bin/spark-daemons.sh                            |    2 +-
 bin/stop-slaves.sh                              |    2 -
 core/pom.xml                                    |    4 +
 .../spark/network/netty/FileClientHandler.java  |    3 +-
 .../spark/network/netty/FileServerHandler.java  |   23 +-
 .../spark/network/netty/PathResolver.java       |   11 +-
 .../hadoop/mapred/SparkHadoopMapRedUtil.scala   |   17 +-
 .../mapreduce/SparkHadoopMapReduceUtil.scala    |   33 +-
 .../scala/org/apache/spark/Aggregator.scala     |   49 +-
 .../apache/spark/BlockStoreShuffleFetcher.scala |   23 +-
 .../scala/org/apache/spark/CacheManager.scala   |   12 +-
 .../scala/org/apache/spark/FutureAction.scala   |  250 +++++
 .../apache/spark/InterruptibleIterator.scala    |   30 +
 .../org/apache/spark/MapOutputTracker.scala     |  169 +--
 .../scala/org/apache/spark/ShuffleFetcher.scala |    5 +-
 .../scala/org/apache/spark/SparkContext.scala   |  229 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |   25 +-
 .../org/apache/spark/SparkHadoopWriter.scala    |   21 +-
 .../scala/org/apache/spark/TaskContext.scala    |   21 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |    2 +
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   24 +
 .../org/apache/spark/api/java/JavaPairRDD.scala |   35 +
 .../org/apache/spark/api/java/JavaRDD.scala     |   19 +
 .../java/function/DoubleFlatMapFunction.java    |   10 +-
 .../spark/api/java/function/DoubleFunction.java |    3 +-
 .../api/java/function/FlatMapFunction.scala     |    3 -
 .../api/java/function/FlatMapFunction2.scala    |    3 -
 .../spark/api/java/function/Function.java       |    2 +-
 .../spark/api/java/function/Function3.java      |   36 +
 .../api/java/function/PairFlatMapFunction.java  |    2 -
 .../spark/api/java/function/PairFunction.java   |    5 +-
 .../api/java/function/WrappedFunction3.scala    |   34 +
 .../org/apache/spark/api/python/PythonRDD.scala |    2 +-
 .../spark/broadcast/BitTorrentBroadcast.scala   | 1058 ------------------
 .../apache/spark/broadcast/HttpBroadcast.scala  |   13 +-
 .../apache/spark/broadcast/MultiTracker.scala   |  410 -------
 .../org/apache/spark/broadcast/SourceInfo.scala |   54 -
 .../spark/broadcast/TorrentBroadcast.scala      |  247 ++++
 .../apache/spark/broadcast/TreeBroadcast.scala  |  603 ----------
 .../org/apache/spark/deploy/DeployMessage.scala |   29 +-
 .../spark/deploy/ExecutorDescription.scala      |   34 +
 .../spark/deploy/FaultToleranceTest.scala       |  420 +++++++
 .../org/apache/spark/deploy/JsonProtocol.scala  |    3 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |    7 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |   49 +-
 .../org/apache/spark/deploy/client/Client.scala |   80 +-
 .../spark/deploy/client/ClientListener.scala    |    4 +
 .../apache/spark/deploy/client/TestClient.scala |    7 +-
 .../spark/deploy/master/ApplicationInfo.scala   |   53 +-
 .../spark/deploy/master/ApplicationState.scala  |    2 +-
 .../spark/deploy/master/ExecutorInfo.scala      |    7 +-
 .../master/FileSystemPersistenceEngine.scala    |   90 ++
 .../deploy/master/LeaderElectionAgent.scala     |   45 +
 .../org/apache/spark/deploy/master/Master.scala |  232 +++-
 .../spark/deploy/master/MasterMessages.scala    |   46 +
 .../spark/deploy/master/PersistenceEngine.scala |   53 +
 .../spark/deploy/master/RecoveryState.scala     |   26 +
 .../deploy/master/SparkZooKeeperSession.scala   |  203 ++++
 .../apache/spark/deploy/master/WorkerInfo.scala |   42 +-
 .../spark/deploy/master/WorkerState.scala       |    2 +-
 .../master/ZooKeeperLeaderElectionAgent.scala   |  136 +++
 .../master/ZooKeeperPersistenceEngine.scala     |   85 ++
 .../spark/deploy/worker/ExecutorRunner.scala    |   15 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  181 ++-
 .../spark/deploy/worker/WorkerArguments.scala   |    8 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |    2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  127 +++
 .../org/apache/spark/executor/Executor.scala    |  167 ++-
 .../spark/executor/MesosExecutorBackend.scala   |   18 +-
 .../executor/StandaloneExecutorBackend.scala    |  119 --
 .../org/apache/spark/executor/TaskMetrics.scala |    5 +
 .../spark/network/ConnectionManager.scala       |    3 +-
 .../apache/spark/network/netty/FileHeader.scala |   22 +-
 .../spark/network/netty/ShuffleCopier.scala     |   27 +-
 .../spark/network/netty/ShuffleSender.scala     |    9 +-
 .../main/scala/org/apache/spark/package.scala   |    2 +
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |  123 ++
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |    6 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |    9 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |   26 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  123 +-
 .../spark/rdd/MapPartitionsWithContextRDD.scala |   42 +
 .../spark/rdd/MapPartitionsWithIndexRDD.scala   |   42 -
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   79 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   16 +-
 .../spark/rdd/ParallelCollectionRDD.scala       |    5 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  106 +-
 .../org/apache/spark/rdd/ShuffledRDD.scala      |    2 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |    2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  200 ++--
 .../spark/scheduler/DAGSchedulerEvent.scala     |   29 +-
 .../spark/scheduler/DAGSchedulerSource.scala    |    2 +-
 .../spark/scheduler/InputFormatInfo.scala       |    7 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |  676 ++++++-----
 .../org/apache/spark/scheduler/JobWaiter.scala  |   62 +-
 .../scala/org/apache/spark/scheduler/Pool.scala |    5 +-
 .../org/apache/spark/scheduler/ResultTask.scala |   48 +-
 .../spark/scheduler/SchedulableBuilder.scala    |    3 +
 .../apache/spark/scheduler/ShuffleMapTask.scala |   76 +-
 .../apache/spark/scheduler/SparkListener.scala  |   21 +-
 .../spark/scheduler/SparkListenerBus.scala      |    2 +
 .../org/apache/spark/scheduler/Stage.scala      |    6 +-
 .../org/apache/spark/scheduler/StageInfo.scala  |   13 +-
 .../scala/org/apache/spark/scheduler/Task.scala |   63 +-
 .../org/apache/spark/scheduler/TaskInfo.scala   |   20 +
 .../org/apache/spark/scheduler/TaskResult.scala |    3 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   10 +-
 .../spark/scheduler/TaskSchedulerListener.scala |   44 -
 .../org/apache/spark/scheduler/TaskSet.scala    |    4 +
 .../scheduler/cluster/ClusterScheduler.scala    |   55 +-
 .../cluster/ClusterTaskSetManager.scala         |  106 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   69 ++
 .../cluster/CoarseGrainedSchedulerBackend.scala |  238 ++++
 .../scheduler/cluster/SchedulerBackend.scala    |    6 +-
 .../cluster/SimrSchedulerBackend.scala          |   66 ++
 .../cluster/SparkDeploySchedulerBackend.scala   |   20 +-
 .../cluster/StandaloneClusterMessage.scala      |   63 --
 .../cluster/StandaloneSchedulerBackend.scala    |  200 ----
 .../scheduler/cluster/TaskResultGetter.scala    |   26 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |   15 +-
 .../spark/scheduler/local/LocalScheduler.scala  |  196 ++--
 .../scheduler/local/LocalTaskSetManager.scala   |   24 +-
 .../spark/serializer/KryoSerializer.scala       |   10 +-
 .../apache/spark/storage/BlockException.scala   |    2 +-
 .../spark/storage/BlockFetcherIterator.scala    |   24 +-
 .../org/apache/spark/storage/BlockId.scala      |  103 ++
 .../org/apache/spark/storage/BlockInfo.scala    |   81 ++
 .../org/apache/spark/storage/BlockManager.scala |  628 ++++-------
 .../spark/storage/BlockManagerMaster.scala      |    8 +-
 .../spark/storage/BlockManagerMasterActor.scala |   25 +-
 .../spark/storage/BlockManagerMessages.scala    |   16 +-
 .../spark/storage/BlockManagerSlaveActor.scala  |    1 +
 .../spark/storage/BlockManagerWorker.scala      |    4 +-
 .../org/apache/spark/storage/BlockMessage.scala |   38 +-
 .../spark/storage/BlockMessageArray.scala       |    7 +-
 .../spark/storage/BlockObjectWriter.scala       |  142 ++-
 .../org/apache/spark/storage/BlockStore.scala   |   14 +-
 .../apache/spark/storage/DiskBlockManager.scala |  151 +++
 .../org/apache/spark/storage/DiskStore.scala    |  280 +----
 .../org/apache/spark/storage/FileSegment.scala  |   28 +
 .../org/apache/spark/storage/MemoryStore.scala  |   34 +-
 .../spark/storage/ShuffleBlockManager.scala     |  200 +++-
 .../spark/storage/StoragePerfTester.scala       |   86 ++
 .../org/apache/spark/storage/StorageUtils.scala |   47 +-
 .../apache/spark/storage/ThreadingTest.scala    |    6 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |    2 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala    |    2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  105 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |    8 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   21 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |   26 +-
 .../org/apache/spark/ui/storage/RDDPage.scala   |   23 +-
 .../org/apache/spark/util/AppendOnlyMap.scala   |  230 ++++
 .../org/apache/spark/util/MetadataCleaner.scala |   37 +-
 .../scala/org/apache/spark/util/Utils.scala     |   45 +-
 .../apache/spark/util/collection/BitSet.scala   |  103 ++
 .../spark/util/collection/OpenHashMap.scala     |  154 +++
 .../spark/util/collection/OpenHashSet.scala     |  272 +++++
 .../collection/PrimitiveKeyOpenHashMap.scala    |  128 +++
 .../spark/util/collection/PrimitiveVector.scala |   53 +
 .../scala/org/apache/spark/BroadcastSuite.scala |   52 +-
 .../org/apache/spark/CacheManagerSuite.scala    |   21 +-
 .../org/apache/spark/CheckpointSuite.scala      |   10 +-
 .../org/apache/spark/DistributedSuite.scala     |   16 +-
 .../org/apache/spark/FileServerSuite.scala      |   16 +
 .../scala/org/apache/spark/JavaAPISuite.java    |   23 +-
 .../org/apache/spark/JobCancellationSuite.scala |  209 ++++
 .../apache/spark/MapOutputTrackerSuite.scala    |   20 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala |    7 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |   20 +
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala |  176 +++
 .../spark/rdd/PairRDDFunctionsSuite.scala       |    2 +-
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |   20 +
 .../spark/scheduler/DAGSchedulerSuite.scala     |   35 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |   17 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  136 ++-
 .../cluster/ClusterTaskSetManagerSuite.scala    |   49 +-
 .../spark/scheduler/cluster/FakeTask.scala      |    5 +-
 .../cluster/TaskResultGetterSuite.scala         |    3 +-
 .../scheduler/local/LocalSchedulerSuite.scala   |   28 +-
 .../org/apache/spark/storage/BlockIdSuite.scala |  114 ++
 .../spark/storage/BlockManagerSuite.scala       |  102 +-
 .../spark/storage/DiskBlockManagerSuite.scala   |   84 ++
 .../apache/spark/util/AppendOnlyMapSuite.scala  |  154 +++
 .../spark/util/collection/BitSetSuite.scala     |   73 ++
 .../util/collection/OpenHashMapSuite.scala      |  148 +++
 .../util/collection/OpenHashSetSuite.scala      |  145 +++
 .../PrimitiveKeyOpenHashSetSuite.scala          |   90 ++
 docker/README.md                                |    5 +
 docker/build                                    |   22 +
 docker/spark-test/README.md                     |   11 +
 docker/spark-test/base/Dockerfile               |   38 +
 docker/spark-test/build                         |   22 +
 docker/spark-test/master/Dockerfile             |   21 +
 docker/spark-test/master/default_cmd            |   22 +
 docker/spark-test/worker/Dockerfile             |   22 +
 docker/spark-test/worker/default_cmd            |   22 +
 docs/cluster-overview.md                        |   14 +-
 docs/configuration.md                           |   10 +-
 docs/ec2-scripts.md                             |    2 +-
 docs/python-programming-guide.md                |   11 +
 docs/running-on-yarn.md                         |    9 +-
 docs/scala-programming-guide.md                 |    6 +-
 docs/spark-standalone.md                        |   75 ++
 docs/streaming-programming-guide.md             |    9 +-
 docs/tuning.md                                  |    2 +-
 ec2/spark_ec2.py                                |    2 +-
 examples/pom.xml                                |   36 +-
 .../streaming/examples/JavaKafkaWordCount.java  |   98 ++
 .../apache/spark/examples/BroadcastTest.scala   |   15 +-
 .../org/apache/spark/examples/SparkHdfsLR.scala |    3 +-
 .../org/apache/spark/examples/SparkKMeans.scala |    2 -
 .../org/apache/spark/examples/SparkPi.scala     |    2 +-
 .../streaming/examples/KafkaWordCount.scala     |   28 +-
 .../streaming/examples/MQTTWordCount.scala      |  107 ++
 .../clickstream/PageViewGenerator.scala         |   13 +-
 pom.xml                                         |  126 ++-
 project/SparkBuild.scala                        |   35 +-
 python/pyspark/accumulators.py                  |   13 +-
 python/pyspark/context.py                       |   50 +-
 .../org/apache/spark/repl/SparkILoop.scala      |   36 +-
 spark-class                                     |   15 +-
 .../kafka/0.7.2-spark/kafka-0.7.2-spark.jar     |  Bin 1358063 -> 0 bytes
 .../kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 |    1 -
 .../0.7.2-spark/kafka-0.7.2-spark.jar.sha1      |    1 -
 .../kafka/0.7.2-spark/kafka-0.7.2-spark.pom     |    9 -
 .../kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 |    1 -
 .../0.7.2-spark/kafka-0.7.2-spark.pom.sha1      |    1 -
 .../apache/kafka/kafka/maven-metadata-local.xml |   12 -
 .../kafka/kafka/maven-metadata-local.xml.md5    |    1 -
 .../kafka/kafka/maven-metadata-local.xml.sha1   |    1 -
 streaming/pom.xml                               |   57 +-
 .../org/apache/spark/streaming/Checkpoint.scala |    2 +
 .../org/apache/spark/streaming/DStream.scala    |   55 +-
 .../spark/streaming/NetworkInputTracker.scala   |   12 +-
 .../spark/streaming/PairDStreamFunctions.scala  |  155 ++-
 .../spark/streaming/StreamingContext.scala      |   52 +-
 .../spark/streaming/api/java/JavaDStream.scala  |    8 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   97 +-
 .../streaming/api/java/JavaPairDStream.scala    |  186 ++-
 .../api/java/JavaStreamingContext.scala         |  108 +-
 .../streaming/dstream/KafkaInputDStream.scala   |   63 +-
 .../streaming/dstream/MQTTInputDStream.scala    |  110 ++
 .../streaming/dstream/NetworkInputDStream.scala |   14 +-
 .../streaming/dstream/RawInputDStream.scala     |    4 +-
 .../streaming/dstream/TransformedDStream.scala  |   20 +-
 .../streaming/receivers/ActorReceiver.scala     |    4 +-
 .../apache/spark/streaming/JavaAPISuite.java    |  425 ++++++-
 .../apache/spark/streaming/JavaTestUtils.scala  |   36 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  141 ++-
 .../spark/streaming/CheckpointSuite.scala       |    4 +-
 .../spark/streaming/InputStreamsSuite.scala     |    8 +-
 .../apache/spark/streaming/TestSuiteBase.scala  |   61 +-
 .../tools/JavaAPICompletenessChecker.scala      |    4 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |   55 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  186 ++-
 .../spark/deploy/yarn/ClientArguments.scala     |   25 +-
 .../spark/deploy/yarn/WorkerRunnable.scala      |   59 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |    4 +-
 263 files changed, 11156 insertions(+), 5526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/README.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/bin/compute-classpath.sh
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/SparkContext.scala
index 1b003cc,4a98491..cc44a4c
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@@ -24,11 -24,9 +24,11 @@@ import java.util.concurrent.atomic.Atom
  
  import scala.collection.Map
  import scala.collection.generic.Growable
- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._
  import scala.collection.mutable.ArrayBuffer
  import scala.collection.mutable.HashMap
 +import scala.reflect.{ ClassTag, classTag}
 +import scala.util.DynamicVariable
  
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
@@@ -922,10 -1013,12 +1015,12 @@@ object SparkContext 
  
    // TODO: Add AccumulatorParams for other types, e.g. lists and strings
  
 -  implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
 +  implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
      new PairRDDFunctions(rdd)
  
 -  implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd)
++  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+ 
 -  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
 +  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
        rdd: RDD[(K, V)]) =
      new SequenceFileRDDFunctions(rdd)
  

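The SparkContext hunk above is part of the Scala 2.10 migration: ClassManifest context bounds on the implicit RDD conversions become ClassTag (and JavaConversions becomes JavaConverters). A minimal sketch of the ClassTag-bounded implicit pattern on a made-up wrapper class; the names here are illustrative and not from the commit:

// Illustrative only, not part of the commit: a ClassTag-bounded implicit
// conversion in the post-ClassManifest style shown in the hunk above.
import scala.language.implicitConversions
import scala.reflect.ClassTag

class PairOps[K: ClassTag, V: ClassTag](pairs: Seq[(K, V)]) {
  // The ClassTag[K] context bound is what lets us build an Array[K] here.
  def keysArray: Array[K] = pairs.map(_._1).toArray
}

object PairOps {
  implicit def seqToPairOps[K: ClassTag, V: ClassTag](pairs: Seq[(K, V)]): PairOps[K, V] =
    new PairOps(pairs)
}
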
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index f0a1960,043cb18..e5e20db
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@@ -84,8 -94,19 +97,19 @@@ class JavaDoubleRDD(val srdd: RDD[scala
      fromRDD(srdd.coalesce(numPartitions, shuffle))
  
    /**
+    * Return a new RDD that has exactly numPartitions partitions.
+    *
+    * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+    * a shuffle to redistribute data.
+    *
+    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+    * which can avoid performing a shuffle.
+    */
+   def repartition(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.repartition(numPartitions))
+ 
+   /**
     * Return an RDD with the elements from `this` that are not in `other`.
 -   * 
 +   *
     * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
     * RDD will be <= us.
     */

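The scaladoc above spells out the repartition/coalesce trade-off that this commit surfaces on JavaDoubleRDD. A short usage sketch of the same distinction on a plain RDD, assuming an existing SparkContext named `sc` (the sketch is not part of the commit):

// repartition always shuffles and can grow the partition count;
// coalesce without shuffle only merges partitions, avoiding the shuffle.
val rdd = sc.parallelize(1 to 1000, 10)          // 10 input partitions

val wider    = rdd.repartition(20)               // shuffles; 20 partitions
val narrower = rdd.coalesce(5)                   // no shuffle; 5 partitions
val balanced = rdd.coalesce(5, shuffle = true)   // shuffles, like repartition(5)
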
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 899e17d,2142fd7..eeea0ed
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@@ -599,4 -622,15 +623,15 @@@ object JavaPairRDD 
      new JavaPairRDD[K, V](rdd)
  
    implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
+ 
+ 
+   /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
+   def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
 -    implicit val cmk: ClassManifest[K] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
 -    implicit val cmv: ClassManifest[V] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
++    implicit val cmk: ClassTag[K] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
++    implicit val cmv: ClassTag[V] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+     new JavaPairRDD[K, V](rdd.rdd)
+   }
+ 
  }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
index b7c0d78,2dfda8b..ed8fea9
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
@@@ -23,8 -21,5 +23,5 @@@ import scala.reflect.ClassTa
   * A function that returns zero or more output records from each input record.
   */
  abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
-   @throws(classOf[Exception])
-   def call(x: T) : java.lang.Iterable[R]
- 
 -  def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
 +  def elementType() :  ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
  }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
index 7a505df,528e1c0..aae1349
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
@@@ -23,8 -21,5 +23,5 @@@ import scala.reflect.ClassTa
   * A function that takes two inputs and returns zero or more output records.
   */
  abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
-   @throws(classOf[Exception])
-   def call(a: A, b:B) : java.lang.Iterable[C]
- 
 -  def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]]
 +  def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
  }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/function/Function.java
index e971169,ce368ee..49e661a
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@@ -29,10 -29,8 +29,10 @@@ import java.io.Serializable
   * when mapping RDDs of other types.
   */
  public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
 -  public ClassManifest<R> returnType() {
 -    return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
 +  public abstract R call(T t) throws Exception;
 +
 +  public ClassTag<R> returnType() {
-     return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
++    return ClassTag$.MODULE$.apply(Object.class);
    }
  }
  

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/function/Function3.java
index 0000000,ac61789..fb1dece
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
@@@ -1,0 -1,36 +1,36 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.api.java.function;
+ 
 -import scala.reflect.ClassManifest;
 -import scala.reflect.ClassManifest$;
++import scala.reflect.ClassTag;
++import scala.reflect.ClassTag$;
+ import scala.runtime.AbstractFunction2;
+ 
+ import java.io.Serializable;
+ 
+ /**
+  * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+  */
+ public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
+         implements Serializable {
+ 
 -    public ClassManifest<R> returnType() {
 -        return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
++    public ClassTag<R> returnType() {
++        return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
+     }
+ }
+ 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
index fbd0cda,6d76a8f..ca485b3
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@@ -33,13 -33,11 +33,11 @@@ public abstract class PairFlatMapFuncti
    extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
    implements Serializable {
  
-   public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception;
- 
 -  public ClassManifest<K> keyType() {
 -    return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
 +  public ClassTag<K> keyType() {
 +    return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
    }
  
 -  public ClassManifest<V> valueType() {
 -    return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
 +  public ClassTag<V> valueType() {
 +    return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
index f095596,ede7cee..cbe2306
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
@@@ -28,17 -28,14 +28,14 @@@ import java.io.Serializable
   */
  // PairFunction does not extend Function because some UDF functions, like map,
  // are overloaded for both Function and PairFunction.
- public abstract class PairFunction<T, K, V>
-   extends WrappedFunction1<T, Tuple2<K, V>>
+ public abstract class PairFunction<T, K, V> extends WrappedFunction1<T, Tuple2<K, V>>
    implements Serializable {
  
-   public abstract Tuple2<K, V> call(T t) throws Exception;
- 
 -  public ClassManifest<K> keyType() {
 -    return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
 +  public ClassTag<K> keyType() {
 +    return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
    }
  
 -  public ClassManifest<V> valueType() {
 -    return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
 +  public ClassTag<V> valueType() {
 +    return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 6a7d5a8,308a2bf..94cf4ff
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@@ -38,8 -38,8 +38,8 @@@ class LocalSparkCluster(numWorkers: Int
    private val localHostname = Utils.localHostName()
    private val masterActorSystems = ArrayBuffer[ActorSystem]()
    private val workerActorSystems = ArrayBuffer[ActorSystem]()
 -  
 +
-   def start(): String = {
+   def start(): Array[String] = {
      logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
  
      /* Start the Master */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 1643867,77422f6..be8693e
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@@ -19,14 -19,15 +19,15 @@@ package org.apache.spark.deploy.clien
  
  import java.util.concurrent.TimeoutException
  
 +import scala.concurrent.duration._
 +import scala.concurrent.Await
++import scala.concurrent.ExecutionContext.Implicits.global
 +
  import akka.actor._
  import akka.actor.Terminated
 +import akka.pattern.AskTimeoutException
  import akka.pattern.ask
 -import akka.util.Duration
 -import akka.util.duration._
 -import akka.remote.RemoteClientDisconnected
 -import akka.remote.RemoteClientLifeCycleEvent
 -import akka.remote.RemoteClientShutdown
 -import akka.dispatch.Await
 +import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
  
  import org.apache.spark.Logging
  import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@@ -69,9 -73,46 +73,46 @@@ private[spark] class Client
        }
      }
  
+     def tryRegisterAllMasters() {
+       for (masterUrl <- masterUrls) {
+         logInfo("Connecting to master " + masterUrl + "...")
+         val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+         actor ! RegisterApplication(appDescription)
+       }
+     }
+ 
+     def registerWithMaster() {
+       tryRegisterAllMasters()
+ 
+       var retries = 0
+       lazy val retryTimer: Cancellable =
+         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+           retries += 1
+           if (registered) {
+             retryTimer.cancel()
+           } else if (retries >= REGISTRATION_RETRIES) {
+             logError("All masters are unresponsive! Giving up.")
+             markDead()
+           } else {
+             tryRegisterAllMasters()
+           }
+         }
+       retryTimer // start timer
+     }
+ 
+     def changeMaster(url: String) {
+       activeMasterUrl = url
+       master = context.actorFor(Master.toAkkaUrl(url))
+       masterAddress = master.path.address
 -      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
++      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+       context.watch(master)  // Doesn't work with remote actors, but useful for testing
+     }
+ 
      override def receive = {
-       case RegisteredApplication(appId_) =>
+       case RegisteredApplication(appId_, masterUrl) =>
          appId = appId_
+         registered = true
+         changeMaster(masterUrl)
          listener.connected(appId)
  
        case ApplicationRemoved(message) =>
@@@ -92,23 -133,27 +133,27 @@@
            listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
          }
  
+       case MasterChanged(masterUrl, masterWebUiUrl) =>
+         logInfo("Master has changed, new master is at " + masterUrl)
+         context.unwatch(master)
+         changeMaster(masterUrl)
+         alreadyDisconnected = false
+         sender ! MasterChangeAcknowledged(appId)
+ 
        case Terminated(actor_) if actor_ == master =>
-         logError("Connection to master failed; stopping client")
+         logWarning("Connection to master failed; waiting for master to reconnect...")
          markDisconnected()
-         context.stop(self)
  
 -      case RemoteClientDisconnected(transport, address) if address == masterAddress =>
 -        logWarning("Connection to master failed; waiting for master to reconnect...")
 +      case DisassociatedEvent(_, address, _) if address == masterAddress =>
 +        logError("Connection to master failed; stopping client")
          markDisconnected()
-         context.stop(self)
  
 -      case RemoteClientShutdown(transport, address) if address == masterAddress =>
 -        logWarning("Connection to master failed; waiting for master to reconnect...")
 +      case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
 +        logError("Connection to master failed; stopping client")
          markDisconnected()
-         context.stop(self)
  
        case StopClient =>
-         markDisconnected()
+         markDead()
          sender ! true
          context.stop(self)
      }

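The Client hunk above replaces single-master registration with a register-all-masters loop plus a timer-driven retry. A stripped-down sketch of that lazy-Cancellable retry pattern, with hypothetical constants and a stubbed tryRegisterAllMasters; this is not the actual Client code:

// Start with: system.actorOf(Props(classOf[RegistrationActor]))
import scala.concurrent.duration._
import akka.actor.{Actor, Cancellable}

class RegistrationActor extends Actor {
  import context.dispatcher                      // ExecutionContext for the scheduler

  val RegistrationTimeout = 20.seconds           // hypothetical constants
  val RegistrationRetries = 3
  var registered = false
  var retries = 0

  def tryRegisterAllMasters() { println("sending RegisterApplication to every known master") }

  // Lazy Cancellable, as in the hunk above: referencing it once starts the periodic retry.
  lazy val retryTimer: Cancellable =
    context.system.scheduler.schedule(RegistrationTimeout, RegistrationTimeout) {
      retries += 1
      if (registered) retryTimer.cancel()
      else if (retries >= RegistrationRetries) println("All masters are unresponsive! Giving up.")
      else tryRegisterAllMasters()
    }

  override def preStart() { tryRegisterAllMasters(); retryTimer }

  def receive = { case "registered" => registered = true }
}
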
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 0000000,c0849ef..043945a
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@@ -1,0 -1,90 +1,90 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.master
+ 
+ import java.io._
+ 
+ import scala.Serializable
+ 
+ import akka.serialization.Serialization
+ import org.apache.spark.Logging
+ 
+ /**
+  * Stores data in a single on-disk directory with one file per application and worker.
+  * Files are deleted when applications and workers are removed.
+  *
+  * @param dir Directory to store files. Created if non-existent (but not recursively).
+  * @param serialization Used to serialize our objects.
+  */
+ private[spark] class FileSystemPersistenceEngine(
+     val dir: String,
+     val serialization: Serialization)
+   extends PersistenceEngine with Logging {
+ 
+   new File(dir).mkdir()
+ 
+   override def addApplication(app: ApplicationInfo) {
+     val appFile = new File(dir + File.separator + "app_" + app.id)
+     serializeIntoFile(appFile, app)
+   }
+ 
+   override def removeApplication(app: ApplicationInfo) {
+     new File(dir + File.separator + "app_" + app.id).delete()
+   }
+ 
+   override def addWorker(worker: WorkerInfo) {
+     val workerFile = new File(dir + File.separator + "worker_" + worker.id)
+     serializeIntoFile(workerFile, worker)
+   }
+ 
+   override def removeWorker(worker: WorkerInfo) {
+     new File(dir + File.separator + "worker_" + worker.id).delete()
+   }
+ 
+   override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+     val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
+     val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
+     val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+     val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
+     val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
+     (apps, workers)
+   }
+ 
 -  private def serializeIntoFile(file: File, value: Serializable) {
++  private def serializeIntoFile(file: File, value: AnyRef) {
+     val created = file.createNewFile()
+     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
+ 
+     val serializer = serialization.findSerializerFor(value)
+     val serialized = serializer.toBinary(value)
+ 
+     val out = new FileOutputStream(file)
+     out.write(serialized)
+     out.close()
+   }
+ 
 -  def deserializeFromFile[T <: Serializable](file: File)(implicit m: Manifest[T]): T = {
++  def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
+     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
+     val dis = new DataInputStream(new FileInputStream(file))
+     dis.readFully(fileData)
+     dis.close()
+ 
 -    val clazz = m.erasure.asInstanceOf[Class[T]]
++    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+     val serializer = serialization.serializerFor(clazz)
+     serializer.fromBinary(fileData).asInstanceOf[T]
+   }
+ }

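A hypothetical wiring of the file-system persistence engine added above, based only on the constructor and readPersistedData signatures visible in the hunk; the directory, actor-system name, and demo object are illustrative, and the code sits in the same package because the engine is private[spark]:

package org.apache.spark.deploy.master

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension

// Sketch only: persist/recover master state through the new engine.
object RecoveryDemo {
  def main(args: Array[String]) {
    val system = ActorSystem("recoveryDemo")
    val engine = new FileSystemPersistenceEngine("/tmp/spark-recovery",
      SerializationExtension(system))

    // On (re)start, a recovering Master would read back whatever was persisted.
    val (apps, workers) = engine.readPersistedData()
    println("Recovered " + apps.size + " app(s) and " + workers.size + " worker(s)")
    system.shutdown()
  }
}
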
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index cb0fe6a,cd91667..26f9807
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@@ -21,37 -21,23 +21,41 @@@ import java.util.Dat
  import java.text.SimpleDateFormat
  
  import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 +import scala.concurrent.Await
 +import scala.concurrent.duration._
++import scala.concurrent.duration.{ Duration, FiniteDuration }
++import scala.concurrent.ExecutionContext.Implicits.global
  
  import akka.actor._
 -import akka.actor.Terminated
 -import akka.dispatch.Await
  import akka.pattern.ask
 -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
 -import akka.serialization.SerializationExtension
 -import akka.util.duration._
 -import akka.util.{Duration, Timeout}
 +import akka.remote._
++import akka.util.Timeout
  
  import org.apache.spark.{Logging, SparkException}
  import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
  import org.apache.spark.deploy.DeployMessages._
+ import org.apache.spark.deploy.master.MasterMessages._
  import org.apache.spark.deploy.master.ui.MasterWebUI
  import org.apache.spark.metrics.MetricsSystem
 -import org.apache.spark.util.{AkkaUtils, Utils}
 +import org.apache.spark.util.{Utils, AkkaUtils}
- import akka.util.Timeout
 +import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 +import org.apache.spark.deploy.DeployMessages.KillExecutor
 +import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 +import scala.Some
- import org.apache.spark.deploy.DeployMessages.WebUIPortResponse
 +import org.apache.spark.deploy.DeployMessages.LaunchExecutor
 +import org.apache.spark.deploy.DeployMessages.RegisteredApplication
 +import org.apache.spark.deploy.DeployMessages.RegisterWorker
 +import org.apache.spark.deploy.DeployMessages.ExecutorUpdated
 +import org.apache.spark.deploy.DeployMessages.MasterStateResponse
 +import org.apache.spark.deploy.DeployMessages.ExecutorAdded
 +import org.apache.spark.deploy.DeployMessages.RegisterApplication
 +import org.apache.spark.deploy.DeployMessages.ApplicationRemoved
 +import org.apache.spark.deploy.DeployMessages.Heartbeat
 +import org.apache.spark.deploy.DeployMessages.RegisteredWorker
 +import akka.actor.Terminated
++import akka.serialization.SerializationExtension
++import java.util.concurrent.TimeUnit
 +
  
  private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
    val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
@@@ -94,16 -91,40 +109,39 @@@
    val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
  
    override def preStart() {
-     logInfo("Starting Spark master at spark://" + host + ":" + port)
+     logInfo("Starting Spark master at " + masterUrl)
      // Listen for remote client disconnection events, since they don't go through Akka's watch()
 -    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
 +    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      webUi.start()
-     import context.dispatcher
+     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
      context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
  
      masterMetricsSystem.registerSource(masterSource)
      masterMetricsSystem.start()
      applicationMetricsSystem.start()
+ 
+     persistenceEngine = RECOVERY_MODE match {
+       case "ZOOKEEPER" =>
+         logInfo("Persisting recovery state to ZooKeeper")
+         new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+       case "FILESYSTEM" =>
+         logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
+         new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+       case _ =>
+         new BlackHolePersistenceEngine()
+     }
+ 
 -    leaderElectionAgent = context.actorOf(Props(
 -      RECOVERY_MODE match {
++    leaderElectionAgent = RECOVERY_MODE match {
+         case "ZOOKEEPER" =>
 -          new ZooKeeperLeaderElectionAgent(self, masterUrl)
++          context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
+         case _ =>
 -          new MonarchyLeaderAgent(self)
 -      }))
++          context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
++      }
+   }
+ 
+   override def preRestart(reason: Throwable, message: Option[Any]) {
+     super.preRestart(reason, message) // calls postStop()!
+     logError("Master actor restarted due to exception", reason)
    }
  
    override def postStop() {
@@@ -178,15 -262,17 +279,17 @@@
        // those we have an entry for in the corresponding actor hashmap
        actorToWorker.get(actor).foreach(removeWorker)
        actorToApp.get(actor).foreach(finishApplication)
+       if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
      }
  
 -    case RemoteClientDisconnected(transport, address) => {
 +    case DisassociatedEvent(_, address, _) => {
        // The disconnected client could've been either a worker or an app; remove whichever it was
        addressToWorker.get(address).foreach(removeWorker)
        addressToApp.get(address).foreach(finishApplication)
+       if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
      }
  
 -    case RemoteClientShutdown(transport, address) => {
 +    case AssociationErrorEvent(_, _, address, _) => {
        // The disconnected client could've been either a worker or an app; remove whichever it was
        addressToWorker.get(address).foreach(removeWorker)
        addressToApp.get(address).foreach(finishApplication)
@@@ -403,9 -552,9 +569,9 @@@ private[spark] object Master 
  
    def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
      val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
 -    val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
 -    val timeoutDuration = Duration.create(
 -      System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
 +    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
-     val timeoutDuration = Duration.create(
-       System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
++    val timeoutDuration : FiniteDuration = Duration.create(
++      System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
      implicit val timeout = Timeout(timeoutDuration)
      val respFuture = actor ? RequestWebUIPort   // ask pattern
      val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]

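The tail of the Master hunk above switches actor creation to Props(classOf[...]) and keeps the ask-with-timeout pattern for fetching the web UI port. A minimal, self-contained sketch of that pattern with a hypothetical echo actor; this is not the actual Master code:

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

class EchoActor extends Actor {
  def receive = { case msg => sender ! msg }
}

object AskDemo {
  def main(args: Array[String]) {
    val system = ActorSystem("askDemo")
    val actor = system.actorOf(Props(classOf[EchoActor]), name = "echo")

    // Same timeout construction as in startSystemAndActor above.
    val timeoutDuration: FiniteDuration = Duration.create(
      System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
    implicit val timeout = Timeout(timeoutDuration)

    val reply = Await.result(actor ? "ping", timeoutDuration)   // ask pattern
    println("got reply: " + reply)
    system.shutdown()
  }
}
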
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
index fb3fe88,c8d34f2..0b36ef6
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@@ -17,8 -17,10 +17,8 @@@
  
  package org.apache.spark.deploy.master
  
 -private[spark] object WorkerState
 -  extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED", "UNKNOWN") {
 -
 +private[spark] object WorkerState extends Enumeration {
    type WorkerState = Value
  
-   val ALIVE, DEAD, DECOMMISSIONED = Value
+   val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
  }

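The WorkerState hunk above drops the string-argument Enumeration constructor as part of the Scala 2.10 migration; in 2.10 each value's name is taken from its val identifier. A tiny illustrative sketch (demo object, not the commit's code):

object WorkerStateDemo extends Enumeration {
  type WorkerStateDemo = Value
  val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
}

// WorkerStateDemo.ALIVE.toString == "ALIVE"
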
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 0000000,a0233a7..825344b
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@@ -1,0 -1,85 +1,85 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.master
+ 
+ import org.apache.spark.Logging
+ import org.apache.zookeeper._
+ 
+ import akka.serialization.Serialization
+ 
+ class ZooKeeperPersistenceEngine(serialization: Serialization)
+   extends PersistenceEngine
+   with SparkZooKeeperWatcher
+   with Logging
+ {
+   val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ 
+   val zk = new SparkZooKeeperSession(this)
+ 
+   zk.connect()
+ 
+   override def zkSessionCreated() {
+     zk.mkdirRecursive(WORKING_DIR)
+   }
+ 
+   override def zkDown() {
+     logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.")
+   }
+ 
+   override def addApplication(app: ApplicationInfo) {
+     serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
+   }
+ 
+   override def removeApplication(app: ApplicationInfo) {
+     zk.delete(WORKING_DIR + "/app_" + app.id)
+   }
+ 
+   override def addWorker(worker: WorkerInfo) {
+     serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
+   }
+ 
+   override def removeWorker(worker: WorkerInfo) {
+     zk.delete(WORKING_DIR + "/worker_" + worker.id)
+   }
+ 
+   override def close() {
+     zk.close()
+   }
+ 
+   override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+     val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
+     val appFiles = sortedFiles.filter(_.startsWith("app_"))
+     val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+     val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
+     val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
+     (apps, workers)
+   }
+ 
 -  private def serializeIntoFile(path: String, value: Serializable) {
++  private def serializeIntoFile(path: String, value: AnyRef) {
+     val serializer = serialization.findSerializerFor(value)
+     val serialized = serializer.toBinary(value)
+     zk.create(path, serialized, CreateMode.PERSISTENT)
+   }
+ 
 -  def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = {
++  def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
+     val fileData = zk.getData("/spark/master_status/" + filename)
 -    val clazz = m.erasure.asInstanceOf[Class[T]]
++    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+     val serializer = serialization.serializerFor(clazz)
+     serializer.fromBinary(fileData).asInstanceOf[T]
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3904b70,216d9d4..991b22d
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@@ -22,20 -22,22 +22,36 @@@ import java.util.Dat
  import java.io.File
  
  import scala.collection.mutable.HashMap
 +import scala.concurrent.duration._
++import scala.concurrent.ExecutionContext.Implicits.global
  
- import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+ import akka.actor._
 -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 -import akka.util.duration._
 +import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
  
  import org.apache.spark.Logging
- import org.apache.spark.deploy.ExecutorState
+ import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
  import org.apache.spark.deploy.DeployMessages._
  import org.apache.spark.deploy.master.Master
  import org.apache.spark.deploy.worker.ui.WorkerWebUI
  import org.apache.spark.metrics.MetricsSystem
  import org.apache.spark.util.{Utils, AkkaUtils}
- 
- 
++import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
++import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
++import org.apache.spark.deploy.DeployMessages.KillExecutor
++import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
++import scala.Some
++import akka.remote.DisassociatedEvent
++import org.apache.spark.deploy.DeployMessages.LaunchExecutor
++import org.apache.spark.deploy.DeployMessages.RegisterWorker
++import org.apache.spark.deploy.DeployMessages.WorkerSchedulerStateResponse
++import org.apache.spark.deploy.DeployMessages.MasterChanged
++import org.apache.spark.deploy.DeployMessages.Heartbeat
++import org.apache.spark.deploy.DeployMessages.RegisteredWorker
++import akka.actor.Terminated
+ 
+ /**
+   * @param masterUrls Each url should look like spark://host:port.
+   */
  private[spark] class Worker(
      host: String,
      port: Int,
@@@ -109,40 -122,92 +136,94 @@@
      metricsSystem.start()
    }
  
-   def connectToMaster() {
-     logInfo("Connecting to master " + masterUrl)
-     master = context.actorFor(Master.toAkkaUrl(masterUrl))
-     master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
-     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-     context.watch(master) // Doesn't work with remote actors, but useful for testing
+   def changeMaster(url: String, uiUrl: String) {
+     masterLock.synchronized {
+       activeMasterUrl = url
+       activeMasterWebUiUrl = uiUrl
+       master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
 -      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
++      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+       context.watch(master) // Doesn't work with remote actors, but useful for testing
+       connected = true
+     }
+   }
+ 
+   def tryRegisterAllMasters() {
+     for (masterUrl <- masterUrls) {
+       logInfo("Connecting to master " + masterUrl + "...")
+       val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+       actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
+         publicAddress)
+     }
+   }
+ 
+   def registerWithMaster() {
+     tryRegisterAllMasters()
+ 
+     var retries = 0
+     lazy val retryTimer: Cancellable =
+       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+         retries += 1
+         if (registered) {
+           retryTimer.cancel()
+         } else if (retries >= REGISTRATION_RETRIES) {
+           logError("All masters are unresponsive! Giving up.")
+           System.exit(1)
+         } else {
+           tryRegisterAllMasters()
+         }
+       }
+     retryTimer // start timer
    }
  
 +  import context.dispatcher
 +
    override def receive = {
-     case RegisteredWorker(url) =>
-       masterWebUiUrl = url
-       logInfo("Successfully registered with master")
-         context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
-         master ! Heartbeat(workerId)
+     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
+       logInfo("Successfully registered with master " + masterUrl)
+       registered = true
+       changeMaster(masterUrl, masterWebUiUrl)
+       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+ 
+     case SendHeartbeat =>
+       masterLock.synchronized {
+         if (connected) { master ! Heartbeat(workerId) }
        }
  
+     case MasterChanged(masterUrl, masterWebUiUrl) =>
+       logInfo("Master has changed, new master is at " + masterUrl)
+       context.unwatch(master)
+       changeMaster(masterUrl, masterWebUiUrl)
+ 
+       val execs = executors.values.
+         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
+       sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+ 
      case RegisterWorkerFailed(message) =>
-       logError("Worker registration failed: " + message)
-       System.exit(1)
- 
-     case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
-       logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-       val manager = new ExecutorRunner(
-         appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
-       executors(appId + "/" + execId) = manager
-       manager.start()
-       coresUsed += cores_
-       memoryUsed += memory_
-       master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
+       if (!registered) {
+         logError("Worker registration failed: " + message)
+         System.exit(1)
+       }
+ 
+     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+       if (masterUrl != activeMasterUrl) {
+         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+       } else {
+         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+           self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
+         executors(appId + "/" + execId) = manager
+         manager.start()
+         coresUsed += cores_
+         memoryUsed += memory_
+         masterLock.synchronized {
+           master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+         }
+       }
  
      case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-       master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+       masterLock.synchronized {
+         master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+       }
        val fullId = appId + "/" + execId
        if (ExecutorState.isFinished(state)) {
          val executor = executors(fullId)
@@@ -155,17 -220,27 +236,21 @@@
          memoryUsed -= executor.memory
        }
  
-     case KillExecutor(appId, execId) =>
-       val fullId = appId + "/" + execId
-       executors.get(fullId) match {
-         case Some(executor) =>
-           logInfo("Asked to kill executor " + fullId)
-           executor.kill()
-         case None =>
-           logInfo("Asked to kill unknown executor " + fullId)
+     case KillExecutor(masterUrl, appId, execId) =>
+       if (masterUrl != activeMasterUrl) {
+         logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId)
+       } else {
+         val fullId = appId + "/" + execId
+         executors.get(fullId) match {
+           case Some(executor) =>
+             logInfo("Asked to kill executor " + fullId)
+             executor.kill()
+           case None =>
+             logInfo("Asked to kill unknown executor " + fullId)
+         }
        }
  
 -    case Terminated(actor_) if actor_ == master =>
 -      masterDisconnected()
 -
 -    case RemoteClientDisconnected(transport, address) if address == master.path.address =>
 -      masterDisconnected()
 -
 -    case RemoteClientShutdown(transport, address) if address == master.path.address =>
 +    case DisassociatedEvent(_, _, _) =>
        masterDisconnected()
  
      case RequestWorkerState => {
@@@ -207,8 -280,8 +290,8 @@@ private[spark] object Worker 
      // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
      val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
      val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
 -    val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
 -      masterUrls, workDir)), name = "Worker")
 +    actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
-       masterUrl, workDir), name = "Worker")
++      masterUrls, workDir), name = "Worker")
      (actorSystem, boundPort)
    }
  

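The Worker hunk above replaces the one-shot connectToMaster with registerWithMaster, which re-sends RegisterWorker to every known master on a fixed schedule until a RegisteredWorker reply arrives or the retry budget runs out. A minimal standalone sketch of that retry idiom follows, assuming an Akka 2.2 scheduler; RetryingRegistrar and the timeout/retry values are invented for illustration and are not the constants defined elsewhere in Worker.scala.

    import scala.concurrent.duration._
    import akka.actor.{Actor, Cancellable}

    // Hypothetical stand-in for the Worker's registration loop (not Spark code).
    class RetryingRegistrar extends Actor {
      import context.dispatcher                    // ExecutionContext for the scheduler

      val REGISTRATION_TIMEOUT = 20.seconds        // assumed values, not the real constants
      val REGISTRATION_RETRIES = 3
      var registered = false

      def tryRegisterAllMasters() {
        println("sending RegisterWorker to every known master")
      }

      // Same shape as Worker.registerWithMaster: register once right away, then let a
      // periodic timer either cancel itself on success or give up after N attempts.
      def registerWithMaster() {
        tryRegisterAllMasters()
        var retries = 0
        lazy val retryTimer: Cancellable =
          context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
            retries += 1
            if (registered) {
              retryTimer.cancel()
            } else if (retries >= REGISTRATION_RETRIES) {
              context.stop(self)                   // the real Worker logs an error and exits the JVM
            } else {
              tryRegisterAllMasters()
            }
          }
        retryTimer // forcing the lazy val starts the timer
      }

      override def preStart() { registerWithMaster() }

      def receive = {
        case "registered" => registered = true     // stands in for the RegisteredWorker reply
      }
    }

Started with actorSystem.actorOf(Props[RetryingRegistrar]), the sketch keeps retrying until something sends it "registered"; the real Worker additionally serializes master access behind masterLock and switches masters via changeMaster.
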
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 0000000,caee6b0..73fa7d6
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@@ -1,0 -1,122 +1,127 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.executor
+ 
+ import java.nio.ByteBuffer
+ 
 -import akka.actor.{ActorRef, Actor, Props, Terminated}
 -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
++import akka.actor._
++import akka.remote._
+ 
+ import org.apache.spark.{Logging, SparkEnv}
+ import org.apache.spark.TaskState.TaskState
+ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+ import org.apache.spark.util.{Utils, AkkaUtils}
+ 
+ 
+ private[spark] class CoarseGrainedExecutorBackend(
+     driverUrl: String,
+     executorId: String,
+     hostPort: String,
+     cores: Int)
+   extends Actor
+   with ExecutorBackend
+   with Logging {
+ 
+   Utils.checkHostPort(hostPort, "Expected hostport")
+ 
+   var executor: Executor = null
 -  var driver: ActorRef = null
++  var driver: ActorSelection = null
+ 
+   override def preStart() {
+     logInfo("Connecting to driver: " + driverUrl)
 -    driver = context.actorFor(driverUrl)
++    driver = context.actorSelection(driverUrl)
+     driver ! RegisterExecutor(executorId, hostPort, cores)
 -    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
 -    context.watch(driver) // Doesn't work with remote actors, but useful for testing
++    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
++   // context.watch(driver) // Doesn't work with remote actors, but useful for testing
+   }
+ 
+   override def receive = {
+     case RegisteredExecutor(sparkProperties) =>
+       logInfo("Successfully registered with driver")
+       // Make this host instead of hostPort ?
+       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
+ 
+     case RegisterExecutorFailed(message) =>
+       logError("Slave registration failed: " + message)
+       System.exit(1)
+ 
+     case LaunchTask(taskDesc) =>
+       logInfo("Got assigned task " + taskDesc.taskId)
+       if (executor == null) {
+         logError("Received LaunchTask command but executor was null")
+         System.exit(1)
+       } else {
+         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+       }
+ 
+     case KillTask(taskId, _) =>
+       if (executor == null) {
+         logError("Received KillTask command but executor was null")
+         System.exit(1)
+       } else {
+         executor.killTask(taskId)
+       }
+ 
 -    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
++    case DisassociatedEvent(_, _, _) =>
+       logError("Driver terminated or disconnected! Shutting down.")
+       System.exit(1)
+ 
+     case StopExecutor =>
+       logInfo("Driver commanded a shutdown")
+       context.stop(self)
+       context.system.shutdown()
+   }
+ 
+   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+     driver ! StatusUpdate(executorId, taskId, state, data)
+   }
+ }
+ 
+ private[spark] object CoarseGrainedExecutorBackend {
+   def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+     // Debug code
+     Utils.checkHost(hostname)
+ 
+     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+     // before getting started with all our system properties, etc
+     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+     // set it
+     val sparkHostPort = hostname + ":" + boundPort
+     System.setProperty("spark.hostPort", sparkHostPort)
 -    val actor = actorSystem.actorOf(
 -      Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
++
++    actorSystem.actorOf(
++      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+       name = "Executor")
+     actorSystem.awaitTermination()
+   }
+ 
+   def main(args: Array[String]) {
+     if (args.length < 4) {
+       //the reason we allow the last appid argument is to make it easy to kill rogue executors
+       System.err.println(
+         "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
+         "[<appid>]")
+       System.exit(1)
+     }
+     run(args(0), args(1), args(2), args(3).toInt)
+   }
+ }

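Besides the protocol changes, this file shows the Akka migration that runs through the whole merge: the driver is now looked up with actorSelection instead of actorFor, and loss of the remote peer is detected through DisassociatedEvent rather than the removed RemoteClient* events. A hedged, Spark-free sketch of the same pattern; RemotePeerWatcher and the peer URL are invented for illustration.

    import akka.actor.{Actor, ActorSelection}
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    // Hypothetical client actor using the Akka 2.2 idioms adopted in this merge.
    class RemotePeerWatcher(peerUrl: String) extends Actor {
      var peer: ActorSelection = _

      override def preStart() {
        peer = context.actorSelection(peerUrl)   // path-based lookup, resolved per message
        peer ! "hello"
        // DisassociatedEvent is published on the event stream when the remote link drops.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          println("lost connection to " + remoteAddress)
          context.stop(self)
        case msg =>
          println("peer said: " + msg)
      }
    }

Creating it mirrors the Props(classOf[...], args...) style the merge now uses for the Worker and executor backend actors, e.g. system.actorOf(Props(classOf[RemotePeerWatcher], "akka.tcp://demo@host:port/user/peer")), with a purely illustrative URL.
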
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3800063,5c9bb9d..de45404
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@@ -101,18 -106,48 +106,44 @@@ private[spark] class Executor
    val executorSource = new ExecutorSource(this, executorId)
  
    // Initialize Spark environment (using system properties read above)
-   val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
-   SparkEnv.set(env)
-   env.metricsSystem.registerSource(executorSource)
+   private val env = {
+     if (!isLocal) {
+       val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0,
+         isDriver = false, isLocal = false)
+       SparkEnv.set(_env)
+       _env.metricsSystem.registerSource(executorSource)
+       _env
+     } else {
+       SparkEnv.get
+     }
+   }
  
 -  // Akka's message frame size. If task result is bigger than this, we use the block manager
 -  // to send the result back.
 -  private val akkaFrameSize = {
 -    env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
 -  }
 +  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
  
    // Start worker thread pool
-   val threadPool = new ThreadPoolExecutor(
-     1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
+   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
+ 
+   // Maintains the list of running tasks.
+   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
+ 
+   val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
  
    def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
-     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
+     val tr = new TaskRunner(context, taskId, serializedTask)
+     runningTasks.put(taskId, tr)
+     threadPool.execute(tr)
+   }
+ 
+   def killTask(taskId: Long) {
+     val tr = runningTasks.get(taskId)
+     if (tr != null) {
+       tr.kill()
+       // We remove the task also in the finally block in TaskRunner.run.
+       // The reason we need to remove it here is because killTask might be called before the task
+       // is even launched, and never reaching that finally block. ConcurrentHashMap's remove is
+       // idempotent.
+       runningTasks.remove(taskId)
+     }
    }
  
    /** Get the Yarn approved local directories. */

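The Executor hunk introduces a registry of running tasks so the new killTask path can find a TaskRunner even if it races with launchTask; entries are removed both in killTask and in the runner itself, relying on ConcurrentHashMap.remove being idempotent. A compact sketch of that bookkeeping, with a trivial Runnable standing in for TaskRunner; TaskRegistrySketch and its members are invented names.

    import java.util.concurrent.{ConcurrentHashMap, Executors}

    // Hypothetical stand-in for Executor's running-task registry (not Spark code).
    object TaskRegistrySketch {
      private val runningTasks = new ConcurrentHashMap[Long, Runner]
      private val pool = Executors.newCachedThreadPool()   // stands in for Utils.newDaemonCachedThreadPool

      final class Runner(val taskId: Long) extends Runnable {
        @volatile var killed = false
        def kill() { killed = true }
        def run() {
          try {
            if (!killed) println("running task " + taskId)
          } finally {
            runningTasks.remove(taskId)   // mirrors the removal in TaskRunner.run's finally block
          }
        }
      }

      def launchTask(taskId: Long) {
        val r = new Runner(taskId)
        runningTasks.put(taskId, r)       // register before handing the task to the pool
        pool.execute(r)
      }

      def killTask(taskId: Long) {
        val r = runningTasks.get(taskId)
        if (r != null) {
          r.kill()
          // Remove here as well; remove() is idempotent, so the second removal
          // in the finally block above is harmless.
          runningTasks.remove(taskId)
        }
      }

      def main(args: Array[String]) {
        launchTask(1)
        killTask(1)
        pool.shutdown()
      }
    }
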
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 0000000,faaf837..44c5078
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@@ -1,0 -1,122 +1,123 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.rdd
+ 
+ import java.util.concurrent.atomic.AtomicLong
+ 
+ import scala.collection.mutable.ArrayBuffer
+ import scala.concurrent.ExecutionContext.Implicits.global
+ 
+ import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
++import scala.reflect.ClassTag
+ 
+ /**
+  * A set of asynchronous RDD actions available through an implicit conversion.
+  * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+  */
 -class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with Logging {
++class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
+ 
+   /**
+    * Returns a future for counting the number of elements in the RDD.
+    */
+   def countAsync(): FutureAction[Long] = {
+     val totalCount = new AtomicLong
+     self.context.submitJob(
+       self,
+       (iter: Iterator[T]) => {
+         var result = 0L
+         while (iter.hasNext) {
+           result += 1L
+           iter.next()
+         }
+         result
+       },
+       Range(0, self.partitions.size),
+       (index: Int, data: Long) => totalCount.addAndGet(data),
+       totalCount.get())
+   }
+ 
+   /**
+    * Returns a future for retrieving all elements of this RDD.
+    */
+   def collectAsync(): FutureAction[Seq[T]] = {
+     val results = new Array[Array[T]](self.partitions.size)
+     self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
+       (index, data) => results(index) = data, results.flatten.toSeq)
+   }
+ 
+   /**
+    * Returns a future for retrieving the first num elements of the RDD.
+    */
+   def takeAsync(num: Int): FutureAction[Seq[T]] = {
+     val f = new ComplexFutureAction[Seq[T]]
+ 
+     f.run {
+       val results = new ArrayBuffer[T](num)
+       val totalParts = self.partitions.length
+       var partsScanned = 0
+       while (results.size < num && partsScanned < totalParts) {
+         // The number of partitions to try in this iteration. It is ok for this number to be
+         // greater than totalParts because we actually cap it at totalParts in runJob.
+         var numPartsToTry = 1
+         if (partsScanned > 0) {
+           // If we didn't find any rows after the first iteration, just try all partitions next.
+           // Otherwise, interpolate the number of partitions we need to try, but overestimate it
+           // by 50%.
+           if (results.size == 0) {
+             numPartsToTry = totalParts - 1
+           } else {
+             numPartsToTry = (1.5 * num * partsScanned / results.size).toInt
+           }
+         }
+         numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
+ 
+         val left = num - results.size
+         val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
+ 
+         val buf = new Array[Array[T]](p.size)
+         f.runJob(self,
+           (it: Iterator[T]) => it.take(left).toArray,
+           p,
+           (index: Int, data: Array[T]) => buf(index) = data,
+           Unit)
+ 
+         buf.foreach(results ++= _.take(num - results.size))
+         partsScanned += numPartsToTry
+       }
+       results.toSeq
+     }
+ 
+     f
+   }
+ 
+   /**
+    * Applies a function f to all elements of this RDD.
+    */
+   def foreachAsync(f: T => Unit): FutureAction[Unit] = {
+     self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size),
+       (index, data) => Unit, Unit)
+   }
+ 
+   /**
+    * Applies a function f to each partition of this RDD.
+    */
+   def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
+     self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
+       (index, data) => Unit, Unit)
+   }
+ }

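takeAsync above collects results in waves: one partition first, then either every remaining partition (when the first wave found nothing) or a count interpolated from the hit rate so far, padded by 50%. The heuristic isolated as a pure function, assuming the arithmetic is exactly what the loop above computes; TakeAsyncWaveSize is an invented wrapper.

    // Pure helper isolating takeAsync's wave-size heuristic.
    object TakeAsyncWaveSize {
      def numPartsToTry(num: Int, totalParts: Int, partsScanned: Int, resultsSoFar: Int): Int = {
        val estimate =
          if (partsScanned == 0) 1                              // first wave: probe a single partition
          else if (resultsSoFar == 0) totalParts - 1            // found nothing yet: try everything left
          else (1.5 * num * partsScanned / resultsSoFar).toInt  // interpolate, overestimating by 50%
        math.max(0, estimate)                                   // guard against a negative count
      }

      def main(args: Array[String]) {
        // Asking for 100 rows after one scanned partition yielded 10 suggests about 15 more partitions.
        println(numPartsToTry(num = 100, totalParts = 50, partsScanned = 1, resultsSoFar = 10))
      }
    }
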
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index fe2946b,44ea573..63b9fe1
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@@ -18,15 -18,14 +18,15 @@@
  package org.apache.spark.rdd
  
  import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
- import org.apache.spark.storage.BlockManager
+ import org.apache.spark.storage.{BlockId, BlockManager}
 +import scala.reflect.ClassTag
  
- private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
+ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
    val index = idx
  }
  
  private[spark]
- class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[String])
 -class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[BlockId])
++class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId])
    extends RDD[T](sc, Nil) {
  
    @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)

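BlockRDD, like AsyncRDDActions above and the RDD changes further down, swaps ClassManifest for scala.reflect.ClassTag, its Scala 2.10 replacement. A small Spark-free illustration of what the context bound provides; ClassTagSketch and firstN are invented names.

    import scala.reflect.ClassTag

    object ClassTagSketch {
      // A ClassTag carries just enough runtime type information to build an
      // Array[T] despite erasure, which is why the RDD machinery requires one.
      def firstN[T: ClassTag](xs: Seq[T], n: Int): Array[T] = {
        val out = new Array[T](math.min(n, xs.length))
        xs.copyToArray(out)
        out
      }

      def main(args: Array[String]) {
        println(firstN(Seq("a", "b", "c"), 2).mkString(", "))   // prints: a, b
      }
    }
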
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 3f4d4ad,d3033ea..99ea6e8
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@@ -17,8 -17,8 +17,9 @@@
  
  package org.apache.spark.rdd
  
 +import scala.reflect.ClassTag
  import org.apache.spark._
+ import org.apache.spark.deploy.SparkHadoopUtil
  import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.{NullWritable, BytesWritable}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithContextRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithContextRDD.scala
index 0000000,aea08ff..6763675
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithContextRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithContextRDD.scala
@@@ -1,0 -1,41 +1,42 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.rdd
+ 
+ import org.apache.spark.{Partition, TaskContext}
++import scala.reflect.ClassTag
+ 
+ 
+ /**
+  * A variant of the MapPartitionsRDD that passes the TaskContext into the closure. From the
+  * TaskContext, the closure can either get access to the interruptible flag or get the index
+  * of the partition in the RDD.
+  */
+ private[spark]
 -class MapPartitionsWithContextRDD[U: ClassManifest, T: ClassManifest](
++class MapPartitionsWithContextRDD[U: ClassTag, T: ClassTag](
+     prev: RDD[T],
+     f: (TaskContext, Iterator[T]) => Iterator[U],
+     preservesPartitioning: Boolean
+   ) extends RDD[U](prev) {
+ 
+   override def getPartitions: Array[Partition] = firstParent[T].partitions
+ 
+   override val partitioner = if (preservesPartitioning) prev.partitioner else None
+ 
+   override def compute(split: Partition, context: TaskContext) =
+     f(context, firstParent[T].iterator(split, context))
+ }

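MapPartitionsWithContextRDD exists so a closure can see the TaskContext of the partition it is processing; the RDD.scala hunks below expose it through mapPartitionsWithContext. A hedged usage sketch in local mode; the object name and the local[2] master are illustrative and assume the API exactly as shown in this merge.

    import org.apache.spark.SparkContext

    object ContextAwareMapDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[2]", "ContextAwareMapDemo")
        // The closure receives the TaskContext, so it can read the partition id.
        val tagged = sc.parallelize(1 to 8, 4).mapPartitionsWithContext(
          (context, iter) => iter.map(x => (context.partitionId, x)))
        tagged.collect().foreach(println)   // (0,1), (0,2), (1,3), (1,4), ...
        sc.stop()
      }
    }
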
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 731ef90,6e88be6..3c237ca
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@@ -429,41 -440,54 +443,54 @@@ abstract class RDD[T: ClassTag]
    /**
     * Return a new RDD by applying a function to each partition of this RDD.
     */
 -  def mapPartitions[U: ClassManifest](
 -      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
 +  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
-     preservesPartitioning: Boolean = false): RDD[U] =
++    preservesPartitioning: Boolean = false): RDD[U] = {
      new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
+   }
  
    /**
     * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
     * of the original partition.
     */
 -  def mapPartitionsWithIndex[U: ClassManifest](
 +  def mapPartitionsWithIndex[U: ClassTag](
-     f: (Int, Iterator[T]) => Iterator[U],
-     preservesPartitioning: Boolean = false): RDD[U] =
-     new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
+       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
+     val func = (context: TaskContext, iter: Iterator[T]) => f(context.partitionId, iter)
+     new MapPartitionsWithContextRDD(this, sc.clean(func), preservesPartitioning)
+   }
+ 
+   /**
+    * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
+    * mapPartitions that also passes the TaskContext into the closure.
+    */
 -  def mapPartitionsWithContext[U: ClassManifest](
++  def mapPartitionsWithContext[U: ClassTag](
+       f: (TaskContext, Iterator[T]) => Iterator[U],
+       preservesPartitioning: Boolean = false): RDD[U] = {
+     new MapPartitionsWithContextRDD(this, sc.clean(f), preservesPartitioning)
+   }
  
    /**
     * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
     * of the original partition.
     */
    @deprecated("use mapPartitionsWithIndex", "0.7.0")
 -  def mapPartitionsWithSplit[U: ClassManifest](
 +  def mapPartitionsWithSplit[U: ClassTag](
-     f: (Int, Iterator[T]) => Iterator[U],
-     preservesPartitioning: Boolean = false): RDD[U] =
-     new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
+       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
+     mapPartitionsWithIndex(f, preservesPartitioning)
+   }
  
    /**
     * Maps f over this RDD, where f takes an additional parameter of type A.  This
     * additional parameter is produced by constructA, which is called in each
     * partition with the index of that partition.
     */
-   def mapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)
-     (f:(T, A) => U): RDD[U] = {
-       def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
-         val a = constructA(index)
-         iter.map(t => f(t, a))
-       }
-     new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
 -  def mapWith[A: ClassManifest, U: ClassManifest]
++  def mapWith[A: ClassTag, U: ClassTag]
+       (constructA: Int => A, preservesPartitioning: Boolean = false)
+       (f: (T, A) => U): RDD[U] = {
+     def iterF(context: TaskContext, iter: Iterator[T]): Iterator[U] = {
+       val a = constructA(context.partitionId)
+       iter.map(t => f(t, a))
+     }
+     new MapPartitionsWithContextRDD(this, sc.clean(iterF _), preservesPartitioning)
    }
  
    /**
@@@ -471,13 -495,14 +498,14 @@@
     * additional parameter is produced by constructA, which is called in each
     * partition with the index of that partition.
     */
-   def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)
-     (f:(T, A) => Seq[U]): RDD[U] = {
-       def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
-         val a = constructA(index)
-         iter.flatMap(t => f(t, a))
-       }
-     new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
 -  def flatMapWith[A: ClassManifest, U: ClassManifest]
++  def flatMapWith[A: ClassTag, U: ClassTag]
+       (constructA: Int => A, preservesPartitioning: Boolean = false)
+       (f: (T, A) => Seq[U]): RDD[U] = {
+     def iterF(context: TaskContext, iter: Iterator[T]): Iterator[U] = {
+       val a = constructA(context.partitionId)
+       iter.flatMap(t => f(t, a))
+     }
+     new MapPartitionsWithContextRDD(this, sc.clean(iterF _), preservesPartitioning)
    }
  
    /**
@@@ -485,13 -510,12 +513,12 @@@
     * This additional parameter is produced by constructA, which is called in each
     * partition with the index of that partition.
     */
-   def foreachWith[A: ClassTag](constructA: Int => A)
-     (f:(T, A) => Unit) {
-       def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
-         val a = constructA(index)
-         iter.map(t => {f(t, a); t})
-       }
-     (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
 -  def foreachWith[A: ClassManifest](constructA: Int => A)(f: (T, A) => Unit) {
++  def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) {
+     def iterF(context: TaskContext, iter: Iterator[T]): Iterator[T] = {
+       val a = constructA(context.partitionId)
+       iter.map(t => {f(t, a); t})
+     }
+     new MapPartitionsWithContextRDD(this, sc.clean(iterF _), true).foreach(_ => {})
    }
  
    /**
@@@ -499,13 -523,12 +526,12 @@@
     * additional parameter is produced by constructA, which is called in each
     * partition with the index of that partition.
     */
-   def filterWith[A: ClassTag](constructA: Int => A)
-     (p:(T, A) => Boolean): RDD[T] = {
-       def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
-         val a = constructA(index)
-         iter.filter(t => p(t, a))
-       }
-     new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
 -  def filterWith[A: ClassManifest](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
++  def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
+     def iterF(context: TaskContext, iter: Iterator[T]): Iterator[T] = {
+       val a = constructA(context.partitionId)
+       iter.filter(t => p(t, a))
+     }
+     new MapPartitionsWithContextRDD(this, sc.clean(iterF _), true)
    }
  
    /**

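The ...With helpers (mapWith, flatMapWith, foreachWith, filterWith) are rebuilt above on MapPartitionsWithContextRDD, with context.partitionId taking the place of the explicit index argument. A hedged example of mapWith seeding one random generator per partition; MapWithDemo and the local master are illustrative.

    import scala.util.Random
    import org.apache.spark.SparkContext

    object MapWithDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[2]", "MapWithDemo")
        // constructA builds one value per partition (a seeded Random here);
        // f then sees every element paired with that per-partition value.
        val jittered = sc.parallelize(1 to 4, 2)
          .mapWith(index => new Random(index))((x, rng) => x + rng.nextDouble())
        jittered.collect().foreach(println)
        sc.stop()
      }
    }
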
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 03fe0e0,d0b21e8..ab7b3a2
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@@ -275,19 -283,31 +284,31 @@@ class DAGScheduler
        callSite: String,
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit,
-       properties: Properties = null)
-     : (JobSubmitted, JobWaiter[U]) =
+       properties: Properties = null): JobWaiter[U] =
    {
+     // Check to make sure we are not launching a task on a partition that does not exist.
+     val maxPartitions = rdd.partitions.length
+     partitions.find(p => p >= maxPartitions).foreach { p =>
+       throw new IllegalArgumentException(
+         "Attempting to access a non-existent partition: " + p + ". " +
+           "Total number of partitions: " + maxPartitions)
+     }
+ 
+     val jobId = nextJobId.getAndIncrement()
+     if (partitions.size == 0) {
+       return new JobWaiter[U](this, jobId, 0, resultHandler)
+     }
+ 
      assert(partitions.size > 0)
-     val waiter = new JobWaiter(partitions.size, resultHandler)
      val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
-     val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
-       properties)
-     (toSubmit, waiter)
+     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
+     eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite,
+       waiter, properties))
+     waiter
    }
  
 -  def runJob[T, U: ClassManifest](
 +  def runJob[T, U: ClassTag](
-       finalRdd: RDD[T],
+       rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: String,


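submitJob above now validates the requested partition ids before anything reaches the event queue, and short-circuits with a zero-task JobWaiter when no partitions are requested at all. The bounds check in isolation; PartitionCheckSketch is an invented wrapper around the same logic.

    object PartitionCheckSketch {
      // Reject any requested partition index outside the RDD's range up front.
      def validatePartitions(requested: Seq[Int], maxPartitions: Int) {
        requested.find(_ >= maxPartitions).foreach { p =>
          throw new IllegalArgumentException(
            "Attempting to access a non-existent partition: " + p +
            ". Total number of partitions: " + maxPartitions)
        }
      }

      def main(args: Array[String]) {
        validatePartitions(Seq(0, 1, 3), maxPartitions = 4)   // fine
        validatePartitions(Seq(0, 5), maxPartitions = 4)      // throws for partition 5
      }
    }
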