spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [23/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
Date Sun, 01 Sep 2013 21:59:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
deleted file mode 100644
index 0db13ff..0000000
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import scala.collection.immutable.List
-
-import spark.Utils
-import spark.deploy.ExecutorState.ExecutorState
-import spark.deploy.master.{WorkerInfo, ApplicationInfo}
-import spark.deploy.worker.ExecutorRunner
-
-
-private[deploy] sealed trait DeployMessage extends Serializable
-
-private[deploy] object DeployMessages {
-
-  // Worker to Master
-
-  case class RegisterWorker(
-      id: String,
-      host: String,
-      port: Int,
-      cores: Int,
-      memory: Int,
-      webUiPort: Int,
-      publicAddress: String)
-    extends DeployMessage {
-    Utils.checkHost(host, "Required hostname")
-    assert (port > 0)
-  }
-
-  case class ExecutorStateChanged(
-      appId: String,
-      execId: Int,
-      state: ExecutorState,
-      message: Option[String],
-      exitStatus: Option[Int])
-    extends DeployMessage
-
-  case class Heartbeat(workerId: String) extends DeployMessage
-
-  // Master to Worker
-
-  case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-
-  case class RegisterWorkerFailed(message: String) extends DeployMessage
-
-  case class KillExecutor(appId: String, execId: Int) extends DeployMessage
-
-  case class LaunchExecutor(
-      appId: String,
-      execId: Int,
-      appDesc: ApplicationDescription,
-      cores: Int,
-      memory: Int,
-      sparkHome: String)
-    extends DeployMessage
-
-  // Client to Master
-
-  case class RegisterApplication(appDescription: ApplicationDescription)
-    extends DeployMessage
-
-  // Master to Client
-
-  case class RegisteredApplication(appId: String) extends DeployMessage
-
-  // TODO(matei): replace hostPort with host
-  case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
-    Utils.checkHostPort(hostPort, "Required hostport")
-  }
-
-  case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-    exitStatus: Option[Int])
-
-  case class ApplicationRemoved(message: String)
-
-  // Internal message in Client
-
-  case object StopClient
-
-  // MasterWebUI To Master
-
-  case object RequestMasterState
-
-  // Master to MasterWebUI
-
-  case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
-    activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
-
-    Utils.checkHost(host, "Required hostname")
-    assert (port > 0)
-
-    def uri = "spark://" + host + ":" + port
-  }
-
-  //  WorkerWebUI to Worker
-
-  case object RequestWorkerState
-
-  // Worker to WorkerWebUI
-
-  case class WorkerStateResponse(host: String, port: Int, workerId: String,
-    executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
-    cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
-
-    Utils.checkHost(host, "Required hostname")
-    assert (port > 0)
-  }
-
-  // Actor System to Master
-
-  case object CheckForWorkerTimeOut
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/ExecutorState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/ExecutorState.scala b/core/src/main/scala/spark/deploy/ExecutorState.scala
deleted file mode 100644
index 08c9a3b..0000000
--- a/core/src/main/scala/spark/deploy/ExecutorState.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-private[spark] object ExecutorState
-  extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
-
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
-
-  type ExecutorState = Value
-
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
deleted file mode 100644
index f8dcf02..0000000
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import net.liftweb.json.JsonDSL._
-
-import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import spark.deploy.master.{ApplicationInfo, WorkerInfo}
-import spark.deploy.worker.ExecutorRunner
-
-
-private[spark] object JsonProtocol {
- def writeWorkerInfo(obj: WorkerInfo) = {
-   ("id" -> obj.id) ~
-   ("host" -> obj.host) ~
-   ("port" -> obj.port) ~
-   ("webuiaddress" -> obj.webUiAddress) ~
-   ("cores" -> obj.cores) ~
-   ("coresused" -> obj.coresUsed) ~
-   ("memory" -> obj.memory) ~
-   ("memoryused" -> obj.memoryUsed) ~
-   ("state" -> obj.state.toString)
- }
-
-  def writeApplicationInfo(obj: ApplicationInfo) = {
-    ("starttime" -> obj.startTime) ~
-    ("id" -> obj.id) ~
-    ("name" -> obj.desc.name) ~
-    ("cores" -> obj.desc.maxCores) ~
-    ("user" ->  obj.desc.user) ~
-    ("memoryperslave" -> obj.desc.memoryPerSlave) ~
-    ("submitdate" -> obj.submitDate.toString)
-  }
-
-  def writeApplicationDescription(obj: ApplicationDescription) = {
-    ("name" -> obj.name) ~
-    ("cores" -> obj.maxCores) ~
-    ("memoryperslave" -> obj.memoryPerSlave) ~
-    ("user" -> obj.user)
-  }
-
-  def writeExecutorRunner(obj: ExecutorRunner) = {
-    ("id" -> obj.execId) ~
-    ("memory" -> obj.memory) ~
-    ("appid" -> obj.appId) ~
-    ("appdesc" -> writeApplicationDescription(obj.appDesc))
-  }
-
-  def writeMasterState(obj: MasterStateResponse) = {
-    ("url" -> ("spark://" + obj.uri)) ~
-    ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
-    ("cores" -> obj.workers.map(_.cores).sum) ~
-    ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
-    ("memory" -> obj.workers.map(_.memory).sum) ~
-    ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
-    ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
-    ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
-  }
-
-  def writeWorkerState(obj: WorkerStateResponse) = {
-    ("id" -> obj.workerId) ~
-    ("masterurl" -> obj.masterUrl) ~
-    ("masterwebuiurl" -> obj.masterWebUiUrl) ~
-    ("cores" -> obj.cores) ~
-    ("coresused" -> obj.coresUsed) ~
-    ("memory" -> obj.memory) ~
-    ("memoryused" -> obj.memoryUsed) ~
-    ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
-    ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
deleted file mode 100644
index 6b8e9f2..0000000
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-
-import spark.deploy.worker.Worker
-import spark.deploy.master.Master
-import spark.util.AkkaUtils
-import spark.{Logging, Utils}
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Testing class that creates a Spark standalone process in-cluster (that is, running the
- * spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVMs). Executors launched
- * by the Workers still run in separate JVMs. This can be used to test distributed operation and
- * fault recovery without spinning up a lot of processes.
- */
-private[spark]
-class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
-  
-  private val localHostname = Utils.localHostName()
-  private val masterActorSystems = ArrayBuffer[ActorSystem]()
-  private val workerActorSystems = ArrayBuffer[ActorSystem]()
-  
-  def start(): String = {
-    logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
-
-    /* Start the Master */
-    val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
-    masterActorSystems += masterSystem
-    val masterUrl = "spark://" + localHostname + ":" + masterPort
-
-    /* Start the Workers */
-    for (workerNum <- 1 to numWorkers) {
-      val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masterUrl, null, Some(workerNum))
-      workerActorSystems += workerSystem
-    }
-
-    return masterUrl
-  }
-
-  def stop() {
-    logInfo("Shutting down local Spark cluster.")
-    // Stop the workers before the master so they don't get upset that it disconnected
-    workerActorSystems.foreach(_.shutdown())
-    workerActorSystems.foreach(_.awaitTermination())
-
-    masterActorSystems.foreach(_.shutdown())
-    masterActorSystems.foreach(_.awaitTermination())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
deleted file mode 100644
index 882161e..0000000
--- a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-class SparkHadoopUtil {
-
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
-  def newConfiguration(): Configuration = new Configuration()
-
-  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
-  def addCredentials(conf: JobConf) {}
-
-  def isYarnMode(): Boolean = { false }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala
deleted file mode 100644
index 8ea7792..0000000
--- a/core/src/main/scala/spark/deploy/WebUI.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-/**
- * Utilities used throughout the web UI.
- */
-private[spark] object DeployWebUI {
-  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-
-  def formatDate(date: Date): String = DATE_FORMAT.format(date)
-
-  def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
-
-  def formatDuration(milliseconds: Long): String = {
-    val seconds = milliseconds.toDouble / 1000
-    if (seconds < 60) {
-      return "%.0f s".format(seconds)
-    }
-    val minutes = seconds / 60
-    if (minutes < 10) {
-      return "%.1f min".format(minutes)
-    } else if (minutes < 60) {
-      return "%.0f min".format(minutes)
-    }
-    val hours = minutes / 60
-    return "%.1f h".format(hours)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
deleted file mode 100644
index 9d5ba8a..0000000
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.client
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor._
-import akka.actor.Terminated
-import akka.pattern.ask
-import akka.util.Duration
-import akka.remote.RemoteClientDisconnected
-import akka.remote.RemoteClientLifeCycleEvent
-import akka.remote.RemoteClientShutdown
-import akka.dispatch.Await
-
-import spark.Logging
-import spark.deploy.{ApplicationDescription, ExecutorState}
-import spark.deploy.DeployMessages._
-import spark.deploy.master.Master
-
-
-/**
- * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
- * and a listener for cluster events, and calls back the listener when various events occur.
- */
-private[spark] class Client(
-    actorSystem: ActorSystem,
-    masterUrl: String,
-    appDescription: ApplicationDescription,
-    listener: ClientListener)
-  extends Logging {
-
-  var actor: ActorRef = null
-  var appId: String = null
-
-  class ClientActor extends Actor with Logging {
-    var master: ActorRef = null
-    var masterAddress: Address = null
-    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-
-    override def preStart() {
-      logInfo("Connecting to master " + masterUrl)
-      try {
-        master = context.actorFor(Master.toAkkaUrl(masterUrl))
-        masterAddress = master.path.address
-        master ! RegisterApplication(appDescription)
-        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-        context.watch(master)  // Doesn't work with remote actors, but useful for testing
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to master", e)
-          markDisconnected()
-          context.stop(self)
-      }
-    }
-
-    override def receive = {
-      case RegisteredApplication(appId_) =>
-        appId = appId_
-        listener.connected(appId)
-
-      case ApplicationRemoved(message) =>
-        logError("Master removed our application: %s; stopping client".format(message))
-        markDisconnected()
-        context.stop(self)
-
-      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
-        val fullId = appId + "/" + id
-        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
-        listener.executorAdded(fullId, workerId, hostPort, cores, memory)
-
-      case ExecutorUpdated(id, state, message, exitStatus) =>
-        val fullId = appId + "/" + id
-        val messageText = message.map(s => " (" + s + ")").getOrElse("")
-        logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
-        if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
-        }
-
-      case Terminated(actor_) if actor_ == master =>
-        logError("Connection to master failed; stopping client")
-        markDisconnected()
-        context.stop(self)
-
-      case RemoteClientDisconnected(transport, address) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
-        markDisconnected()
-        context.stop(self)
-
-      case RemoteClientShutdown(transport, address) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
-        markDisconnected()
-        context.stop(self)
-
-      case StopClient =>
-        markDisconnected()
-        sender ! true
-        context.stop(self)
-    }
-
-    /**
-     * Notify the listener that we disconnected, if we hadn't already done so before.
-     */
-    def markDisconnected() {
-      if (!alreadyDisconnected) {
-        listener.disconnected()
-        alreadyDisconnected = true
-      }
-    }
-  }
-
-  def start() {
-    // Just launch an actor; it will call back into the listener.
-    actor = actorSystem.actorOf(Props(new ClientActor))
-  }
-
-  def stop() {
-    if (actor != null) {
-      try {
-        val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-        val future = actor.ask(StopClient)(timeout)
-        Await.result(future, timeout)
-      } catch {
-        case e: TimeoutException =>
-          logInfo("Stop request to Master timed out; it may already be shut down.")
-      }
-      actor = null
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/client/ClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
deleted file mode 100644
index 0640244..0000000
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.client
-
-/**
- * Callbacks invoked by deploy client when various events happen. There are currently four events:
- * connecting to the cluster, disconnecting, being given an executor, and having an executor
- * removed (either due to failure or due to revocation).
- *
- * Users of this API should *not* block inside the callback methods.
- */
-private[spark] trait ClientListener {
-  def connected(appId: String): Unit
-
-  def disconnected(): Unit
-
-  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
-
-  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
deleted file mode 100644
index 4f4daa1..0000000
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.client
-
-import spark.util.AkkaUtils
-import spark.{Logging, Utils}
-import spark.deploy.{Command, ApplicationDescription}
-
-private[spark] object TestClient {
-
-  class TestListener extends ClientListener with Logging {
-    def connected(id: String) {
-      logInfo("Connected to master, got app ID " + id)
-    }
-
-    def disconnected() {
-      logInfo("Disconnected from master")
-      System.exit(0)
-    }
-
-    def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {}
-
-    def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
-  }
-
-  def main(args: Array[String]) {
-    val url = args(0)
-    val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
-    val desc = new ApplicationDescription(
-      "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
-    val listener = new TestListener
-    val client = new Client(actorSystem, url, desc, listener)
-    client.start()
-    actorSystem.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/client/TestExecutor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/client/TestExecutor.scala b/core/src/main/scala/spark/deploy/client/TestExecutor.scala
deleted file mode 100644
index 8a22b6b..0000000
--- a/core/src/main/scala/spark/deploy/client/TestExecutor.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.client
-
-private[spark] object TestExecutor {
-  def main(args: Array[String]) {
-    println("Hello world!")
-    while (true) {
-      Thread.sleep(1000)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
deleted file mode 100644
index 6dd2f06..0000000
--- a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import spark.deploy.ApplicationDescription
-import java.util.Date
-import akka.actor.ActorRef
-import scala.collection.mutable
-
-private[spark] class ApplicationInfo(
-    val startTime: Long,
-    val id: String,
-    val desc: ApplicationDescription,
-    val submitDate: Date,
-    val driver: ActorRef,
-    val appUiUrl: String)
-{
-  var state = ApplicationState.WAITING
-  var executors = new mutable.HashMap[Int, ExecutorInfo]
-  var coresGranted = 0
-  var endTime = -1L
-  val appSource = new ApplicationSource(this)
-
-  private var nextExecutorId = 0
-
-  def newExecutorId(): Int = {
-    val id = nextExecutorId
-    nextExecutorId += 1
-    id
-  }
-
-  def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
-    val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
-    executors(exec.id) = exec
-    coresGranted += cores
-    exec
-  }
-
-  def removeExecutor(exec: ExecutorInfo) {
-    if (executors.contains(exec.id)) {
-      executors -= exec.id
-      coresGranted -= exec.cores
-    }
-  }
-
-  def coresLeft: Int = desc.maxCores - coresGranted
-
-  private var _retryCount = 0
-
-  def retryCount = _retryCount
-
-  def incrementRetryCount = {
-    _retryCount += 1
-    _retryCount
-  }
-
-  def markFinished(endState: ApplicationState.Value) {
-    state = endState
-    endTime = System.currentTimeMillis()
-  }
-
-  def duration: Long = {
-    if (endTime != -1) {
-      endTime - startTime
-    } else {
-      System.currentTimeMillis() - startTime
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
deleted file mode 100644
index 4df2b6b..0000000
--- a/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package spark.deploy.master
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import spark.metrics.source.Source
-
-class ApplicationSource(val application: ApplicationInfo) extends Source {
-  val metricRegistry = new MetricRegistry()
-  val sourceName = "%s.%s.%s".format("application", application.desc.name,
-    System.currentTimeMillis())
-
-  metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
-    override def getValue: String = application.state.toString
-  })
-
-  metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
-    override def getValue: Long = application.duration
-  })
-
-  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
-    override def getValue: Int = application.coresGranted
-  })
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/spark/deploy/master/ApplicationState.scala
deleted file mode 100644
index 94f0ad8..0000000
--- a/core/src/main/scala/spark/deploy/master/ApplicationState.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-private[spark] object ApplicationState
-  extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
-
-  type ApplicationState = Value
-
-  val WAITING, RUNNING, FINISHED, FAILED = Value
-
-  val MAX_NUM_RETRY = 10
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
deleted file mode 100644
index 99b60f7..0000000
--- a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import spark.deploy.ExecutorState
-
-private[spark] class ExecutorInfo(
-    val id: Int,
-    val application: ApplicationInfo,
-    val worker: WorkerInfo,
-    val cores: Int,
-    val memory: Int) {
-
-  var state = ExecutorState.LAUNCHING
-
-  def fullId: String = application.id + "/" + id
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
deleted file mode 100644
index 04af5e1..0000000
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
-import akka.actor._
-import akka.actor.Terminated
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
-import akka.util.duration._
-
-import spark.{Logging, SparkException, Utils}
-import spark.deploy.{ApplicationDescription, ExecutorState}
-import spark.deploy.DeployMessages._
-import spark.deploy.master.ui.MasterWebUI
-import spark.metrics.MetricsSystem
-import spark.util.AkkaUtils
-
-
-private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
-  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
-  val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
-  val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
- 
-  var nextAppNumber = 0
-  val workers = new HashSet[WorkerInfo]
-  val idToWorker = new HashMap[String, WorkerInfo]
-  val actorToWorker = new HashMap[ActorRef, WorkerInfo]
-  val addressToWorker = new HashMap[Address, WorkerInfo]
-
-  val apps = new HashSet[ApplicationInfo]
-  val idToApp = new HashMap[String, ApplicationInfo]
-  val actorToApp = new HashMap[ActorRef, ApplicationInfo]
-  val addressToApp = new HashMap[Address, ApplicationInfo]
-
-  val waitingApps = new ArrayBuffer[ApplicationInfo]
-  val completedApps = new ArrayBuffer[ApplicationInfo]
-
-  var firstApp: Option[ApplicationInfo] = None
-
-  Utils.checkHost(host, "Expected hostname")
-
-  val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
-  val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
-  val masterSource = new MasterSource(this)
-
-  val webUi = new MasterWebUI(this, webUiPort)
-
-  val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
-    if (envVar != null) envVar else host
-  }
-
-  // As a temporary workaround before better ways of configuring memory, we allow users to set
-  // a flag that will perform round-robin scheduling across the nodes (spreading out each app
-  // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
-
-  override def preStart() {
-    logInfo("Starting Spark master at spark://" + host + ":" + port)
-    // Listen for remote client disconnection events, since they don't go through Akka's watch()
-    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-    webUi.start()
-    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
-
-    masterMetricsSystem.registerSource(masterSource)
-    masterMetricsSystem.start()
-    applicationMetricsSystem.start()
-  }
-
-  override def postStop() {
-    webUi.stop()
-    masterMetricsSystem.stop()
-    applicationMetricsSystem.stop()
-  }
-
-  override def receive = {
-    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
-      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-        host, workerPort, cores, Utils.megabytesToString(memory)))
-      if (idToWorker.contains(id)) {
-        sender ! RegisterWorkerFailed("Duplicate worker ID")
-      } else {
-        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
-        context.watch(sender)  // This doesn't work with remote actors but helps for testing
-        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
-        schedule()
-      }
-    }
-
-    case RegisterApplication(description) => {
-      logInfo("Registering app " + description.name)
-      val app = addApplication(description, sender)
-      logInfo("Registered app " + description.name + " with ID " + app.id)
-      waitingApps += app
-      context.watch(sender)  // This doesn't work with remote actors but helps for testing
-      sender ! RegisteredApplication(app.id)
-      schedule()
-    }
-
-    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
-      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
-      execOption match {
-        case Some(exec) => {
-          exec.state = state
-          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
-          if (ExecutorState.isFinished(state)) {
-            val appInfo = idToApp(appId)
-            // Remove this executor from the worker and app
-            logInfo("Removing executor " + exec.fullId + " because it is " + state)
-            appInfo.removeExecutor(exec)
-            exec.worker.removeExecutor(exec)
-
-            // Only retry certain number of times so we don't go into an infinite loop.
-            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
-              schedule()
-            } else {
-              logError("Application %s with ID %s failed %d times, removing it".format(
-                appInfo.desc.name, appInfo.id, appInfo.retryCount))
-              removeApplication(appInfo, ApplicationState.FAILED)
-            }
-          }
-        }
-        case None =>
-          logWarning("Got status update for unknown executor " + appId + "/" + execId)
-      }
-    }
-
-    case Heartbeat(workerId) => {
-      idToWorker.get(workerId) match {
-        case Some(workerInfo) =>
-          workerInfo.lastHeartbeat = System.currentTimeMillis()
-        case None =>
-          logWarning("Got heartbeat from unregistered worker " + workerId)
-      }
-    }
-
-    case Terminated(actor) => {
-      // The disconnected actor could've been either a worker or an app; remove whichever of
-      // those we have an entry for in the corresponding actor hashmap
-      actorToWorker.get(actor).foreach(removeWorker)
-      actorToApp.get(actor).foreach(finishApplication)
-    }
-
-    case RemoteClientDisconnected(transport, address) => {
-      // The disconnected client could've been either a worker or an app; remove whichever it was
-      addressToWorker.get(address).foreach(removeWorker)
-      addressToApp.get(address).foreach(finishApplication)
-    }
-
-    case RemoteClientShutdown(transport, address) => {
-      // The disconnected client could've been either a worker or an app; remove whichever it was
-      addressToWorker.get(address).foreach(removeWorker)
-      addressToApp.get(address).foreach(finishApplication)
-    }
-
-    case RequestMasterState => {
-      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
-    }
-
-    case CheckForWorkerTimeOut => {
-      timeOutDeadWorkers()
-    }
-  }
-
-  /**
-   * Can an app use the given worker? True if the worker has enough memory and we haven't already
-   * launched an executor for the app on it (right now the standalone backend doesn't like having
-   * two executors on the same worker).
-   */
-  def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
-    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
-  }
-
-  /**
-   * Schedule the currently available resources among waiting apps. This method will be called
-   * every time a new app joins or resource availability changes.
-   */
-  def schedule() {
-    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
-    // in the queue, then the second app, etc.
-    if (spreadOutApps) {
-      // Try to spread out each app among all the nodes, until it has all its cores
-      for (app <- waitingApps if app.coresLeft > 0) {
-        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-                                   .filter(canUse(app, _)).sortBy(_.coresFree).reverse
-        val numUsable = usableWorkers.length
-        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
-        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
-        var pos = 0
-        while (toAssign > 0) {
-          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
-            toAssign -= 1
-            assigned(pos) += 1
-          }
-          pos = (pos + 1) % numUsable
-        }
-        // Now that we've decided how many cores to give on each node, let's actually give them
-        for (pos <- 0 until numUsable) {
-          if (assigned(pos) > 0) {
-            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
-            launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
-            app.state = ApplicationState.RUNNING
-          }
-        }
-      }
-    } else {
-      // Pack each app into as few nodes as possible until we've assigned all its cores
-      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > 0) {
-          if (canUse(app, worker)) {
-            val coresToUse = math.min(worker.coresFree, app.coresLeft)
-            if (coresToUse > 0) {
-              val exec = app.addExecutor(worker, coresToUse)
-              launchExecutor(worker, exec, app.desc.sparkHome)
-              app.state = ApplicationState.RUNNING
-            }
-          }
-        }
-      }
-    }
-  }
-
-  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
-    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
-    worker.addExecutor(exec)
-    worker.actor ! LaunchExecutor(
-      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
-    exec.application.driver ! ExecutorAdded(
-      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
-  }
-
-  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
-    publicAddress: String): WorkerInfo = {
-    // There may be one or more refs to dead workers on this same node (w/ different ID's),
-    // remove them.
-    workers.filter { w =>
-      (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
-    }.foreach { w =>
-      workers -= w
-    }
-    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
-    workers += worker
-    idToWorker(worker.id) = worker
-    actorToWorker(sender) = worker
-    addressToWorker(sender.path.address) = worker
-    worker
-  }
-
-  def removeWorker(worker: WorkerInfo) {
-    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
-    worker.setState(WorkerState.DEAD)
-    idToWorker -= worker.id
-    actorToWorker -= worker.actor
-    addressToWorker -= worker.actor.path.address
-    for (exec <- worker.executors.values) {
-      logInfo("Telling app of lost executor: " + exec.id)
-      exec.application.driver ! ExecutorUpdated(
-        exec.id, ExecutorState.LOST, Some("worker lost"), None)
-      exec.application.removeExecutor(exec)
-    }
-  }
-
-  def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
-    val now = System.currentTimeMillis()
-    val date = new Date(now)
-    val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
-    applicationMetricsSystem.registerSource(app.appSource)
-    apps += app
-    idToApp(app.id) = app
-    actorToApp(driver) = app
-    addressToApp(driver.path.address) = app
-    if (firstApp == None) {
-      firstApp = Some(app)
-    }
-    val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
-    if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
-      logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
-    }
-    app
-  }
-
-  def finishApplication(app: ApplicationInfo) {
-    removeApplication(app, ApplicationState.FINISHED)
-  }
-
-  def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
-    if (apps.contains(app)) {
-      logInfo("Removing app " + app.id)
-      apps -= app
-      idToApp -= app.id
-      actorToApp -= app.driver
-      addressToApp -= app.driver.path.address
-      if (completedApps.size >= RETAINED_APPLICATIONS) {
-        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
-        completedApps.take(toRemove).foreach( a => {
-          applicationMetricsSystem.removeSource(a.appSource)
-        })
-        completedApps.trimStart(toRemove)
-      }
-      completedApps += app // Remember it in our history
-      waitingApps -= app
-      for (exec <- app.executors.values) {
-        exec.worker.removeExecutor(exec)
-        exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
-        exec.state = ExecutorState.KILLED
-      }
-      app.markFinished(state)
-      if (state != ApplicationState.FINISHED) {
-        app.driver ! ApplicationRemoved(state.toString)
-      }
-      schedule()
-    }
-  }
-
-  /** Generate a new app ID given a app's submission date */
-  def newApplicationId(submitDate: Date): String = {
-    val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
-    nextAppNumber += 1
-    appId
-  }
-
-  /** Check for, and remove, any timed-out workers */
-  def timeOutDeadWorkers() {
-    // Copy the workers into an array so we don't modify the hashset while iterating through it
-    val currentTime = System.currentTimeMillis()
-    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
-    for (worker <- toRemove) {
-      if (worker.state != WorkerState.DEAD) {
-        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
-          worker.id, WORKER_TIMEOUT/1000))
-        removeWorker(worker)
-      } else {
-        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
-          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it 
-      }
-    }
-  }
-}
-
-private[spark] object Master {
-  private val systemName = "sparkMaster"
-  private val actorName = "Master"
-  private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
-
-  def main(argStrings: Array[String]) {
-    val args = new MasterArguments(argStrings)
-    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
-    actorSystem.awaitTermination()
-  }
-
-  /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
-  def toAkkaUrl(sparkUrl: String): String = {
-    sparkUrl match {
-      case sparkUrlRegex(host, port) =>
-        "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
-      case _ =>
-        throw new SparkException("Invalid master URL: " + sparkUrl)
-    }
-  }
-
-  def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
-    val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
-    (actorSystem, boundPort)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
deleted file mode 100644
index 0ae0160..0000000
--- a/core/src/main/scala/spark/deploy/master/MasterArguments.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import spark.util.IntParam
-import spark.Utils
-
-/**
- * Command-line parser for the master.
- */
-private[spark] class MasterArguments(args: Array[String]) {
-  var host = Utils.localHostName()
-  var port = 7077
-  var webUiPort = 8080
-  
-  // Check for settings in environment variables 
-  if (System.getenv("SPARK_MASTER_HOST") != null) {
-    host = System.getenv("SPARK_MASTER_HOST")
-  }
-  if (System.getenv("SPARK_MASTER_PORT") != null) {
-    port = System.getenv("SPARK_MASTER_PORT").toInt
-  }
-  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
-    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
-  }
-  if (System.getProperty("master.ui.port") != null) {
-    webUiPort = System.getProperty("master.ui.port").toInt
-  }
-
-  parse(args.toList)
-
-  def parse(args: List[String]): Unit = args match {
-    case ("--ip" | "-i") :: value :: tail =>
-      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
-      host = value
-      parse(tail)
-
-    case ("--host" | "-h") :: value :: tail =>
-      Utils.checkHost(value, "Please use hostname " + value)
-      host = value
-      parse(tail)
-
-    case ("--port" | "-p") :: IntParam(value) :: tail =>
-      port = value
-      parse(tail)
-
-    case "--webui-port" :: IntParam(value) :: tail =>
-      webUiPort = value
-      parse(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case Nil => {}
-
-    case _ =>
-      printUsageAndExit(1)
-  }
-
-  /**
-   * Print usage and exit JVM with the given exit code.
-   */
-  def printUsageAndExit(exitCode: Int) {
-    System.err.println(
-      "Usage: Master [options]\n" +
-      "\n" +
-      "Options:\n" +
-      "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h) \n" +
-      "  -h HOST, --host HOST   Hostname to listen on\n" +
-      "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
-      "  --webui-port PORT      Port for web UI (default: 8080)")
-    System.exit(exitCode)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/MasterSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/MasterSource.scala b/core/src/main/scala/spark/deploy/master/MasterSource.scala
deleted file mode 100644
index b8cfa6a..0000000
--- a/core/src/main/scala/spark/deploy/master/MasterSource.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.deploy.master
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import spark.metrics.source.Source
-
-private[spark] class MasterSource(val master: Master) extends Source {
-  val metricRegistry = new MetricRegistry()
-  val sourceName = "master"
-
-  // Gauge for worker numbers in cluster
-  metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
-    override def getValue: Int = master.workers.size
-  })
-
-  // Gauge for application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
-    override def getValue: Int = master.apps.size
-  })
-
-  // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
-    override def getValue: Int = master.waitingApps.size
-  })
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
deleted file mode 100644
index 4135cfe..0000000
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import akka.actor.ActorRef
-import scala.collection.mutable
-import spark.Utils
-
-private[spark] class WorkerInfo(
-  val id: String,
-  val host: String,
-  val port: Int,
-  val cores: Int,
-  val memory: Int,
-  val actor: ActorRef,
-  val webUiPort: Int,
-  val publicAddress: String) {
-
-  Utils.checkHost(host, "Expected hostname")
-  assert (port > 0)
-
-  var executors = new mutable.HashMap[String, ExecutorInfo]  // fullId => info
-  var state: WorkerState.Value = WorkerState.ALIVE
-  var coresUsed = 0
-  var memoryUsed = 0
-
-  var lastHeartbeat = System.currentTimeMillis()
-
-  def coresFree: Int = cores - coresUsed
-  def memoryFree: Int = memory - memoryUsed
-
-  def hostPort: String = {
-    assert (port > 0)
-    host + ":" + port
-  }
-
-  def addExecutor(exec: ExecutorInfo) {
-    executors(exec.fullId) = exec
-    coresUsed += exec.cores
-    memoryUsed += exec.memory
-  }
-
-  def removeExecutor(exec: ExecutorInfo) {
-    if (executors.contains(exec.fullId)) {
-      executors -= exec.fullId
-      coresUsed -= exec.cores
-      memoryUsed -= exec.memory
-    }
-  }
-
-  def hasExecutor(app: ApplicationInfo): Boolean = {
-    executors.values.exists(_.application == app)
-  }
-
-  def webUiAddress : String = {
-    "http://" + this.publicAddress + ":" + this.webUiPort
-  }
-
-  def setState(state: WorkerState.Value) = {
-    this.state = state
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/WorkerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/WorkerState.scala b/core/src/main/scala/spark/deploy/master/WorkerState.scala
deleted file mode 100644
index 3e50b77..0000000
--- a/core/src/main/scala/spark/deploy/master/WorkerState.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
-  type WorkerState = Value
-
-  val ALIVE, DEAD, DECOMMISSIONED = Value
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
deleted file mode 100644
index 2ad98f7..0000000
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master.ui
-
-import scala.xml.Node
-
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.duration._
-
-import javax.servlet.http.HttpServletRequest
-
-import net.liftweb.json.JsonAST.JValue
-
-import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import spark.deploy.JsonProtocol
-import spark.deploy.master.ExecutorInfo
-import spark.ui.UIUtils
-import spark.Utils
-
-private[spark] class ApplicationPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  implicit val timeout = parent.timeout
-
-  /** Executor details for a particular application */
-  def renderJson(request: HttpServletRequest): JValue = {
-    val appId = request.getParameter("appId")
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, 30 seconds)
-    val app = state.activeApps.find(_.id == appId).getOrElse({
-      state.completedApps.find(_.id == appId).getOrElse(null)
-    })
-    JsonProtocol.writeApplicationInfo(app)
-  }
-
-  /** Executor details for a particular application */
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val appId = request.getParameter("appId")
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, 30 seconds)
-    val app = state.activeApps.find(_.id == appId).getOrElse({
-      state.completedApps.find(_.id == appId).getOrElse(null)
-    })
-
-    val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
-    val executors = app.executors.values.toSeq
-    val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
-
-    val content =
-        <div class="row-fluid">
-          <div class="span12">
-            <ul class="unstyled">
-              <li><strong>ID:</strong> {app.id}</li>
-              <li><strong>Name:</strong> {app.desc.name}</li>
-              <li><strong>User:</strong> {app.desc.user}</li>
-              <li><strong>Cores:</strong>
-                {
-                if (app.desc.maxCores == Integer.MAX_VALUE) {
-                  "Unlimited (%s granted)".format(app.coresGranted)
-                } else {
-                  "%s (%s granted, %s left)".format(
-                    app.desc.maxCores, app.coresGranted, app.coresLeft)
-                }
-                }
-              </li>
-              <li>
-                <strong>Executor Memory:</strong>
-                {Utils.megabytesToString(app.desc.memoryPerSlave)}
-              </li>
-              <li><strong>Submit Date:</strong> {app.submitDate}</li>
-              <li><strong>State:</strong> {app.state}</li>
-              <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li>
-            </ul>
-          </div>
-        </div>
-
-        <div class="row-fluid"> <!-- Executors -->
-          <div class="span12">
-            <h4> Executor Summary </h4>
-            {executorTable}
-          </div>
-        </div>;
-    UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
-  }
-
-  def executorRow(executor: ExecutorInfo): Seq[Node] = {
-    <tr>
-      <td>{executor.id}</td>
-      <td>
-        <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
-      </td>
-      <td>{executor.cores}</td>
-      <td>{executor.memory}</td>
-      <td>{executor.state}</td>
-      <td>
-        <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
-          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
-        <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
-          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
-      </td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
deleted file mode 100644
index 093e523..0000000
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.duration._
-
-import net.liftweb.json.JsonAST.JValue
-
-import spark.Utils
-import spark.deploy.DeployWebUI
-import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import spark.deploy.JsonProtocol
-import spark.deploy.master.{ApplicationInfo, WorkerInfo}
-import spark.ui.UIUtils
-
-private[spark] class IndexPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  implicit val timeout = parent.timeout
-
-  def renderJson(request: HttpServletRequest): JValue = {
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, 30 seconds)
-    JsonProtocol.writeMasterState(state)
-  }
-
-  /** Index view listing applications and executors */
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, 30 seconds)
-
-    val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
-    val workers = state.workers.sortBy(_.id)
-    val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
-
-    val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
-      "State", "Duration")
-    val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
-    val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
-
-    val content =
-        <div class="row-fluid">
-          <div class="span12">
-            <ul class="unstyled">
-              <li><strong>URL:</strong> {state.uri}</li>
-              <li><strong>Workers:</strong> {state.workers.size}</li>
-              <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
-                {state.workers.map(_.coresUsed).sum} Used</li>
-              <li><strong>Memory:</strong>
-                {Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
-                {Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
-              <li><strong>Applications:</strong>
-                {state.activeApps.size} Running,
-                {state.completedApps.size} Completed </li>
-            </ul>
-          </div>
-        </div>
-
-        <div class="row-fluid">
-          <div class="span12">
-            <h4> Workers </h4>
-            {workerTable}
-          </div>
-        </div>
-
-        <div class="row-fluid">
-          <div class="span12">
-            <h4> Running Applications </h4>
-
-            {activeAppsTable}
-          </div>
-        </div>
-
-        <div class="row-fluid">
-          <div class="span12">
-            <h4> Completed Applications </h4>
-            {completedAppsTable}
-          </div>
-        </div>;
-    UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
-  }
-
-  def workerRow(worker: WorkerInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={worker.webUiAddress}>{worker.id}</a>
-      </td>
-      <td>{worker.host}:{worker.port}</td>
-      <td>{worker.state}</td>
-      <td>{worker.cores} ({worker.coresUsed} Used)</td>
-      <td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
-        {Utils.megabytesToString(worker.memory)}
-        ({Utils.megabytesToString(worker.memoryUsed)} Used)
-      </td>
-    </tr>
-  }
-
-
-  def appRow(app: ApplicationInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={"app?appId=" + app.id}>{app.id}</a>
-      </td>
-      <td>
-        <a href={app.appUiUrl}>{app.desc.name}</a>
-      </td>
-      <td>
-        {app.coresGranted}
-      </td>
-      <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
-        {Utils.megabytesToString(app.desc.memoryPerSlave)}
-      </td>
-      <td>{DeployWebUI.formatDate(app.submitDate)}</td>
-      <td>{app.desc.user}</td>
-      <td>{app.state.toString}</td>
-      <td>{DeployWebUI.formatDuration(app.duration)}</td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
deleted file mode 100644
index c91e1db..0000000
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master.ui
-
-import akka.util.Duration
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.{Handler, Server}
-
-import spark.{Logging, Utils}
-import spark.deploy.master.Master
-import spark.ui.JettyUtils
-import spark.ui.JettyUtils._
-
-/**
- * Web UI server for the standalone master.
- */
-private[spark]
-class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
-  implicit val timeout = Duration.create(
-    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-  val host = Utils.localHostName()
-  val port = requestedPort
-
-  val masterActorRef = master.self
-
-  var server: Option[Server] = None
-  var boundPort: Option[Int] = None
-
-  val applicationPage = new ApplicationPage(this)
-  val indexPage = new IndexPage(this)
-
-  def start() {
-    try {
-      val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
-      server = Some(srv)
-      boundPort = Some(bPort)
-      logInfo("Started Master web UI at http://%s:%d".format(host, boundPort.get))
-    } catch {
-      case e: Exception =>
-        logError("Failed to create Master JettyUtils", e)
-        System.exit(1)
-    }
-  }
-
-  val metricsHandlers = master.masterMetricsSystem.getServletHandlers ++
-    master.applicationMetricsSystem.getServletHandlers
-
-  val handlers = metricsHandlers ++ Array[(String, Handler)](
-    ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
-    ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
-    ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
-    ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
-    ("*", (request: HttpServletRequest) => indexPage.render(request))
-  )
-
-  def stop() {
-    server.foreach(_.stop())
-  }
-}
-
-private[spark] object MasterWebUI {
-  val STATIC_RESOURCE_DIR = "spark/ui/static"
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
deleted file mode 100644
index 34665ce..0000000
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.worker
-
-import java.io._
-import java.lang.System.getenv
-
-import akka.actor.ActorRef
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-
-import spark.{Utils, Logging}
-import spark.deploy.{ExecutorState, ApplicationDescription}
-import spark.deploy.DeployMessages.ExecutorStateChanged
-
-/**
- * Manages the execution of one executor process.
- */
-private[spark] class ExecutorRunner(
-    val appId: String,
-    val execId: Int,
-    val appDesc: ApplicationDescription,
-    val cores: Int,
-    val memory: Int,
-    val worker: ActorRef,
-    val workerId: String,
-    val host: String,
-    val sparkHome: File,
-    val workDir: File)
-  extends Logging {
-
-  val fullId = appId + "/" + execId
-  var workerThread: Thread = null
-  var process: Process = null
-  var shutdownHook: Thread = null
-
-  private def getAppEnv(key: String): Option[String] =
-    appDesc.command.environment.get(key).orElse(Option(getenv(key)))
-
-  def start() {
-    workerThread = new Thread("ExecutorRunner for " + fullId) {
-      override def run() { fetchAndRunExecutor() }
-    }
-    workerThread.start()
-
-    // Shutdown hook that kills actors on shutdown.
-    shutdownHook = new Thread() {
-      override def run() {
-        if (process != null) {
-          logInfo("Shutdown hook killing child process.")
-          process.destroy()
-          process.waitFor()
-        }
-      }
-    }
-    Runtime.getRuntime.addShutdownHook(shutdownHook)
-  }
-
-  /** Stop this executor runner, including killing the process it launched */
-  def kill() {
-    if (workerThread != null) {
-      workerThread.interrupt()
-      workerThread = null
-      if (process != null) {
-        logInfo("Killing process!")
-        process.destroy()
-        process.waitFor()
-      }
-      worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
-      Runtime.getRuntime.removeShutdownHook(shutdownHook)
-    }
-  }
-
-  /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
-  def substituteVariables(argument: String): String = argument match {
-    case "{{EXECUTOR_ID}}" => execId.toString
-    case "{{HOSTNAME}}" => host
-    case "{{CORES}}" => cores.toString
-    case other => other
-  }
-
-  def buildCommandSeq(): Seq[String] = {
-    val command = appDesc.command
-    val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
-    // SPARK-698: do not call the run.cmd script, as process.destroy()
-    // fails to kill a process tree on Windows
-    Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
-      command.arguments.map(substituteVariables)
-  }
-
-  /**
-   * Attention: this must always be aligned with the environment variables in the run scripts and
-   * the way the JAVA_OPTS are assembled there.
-   */
-  def buildJavaOpts(): Seq[String] = {
-    val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
-      .map(p => List("-Djava.library.path=" + p))
-      .getOrElse(Nil)
-    val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
-    val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
-    val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
-
-    // Figure out our classpath with the external compute-classpath script
-    val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
-    val classPath = Utils.executeAndGetOutput(
-        Seq(sparkHome + "/bin/compute-classpath" + ext),
-        extraEnvironment=appDesc.command.environment)
-
-    Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
-  }
-
-  /** Spawn a thread that will redirect a given stream to a file */
-  def redirectStream(in: InputStream, file: File) {
-    val out = new FileOutputStream(file, true)
-    new Thread("redirect output to " + file) {
-      override def run() {
-        try {
-          Utils.copyStream(in, out, true)
-        } catch {
-          case e: IOException =>
-            logInfo("Redirection to " + file + " closed: " + e.getMessage)
-        }
-      }
-    }.start()
-  }
-
-  /**
-   * Download and run the executor described in our ApplicationDescription
-   */
-  def fetchAndRunExecutor() {
-    try {
-      // Create the executor's working directory
-      val executorDir = new File(workDir, appId + "/" + execId)
-      if (!executorDir.mkdirs()) {
-        throw new IOException("Failed to create directory " + executorDir)
-      }
-
-      // Launch the process
-      val command = buildCommandSeq()
-      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
-      val builder = new ProcessBuilder(command: _*).directory(executorDir)
-      val env = builder.environment()
-      for ((key, value) <- appDesc.command.environment) {
-        env.put(key, value)
-      }
-      // In case we are running this from within the Spark Shell, avoid creating a "scala"
-      // parent process for the executor command
-      env.put("SPARK_LAUNCH_WITH_SCALA", "0")
-      process = builder.start()
-
-      val header = "Spark Executor Command: %s\n%s\n\n".format(
-        command.mkString("\"", "\" \"", "\""), "=" * 40)
-
-      // Redirect its stdout and stderr to files
-      val stdout = new File(executorDir, "stdout")
-      redirectStream(process.getInputStream, stdout)
-
-      val stderr = new File(executorDir, "stderr")
-      Files.write(header, stderr, Charsets.UTF_8)
-      redirectStream(process.getErrorStream, stderr)
-
-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
-      val exitCode = process.waitFor()
-      val message = "Command exited with code " + exitCode
-      worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
-                                    Some(exitCode))
-    } catch {
-      case interrupted: InterruptedException =>
-        logInfo("Runner thread for executor " + fullId + " interrupted")
-
-      case e: Exception => {
-        logError("Error running executor", e)
-        if (process != null) {
-          process.destroy()
-        }
-        val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
-      }
-    }
-  }
-}


Mime
View raw message