spark-commits mailing list archives

From ma...@apache.org
Subject [22/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
Date Sun, 01 Sep 2013 21:59:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
deleted file mode 100644
index 053ac55..0000000
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.worker
-
-import java.text.SimpleDateFormat
-import java.util.Date
-import java.io.File
-
-import scala.collection.mutable.HashMap
-
-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import akka.util.duration._
-
-import spark.{Logging, Utils}
-import spark.deploy.ExecutorState
-import spark.deploy.DeployMessages._
-import spark.deploy.master.Master
-import spark.deploy.worker.ui.WorkerWebUI
-import spark.metrics.MetricsSystem
-import spark.util.AkkaUtils
-
-
-private[spark] class Worker(
-    host: String,
-    port: Int,
-    webUiPort: Int,
-    cores: Int,
-    memory: Int,
-    masterUrl: String,
-    workDirPath: String = null)
-  extends Actor with Logging {
-
-  Utils.checkHost(host, "Expected hostname")
-  assert (port > 0)
-
-  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
-
-  // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
-
-  var master: ActorRef = null
-  var masterWebUiUrl : String = ""
-  val workerId = generateWorkerId()
-  var sparkHome: File = null
-  var workDir: File = null
-  val executors = new HashMap[String, ExecutorRunner]
-  val finishedExecutors = new HashMap[String, ExecutorRunner]
-  val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
-    if (envVar != null) envVar else host
-  }
-  var webUi: WorkerWebUI = null
-
-  var coresUsed = 0
-  var memoryUsed = 0
-
-  val metricsSystem = MetricsSystem.createMetricsSystem("worker")
-  val workerSource = new WorkerSource(this)
-
-  def coresFree: Int = cores - coresUsed
-  def memoryFree: Int = memory - memoryUsed
-
-  def createWorkDir() {
-    workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
-    try {
-      // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
-      // So attempt to create the directory, then check whether it was actually created.
-      workDir.mkdirs()
-      if ( !workDir.exists() || !workDir.isDirectory) {
-        logError("Failed to create work directory " + workDir)
-        System.exit(1)
-      }
-      assert (workDir.isDirectory)
-    } catch {
-      case e: Exception =>
-        logError("Failed to create work directory " + workDir, e)
-        System.exit(1)
-    }
-  }
-
-  override def preStart() {
-    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
-      host, port, cores, Utils.megabytesToString(memory)))
-    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
-    logInfo("Spark home: " + sparkHome)
-    createWorkDir()
-    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
-
-    webUi.start()
-    connectToMaster()
-
-    metricsSystem.registerSource(workerSource)
-    metricsSystem.start()
-  }
-
-  def connectToMaster() {
-    logInfo("Connecting to master " + masterUrl)
-    master = context.actorFor(Master.toAkkaUrl(masterUrl))
-    master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
-    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-    context.watch(master) // Doesn't work with remote actors, but useful for testing
-  }
-
-  override def receive = {
-    case RegisteredWorker(url) =>
-      masterWebUiUrl = url
-      logInfo("Successfully registered with master")
-      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
-        master ! Heartbeat(workerId)
-      }
-
-    case RegisterWorkerFailed(message) =>
-      logError("Worker registration failed: " + message)
-      System.exit(1)
-
-    case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
-      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-      val manager = new ExecutorRunner(
-        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
-      executors(appId + "/" + execId) = manager
-      manager.start()
-      coresUsed += cores_
-      memoryUsed += memory_
-      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
-
-    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
-      val fullId = appId + "/" + execId
-      if (ExecutorState.isFinished(state)) {
-        val executor = executors(fullId)
-        logInfo("Executor " + fullId + " finished with state " + state +
-          message.map(" message " + _).getOrElse("") +
-          exitStatus.map(" exitStatus " + _).getOrElse(""))
-        finishedExecutors(fullId) = executor
-        executors -= fullId
-        coresUsed -= executor.cores
-        memoryUsed -= executor.memory
-      }
-
-    case KillExecutor(appId, execId) =>
-      val fullId = appId + "/" + execId
-      executors.get(fullId) match {
-        case Some(executor) =>
-          logInfo("Asked to kill executor " + fullId)
-          executor.kill()
-        case None =>
-          logInfo("Asked to kill unknown executor " + fullId)
-      }
-
-    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
-      masterDisconnected()
-
-    case RequestWorkerState => {
-      sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
-        finishedExecutors.values.toList, masterUrl, cores, memory,
-        coresUsed, memoryUsed, masterWebUiUrl)
-    }
-  }
-
-  def masterDisconnected() {
-    // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
-    // (Note that if reconnecting we would also need to assign IDs differently.)
-    logError("Connection to master failed! Shutting down.")
-    executors.values.foreach(_.kill())
-    System.exit(1)
-  }
-
-  def generateWorkerId(): String = {
-    "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port)
-  }
-
-  override def postStop() {
-    executors.values.foreach(_.kill())
-    webUi.stop()
-    metricsSystem.stop()
-  }
-}
-
-private[spark] object Worker {
-  def main(argStrings: Array[String]) {
-    val args = new WorkerArguments(argStrings)
-    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
-      args.memory, args.master, args.workDir)
-    actorSystem.awaitTermination()
-  }
-
-  def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
-    masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
-    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
-    val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
-      masterUrl, workDir)), name = "Worker")
-    (actorSystem, boundPort)
-  }
-
-}

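For orientation, the commit subject says this file is being moved out of the spark.* namespace into org.apache.spark. Assuming the relocated counterpart elsewhere in this commit keeps the body unchanged and only rewrites the package prefix (the Akka and Java imports are unaffected), its header would read roughly:

    package org.apache.spark.deploy.worker

    import org.apache.spark.{Logging, Utils}
    import org.apache.spark.deploy.ExecutorState
    import org.apache.spark.deploy.DeployMessages._
    import org.apache.spark.deploy.master.Master
    import org.apache.spark.deploy.worker.ui.WorkerWebUI
    import org.apache.spark.metrics.MetricsSystem
    import org.apache.spark.util.AkkaUtils
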
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
deleted file mode 100644
index 9fcd326..0000000
--- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.worker
-
-import spark.util.IntParam
-import spark.util.MemoryParam
-import spark.Utils
-import java.lang.management.ManagementFactory
-
-/**
- * Command-line parser for the worker.
- */
-private[spark] class WorkerArguments(args: Array[String]) {
-  var host = Utils.localHostName()
-  var port = 0
-  var webUiPort = 8081
-  var cores = inferDefaultCores()
-  var memory = inferDefaultMemory()
-  var master: String = null
-  var workDir: String = null
-  
-  // Check for settings in environment variables 
-  if (System.getenv("SPARK_WORKER_PORT") != null) {
-    port = System.getenv("SPARK_WORKER_PORT").toInt
-  }
-  if (System.getenv("SPARK_WORKER_CORES") != null) {
-    cores = System.getenv("SPARK_WORKER_CORES").toInt
-  }
-  if (System.getenv("SPARK_WORKER_MEMORY") != null) {
-    memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
-  }
-  if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
-    webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
-  }
-  if (System.getenv("SPARK_WORKER_DIR") != null) {
-    workDir = System.getenv("SPARK_WORKER_DIR")
-  }
-  
-  parse(args.toList)
-
-  def parse(args: List[String]): Unit = args match {
-    case ("--ip" | "-i") :: value :: tail =>
-      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
-      host = value
-      parse(tail)
-
-    case ("--host" | "-h") :: value :: tail =>
-      Utils.checkHost(value, "Please use hostname " + value)
-      host = value
-      parse(tail)
-
-    case ("--port" | "-p") :: IntParam(value) :: tail =>
-      port = value
-      parse(tail)
-
-    case ("--cores" | "-c") :: IntParam(value) :: tail =>
-      cores = value
-      parse(tail)
-
-    case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
-      memory = value
-      parse(tail)
-
-    case ("--work-dir" | "-d") :: value :: tail =>
-      workDir = value
-      parse(tail)
-      
-    case "--webui-port" :: IntParam(value) :: tail =>
-      webUiPort = value
-      parse(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case value :: tail =>
-      if (master != null) {  // Two positional arguments were given
-        printUsageAndExit(1)
-      }
-      master = value
-      parse(tail)
-
-    case Nil =>
-      if (master == null) {  // No positional argument was given
-        printUsageAndExit(1)
-      }
-
-    case _ =>
-      printUsageAndExit(1)
-  }
-
-  /**
-   * Print usage and exit JVM with the given exit code.
-   */
-  def printUsageAndExit(exitCode: Int) {
-    System.err.println(
-      "Usage: Worker [options] <master>\n" +
-      "\n" +
-      "Master must be a URL of the form spark://hostname:port\n" +
-      "\n" +
-      "Options:\n" +
-      "  -c CORES, --cores CORES  Number of cores to use\n" +
-      "  -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)\n" +
-      "  -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)\n" +
-      "  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)\n" +
-      "  -h HOST, --host HOST     Hostname to listen on\n" +
-      "  -p PORT, --port PORT     Port to listen on (default: random)\n" +
-      "  --webui-port PORT        Port for web UI (default: 8081)")
-    System.exit(exitCode)
-  }
-
-  def inferDefaultCores(): Int = {
-    Runtime.getRuntime.availableProcessors()
-  }
-
-  def inferDefaultMemory(): Int = {
-    val ibmVendor = System.getProperty("java.vendor").contains("IBM")
-    var totalMb = 0
-    try {
-      val bean = ManagementFactory.getOperatingSystemMXBean()
-      if (ibmVendor) {
-        val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
-        val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
-        totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
-      } else {
-        val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
-        val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
-        totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
-      }
-    } catch {
-      case e: Exception => {
-        totalMb = 2*1024
-        System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
-      }
-    }
-    // Leave out 1 GB for the operating system, but don't return a negative memory size
-    math.max(totalMb - 1024, 512)
-  }
-}

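The options accepted above mirror the usage text in printUsageAndExit. A minimal sketch of driving the parser programmatically (the host names and values below are placeholders, not taken from the commit):

    // Placeholder values; any spark://host:port master URL works as the positional argument.
    val args = new WorkerArguments(Array(
      "--host", "worker1.example.com",
      "--cores", "4",
      "--memory", "2G",          // MemoryParam accepts the same forms as the usage text (e.g. 1000M, 2G)
      "spark://master.example.com:7077"))
    // args.cores == 4; args.memory holds the megabyte value parsed from "2G"
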
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
deleted file mode 100644
index 39cb8e5..0000000
--- a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package spark.deploy.worker
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import spark.metrics.source.Source
-
-private[spark] class WorkerSource(val worker: Worker) extends Source {
-  val sourceName = "worker"
-  val metricRegistry = new MetricRegistry()
-
-  metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
-    override def getValue: Int = worker.executors.size
-  })
-
-  // Gauge for cores used by this worker
-  metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
-    override def getValue: Int = worker.coresUsed
-  })
-
-  // Gauge for memory used by this worker
-  metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
-    override def getValue: Int = worker.memoryUsed
-  })
-
-  // Gauge for cores free on this worker
-  metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
-    override def getValue: Int = worker.coresFree
-  })
-
-  // Gauge for memory free on this worker
-  metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
-    override def getValue: Int = worker.memoryFree
-  })
-}

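WorkerSource is wired into the worker's metrics in Worker.preStart (see the Worker.scala hunk above); condensed, that registration looks like the following sketch, where `worker` stands for the running Worker instance:

    val metricsSystem = MetricsSystem.createMetricsSystem("worker")
    val workerSource  = new WorkerSource(worker)
    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
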
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
deleted file mode 100644
index 243e076..0000000
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.worker.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.duration._
-
-import net.liftweb.json.JsonAST.JValue
-
-import spark.Utils
-import spark.deploy.JsonProtocol
-import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
-import spark.deploy.worker.ExecutorRunner
-import spark.ui.UIUtils
-
-
-private[spark] class IndexPage(parent: WorkerWebUI) {
-  val workerActor = parent.worker.self
-  val worker = parent.worker
-  val timeout = parent.timeout
-
-  def renderJson(request: HttpServletRequest): JValue = {
-    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
-    val workerState = Await.result(stateFuture, 30 seconds)
-    JsonProtocol.writeWorkerState(workerState)
-  }
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
-    val workerState = Await.result(stateFuture, 30 seconds)
-
-    val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
-    val runningExecutorTable =
-      UIUtils.listingTable(executorHeaders, executorRow, workerState.executors)
-    val finishedExecutorTable =
-      UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
-
-    val content =
-        <div class="row-fluid"> <!-- Worker Details -->
-          <div class="span12">
-            <ul class="unstyled">
-              <li><strong>ID:</strong> {workerState.workerId}</li>
-              <li><strong>
-                Master URL:</strong> {workerState.masterUrl}
-              </li>
-              <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
-              <li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)}
-                ({Utils.megabytesToString(workerState.memoryUsed)} Used)</li>
-            </ul>
-            <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
-          </div>
-        </div>
-
-        <div class="row-fluid"> <!-- Running Executors -->
-          <div class="span12">
-            <h4> Running Executors {workerState.executors.size} </h4>
-            {runningExecutorTable}
-          </div>
-        </div>
-
-        <div class="row-fluid"> <!-- Finished Executors  -->
-          <div class="span12">
-            <h4> Finished Executors </h4>
-            {finishedExecutorTable}
-          </div>
-        </div>;
-
-    UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
-      workerState.host, workerState.port))
-  }
-
-  def executorRow(executor: ExecutorRunner): Seq[Node] = {
-    <tr>
-      <td>{executor.execId}</td>
-      <td>{executor.cores}</td>
-      <td sorttable_customkey={executor.memory.toString}>
-        {Utils.megabytesToString(executor.memory)}
-      </td>
-      <td>
-        <ul class="unstyled">
-          <li><strong>ID:</strong> {executor.appId}</li>
-          <li><strong>Name:</strong> {executor.appDesc.name}</li>
-          <li><strong>User:</strong> {executor.appDesc.user}</li>
-        </ul>
-      </td>
-      <td>
-	 <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
-          .format(executor.appId, executor.execId)}>stdout</a>
-	 <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
-          .format(executor.appId, executor.execId)}>stderr</a>
-      </td> 
-    </tr>
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
deleted file mode 100644
index 0a75ad8..0000000
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.worker.ui
-
-import akka.util.{Duration, Timeout}
-
-import java.io.{FileInputStream, File}
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.{Handler, Server}
-
-import spark.deploy.worker.Worker
-import spark.{Utils, Logging}
-import spark.ui.JettyUtils
-import spark.ui.JettyUtils._
-import spark.ui.UIUtils
-
-/**
- * Web UI server for the standalone worker.
- */
-private[spark]
-class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
-  extends Logging {
-  implicit val timeout = Timeout(
-    Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
-  val host = Utils.localHostName()
-  val port = requestedPort.getOrElse(
-    System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
-
-  var server: Option[Server] = None
-  var boundPort: Option[Int] = None
-
-  val indexPage = new IndexPage(this)
-
-  val metricsHandlers = worker.metricsSystem.getServletHandlers
-
-  val handlers = metricsHandlers ++ Array[(String, Handler)](
-    ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
-    ("/log", (request: HttpServletRequest) => log(request)),
-    ("/logPage", (request: HttpServletRequest) => logPage(request)),
-    ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
-    ("*", (request: HttpServletRequest) => indexPage.render(request))
-  )
-
-  def start() {
-    try {
-      val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
-      server = Some(srv)
-      boundPort = Some(bPort)
-      logInfo("Started Worker web UI at http://%s:%d".format(host, bPort))
-    } catch {
-      case e: Exception =>
-        logError("Failed to create Worker JettyUtils", e)
-        System.exit(1)
-    }
-  }
-
-  def log(request: HttpServletRequest): String = {
-    val defaultBytes = 100 * 1024
-    val appId = request.getParameter("appId")
-    val executorId = request.getParameter("executorId")
-    val logType = request.getParameter("logType")
-    val offset = Option(request.getParameter("offset")).map(_.toLong)
-    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
-
-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-
-    val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
-      .format(startByte, endByte, logLength, appId, executorId, logType)
-    pre + Utils.offsetBytes(path, startByte, endByte)
-  }
-
-  def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
-    val defaultBytes = 100 * 1024
-    val appId = request.getParameter("appId")
-    val executorId = request.getParameter("executorId")
-    val logType = request.getParameter("logType")
-    val offset = Option(request.getParameter("offset")).map(_.toLong)
-    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
-
-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-
-    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
-
-    val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
-
-    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
-
-    val backButton =
-      if (startByte > 0) {
-        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
-          .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
-          byteLength)}>
-          <button type="button" class="btn btn-default">
-            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Previous 0 B
-        </button>
-      }
-
-    val nextButton =
-      if (endByte < logLength) {
-        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
-          format(appId, executorId, logType, endByte, byteLength)}>
-          <button type="button" class="btn btn-default">
-            Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Next 0 B
-        </button>
-      }
-
-    val content =
-      <html>
-        <body>
-          {linkToMaster}
-          <div>
-            <div style="float:left;width:40%">{backButton}</div>
-            <div style="float:left;">{range}</div>
-            <div style="float:right;">{nextButton}</div>
-          </div>
-          <br />
-          <div style="height:500px;overflow:auto;padding:5px;">
-            <pre>{logText}</pre>
-          </div>
-        </body>
-      </html>
-    UIUtils.basicSparkPage(content, logType + " log page for " + appId)
-  }
-
-  /** Determine the byte range for a log or log page. */
-  def getByteRange(path: String, offset: Option[Long], byteLength: Int)
-  : (Long, Long) = {
-    val defaultBytes = 100 * 1024
-    val maxBytes = 1024 * 1024
-
-    val file = new File(path)
-    val logLength = file.length()
-    val getOffset = offset.getOrElse(logLength-defaultBytes)
-
-    val startByte =
-      if (getOffset < 0) 0L
-      else if (getOffset > logLength) logLength
-      else getOffset
-
-    val logPageLength = math.min(byteLength, maxBytes)
-
-    val endByte = math.min(startByte+logPageLength, logLength)
-
-    (startByte, endByte)
-  }
-
-  def stop() {
-    server.foreach(_.stop())
-  }
-}
-
-private[spark] object WorkerWebUI {
-  val STATIC_RESOURCE_DIR = "spark/ui/static"
-  val DEFAULT_PORT="8081"
-}

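As a worked example of getByteRange above, take a 1,048,576-byte log requested with no offset and the default byteLength of 102,400:

    getOffset     = 1048576 - 102400               = 946176   (defaults to the tail of the file)
    startByte     = 946176                                    (already within [0, logLength])
    logPageLength = min(102400, 1048576)           = 102400
    endByte       = min(946176 + 102400, 1048576)  = 1048576

so a parameterless request renders the last 100 KB of the log.
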
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
deleted file mode 100644
index fa82d2b..0000000
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.executor
-
-import java.io.{File}
-import java.lang.management.ManagementFactory
-import java.nio.ByteBuffer
-import java.util.concurrent._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import spark.scheduler._
-import spark._
-
-
-/**
- * The Mesos executor for Spark.
- */
-private[spark] class Executor(
-    executorId: String,
-    slaveHostname: String,
-    properties: Seq[(String, String)])
-  extends Logging
-{
-  // Application dependencies (added through SparkContext) that we've fetched so far on this node.
-  // Each map holds the master's timestamp for the version of that file or JAR we got.
-  private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
-  private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
-
-  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
-
-  initLogging()
-
-  // No ip or host:port - just hostname
-  Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
-  // must not have port specified.
-  assert (0 == Utils.parseHostPort(slaveHostname)._2)
-
-  // Make sure the local hostname we report matches the cluster scheduler's name for this host
-  Utils.setCustomHostname(slaveHostname)
-
-  // Set spark.* system properties from executor arg
-  for ((key, value) <- properties) {
-    System.setProperty(key, value)
-  }
-
-  // If we are in yarn mode, systems can have different disk layouts, so we must set it
-  // to what Yarn on this system said was available. This will be used later when SparkEnv
-  // is created.
-  if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
-    System.setProperty("spark.local.dir", getYarnLocalDirs())
-  }
-
-  // Create our ClassLoader and set it on this thread
-  private val urlClassLoader = createClassLoader()
-  private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
-  Thread.currentThread.setContextClassLoader(replClassLoader)
-
-  // Make any thread terminations due to uncaught exceptions kill the entire
-  // executor process to avoid surprising stalls.
-  Thread.setDefaultUncaughtExceptionHandler(
-    new Thread.UncaughtExceptionHandler {
-      override def uncaughtException(thread: Thread, exception: Throwable) {
-        try {
-          logError("Uncaught exception in thread " + thread, exception)
-
-          // We may have been called from a shutdown hook. If so, we must not call System.exit().
-          // (If we do, we will deadlock.)
-          if (!Utils.inShutdown()) {
-            if (exception.isInstanceOf[OutOfMemoryError]) {
-              System.exit(ExecutorExitCode.OOM)
-            } else {
-              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-            }
-          }
-        } catch {
-          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-        }
-      }
-    }
-  )
-
-  val executorSource = new ExecutorSource(this)
-
-  // Initialize Spark environment (using system properties read above)
-  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
-  SparkEnv.set(env)
-  env.metricsSystem.registerSource(executorSource)
-
-  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
-
-  // Start worker thread pool
-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
-  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
-    threadPool.execute(new TaskRunner(context, taskId, serializedTask))
-  }
-
-  /** Get the Yarn approved local directories. */
-  private def getYarnLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different environment variable names for the
-    // local dirs, so let's check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
-
-    if (localDirs.isEmpty()) {
-      throw new Exception("Yarn Local dirs can't be empty")
-    }
-    return localDirs
-  }
-
-  class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
-    extends Runnable {
-
-    override def run() {
-      val startTime = System.currentTimeMillis()
-      SparkEnv.set(env)
-      Thread.currentThread.setContextClassLoader(replClassLoader)
-      val ser = SparkEnv.get.closureSerializer.newInstance()
-      logInfo("Running task ID " + taskId)
-      context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
-      var attemptedTask: Option[Task[Any]] = None
-      var taskStart: Long = 0
-      def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
-      val startGCTime = getTotalGCTime
-
-      try {
-        SparkEnv.set(env)
-        Accumulators.clear()
-        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
-        updateDependencies(taskFiles, taskJars)
-        val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
-        attemptedTask = Some(task)
-        logInfo("Its epoch is " + task.epoch)
-        env.mapOutputTracker.updateEpoch(task.epoch)
-        taskStart = System.currentTimeMillis()
-        val value = task.run(taskId.toInt)
-        val taskFinish = System.currentTimeMillis()
-        for (m <- task.metrics) {
-          m.hostname = Utils.localHostName
-          m.executorDeserializeTime = (taskStart - startTime).toInt
-          m.executorRunTime = (taskFinish - taskStart).toInt
-          m.jvmGCTime = getTotalGCTime - startGCTime
-        }
-        // TODO I'd also like to track the time it takes to serialize the task results, but that is a huge headache,
-        // b/c we need to serialize the task metrics first.  If TaskMetrics had a custom serialized format, we could
-        // just change the relevant bytes in the byte buffer.
-        val accumUpdates = Accumulators.values
-        val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
-        val serializedResult = ser.serialize(result)
-        logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
-        if (serializedResult.limit >= (akkaFrameSize - 1024)) {
-          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
-          return
-        }
-        context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
-        logInfo("Finished task ID " + taskId)
-      } catch {
-        case ffe: FetchFailedException => {
-          val reason = ffe.toTaskEndReason
-          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
-        }
-
-        case t: Throwable => {
-          val serviceTime = (System.currentTimeMillis() - taskStart).toInt
-          val metrics = attemptedTask.flatMap(t => t.metrics)
-          for (m <- metrics) {
-            m.executorRunTime = serviceTime
-            m.jvmGCTime = getTotalGCTime - startGCTime
-          }
-          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
-          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
-
-          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-          // have left some weird state around depending on when the exception was thrown, but on
-          // the other hand, maybe we could detect that when future tasks fail and exit then.
-          logError("Exception in task ID " + taskId, t)
-          //System.exit(1)
-        }
-      }
-    }
-  }
-
-  /**
-   * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
-   * created by the interpreter to the search path
-   */
-  private def createClassLoader(): ExecutorURLClassLoader = {
-    var loader = this.getClass.getClassLoader
-
-    // For each of the jars in the jarSet, add them to the class loader.
-    // We assume each of the files has already been fetched.
-    val urls = currentJars.keySet.map { uri =>
-      new File(uri.split("/").last).toURI.toURL
-    }.toArray
-    new ExecutorURLClassLoader(urls, loader)
-  }
-
-  /**
-   * If the REPL is in use, add another ClassLoader that will read
-   * new classes defined by the REPL as the user types code
-   */
-  private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
-    val classUri = System.getProperty("spark.repl.class.uri")
-    if (classUri != null) {
-      logInfo("Using REPL class URI: " + classUri)
-      try {
-        val klass = Class.forName("spark.repl.ExecutorClassLoader")
-          .asInstanceOf[Class[_ <: ClassLoader]]
-        val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader])
-        return constructor.newInstance(classUri, parent)
-      } catch {
-        case _: ClassNotFoundException =>
-          logError("Could not find spark.repl.ExecutorClassLoader on classpath!")
-          System.exit(1)
-          null
-      }
-    } else {
-      return parent
-    }
-  }
-
-  /**
-   * Download any missing dependencies if we receive a new set of files and JARs from the
-   * SparkContext. Also adds any new JARs we fetched to the class loader.
-   */
-  private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
-    synchronized {
-      // Fetch missing dependencies
-      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
-        logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
-        currentFiles(name) = timestamp
-      }
-      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
-        logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
-        currentJars(name) = timestamp
-        // Add it to our class loader
-        val localName = name.split("/").last
-        val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
-        if (!urlClassLoader.getURLs.contains(url)) {
-          logInfo("Adding " + url + " to class loader")
-          urlClassLoader.addURL(url)
-        }
-      }
-    }
-  }
-}

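A minimal sketch of how a backend hands work to the Executor above. The identifiers sparkProperties, backend, taskId and serializedTask are placeholders; any ExecutorBackend implementation and any task bytes produced by the driver would do:

    val executor = new Executor("executor-1", "worker1.example.com", sparkProperties)
    executor.launchTask(backend, taskId, serializedTask)
    // The TaskRunner then reports RUNNING / FINISHED / FAILED states back
    // through backend.statusUpdate(taskId, state, data).
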
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/executor/ExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/executor/ExecutorBackend.scala b/core/src/main/scala/spark/executor/ExecutorBackend.scala
deleted file mode 100644
index 33a6f8a..0000000
--- a/core/src/main/scala/spark/executor/ExecutorBackend.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.executor
-
-import java.nio.ByteBuffer
-import spark.TaskState.TaskState
-
-/**
- * A pluggable interface used by the Executor to send updates to the cluster scheduler.
- */
-private[spark] trait ExecutorBackend {
-  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
-}

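The trait is a one-method contract. A hypothetical implementation that merely logs each update, included only to illustrate that contract, might look like:

    import java.nio.ByteBuffer
    import spark.TaskState.TaskState

    // Illustrative only; real backends forward the update to Mesos or the driver actor.
    class LoggingExecutorBackend extends ExecutorBackend {
      override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
        println("task " + taskId + " -> " + state + " (" + data.remaining + " payload bytes)")
      }
    }
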
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/executor/ExecutorExitCode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
deleted file mode 100644
index 64b9fb8..0000000
--- a/core/src/main/scala/spark/executor/ExecutorExitCode.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.executor
-
-/**
- * These are exit codes that executors should use to provide the master with information about
- * executor failures, assuming that the cluster management framework can capture the exit codes (but
- * perhaps not log files). The exit code constants here are chosen to be unlikely to conflict
- * with "natural" exit statuses that may be caused by the JVM or user code. In particular,
- * exit codes 128+ arise on some Unix-likes as a result of signals, and it appears that the
- * OpenJDK JVM may use exit code 1 in some of its own "last chance" code.
- */
-private[spark]
-object ExecutorExitCode {
-  /** The default uncaught exception handler was reached. */
-  val UNCAUGHT_EXCEPTION = 50
-
-  /** The default uncaught exception handler was called and an exception was encountered while
-      logging the exception. */
-  val UNCAUGHT_EXCEPTION_TWICE = 51
-
-  /** The default uncaught exception handler was reached, and the uncaught exception was an 
-      OutOfMemoryError. */
-  val OOM = 52
-
-  /** DiskStore failed to create a local temporary directory after many attempts. */
-  val DISK_STORE_FAILED_TO_CREATE_DIR = 53
-
-  def explainExitCode(exitCode: Int): String = {
-    exitCode match {
-      case UNCAUGHT_EXCEPTION => "Uncaught exception"
-      case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
-      case OOM => "OutOfMemoryError"
-      case DISK_STORE_FAILED_TO_CREATE_DIR =>
-        "Failed to create local directory (bad spark.local.dir?)"
-      case _ => 
-        "Unknown executor exit code (" + exitCode + ")" + (
-          if (exitCode > 128)
-            " (died from signal " + (exitCode - 128) + "?)"
-          else
-            ""
-        )
-    }
-  }
-}

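Two examples of the mapping above, following the code directly:

    ExecutorExitCode.explainExitCode(52)    // "OutOfMemoryError"
    ExecutorExitCode.explainExitCode(137)   // "Unknown executor exit code (137) (died from signal 9?)"
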
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/executor/ExecutorSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
deleted file mode 100644
index d491a3c..0000000
--- a/core/src/main/scala/spark/executor/ExecutorSource.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package spark.executor
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.hdfs.DistributedFileSystem
-import org.apache.hadoop.fs.LocalFileSystem
-
-import scala.collection.JavaConversions._
-
-import spark.metrics.source.Source
-
-class ExecutorSource(val executor: Executor) extends Source {
-  private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
-    FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
-
-  private def registerFileSystemStat[T](
-        scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
-    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
-      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
-    })
-  }
-
-  val metricRegistry = new MetricRegistry()
-  val sourceName = "executor"
-
-  // Gauge for executor thread pool's actively executing task counts
-  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getActiveCount()
-  })
-
-  // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
-    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
-  })
-
-  // Gauge for executor thread pool's current number of threads
-  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getPoolSize()
-  })
-
-  // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
-  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
-  })
-
-  // Gauge for file system stats of this executor
-  for (scheme <- Array("hdfs", "file")) {
-    registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
-    registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
-    registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
-    registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
-    registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
-  }
-}

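MetricRegistry.name joins its arguments with dots, so the gauges above surface under names such as:

    threadpool.activeTask.count
    threadpool.completeTask.count
    threadpool.currentPool.size
    threadpool.maxPool.size
    filesystem.hdfs.bytesRead
    filesystem.file.writeOps

(the MetricsSystem may additionally prefix these with the source name when it exports them).
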
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala
deleted file mode 100644
index 09d12fb..0000000
--- a/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.executor
-
-import java.net.{URLClassLoader, URL}
-
-/**
- * The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
- */
-private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
-  extends URLClassLoader(urls, parent) {
-
-  override def addURL(url: URL) {
-    super.addURL(url)
-  }
-}

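A sketch of the addURL pattern this subclass exists for, mirroring Executor.updateDependencies above (the jar path is a placeholder):

    import java.io.File
    import java.net.URL

    val loader = new ExecutorURLClassLoader(Array[URL](), getClass.getClassLoader)
    val url = new File("/tmp/extra.jar").toURI.toURL   // hypothetical fetched jar
    if (!loader.getURLs.contains(url)) {
      loader.addURL(url)
    }
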
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
deleted file mode 100644
index 4961c42..0000000
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.executor
-
-import java.nio.ByteBuffer
-import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
-import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => MesosTaskStatus, _}
-import spark.TaskState.TaskState
-import com.google.protobuf.ByteString
-import spark.{Utils, Logging}
-import spark.TaskState
-
-private[spark] class MesosExecutorBackend
-  extends MesosExecutor
-  with ExecutorBackend
-  with Logging {
-
-  var executor: Executor = null
-  var driver: ExecutorDriver = null
-
-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
-    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
-    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
-      .setTaskId(mesosTaskId)
-      .setState(TaskState.toMesos(state))
-      .setData(ByteString.copyFrom(data))
-      .build())
-  }
-
-  override def registered(
-      driver: ExecutorDriver,
-      executorInfo: ExecutorInfo,
-      frameworkInfo: FrameworkInfo,
-      slaveInfo: SlaveInfo) {
-    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
-    this.driver = driver
-    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor = new Executor(
-      executorInfo.getExecutorId.getValue,
-      slaveInfo.getHostname,
-      properties)
-  }
-
-  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
-    val taskId = taskInfo.getTaskId.getValue.toLong
-    if (executor == null) {
-      logError("Received launchTask but executor was null")
-    } else {
-      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
-    }
-  }
-
-  override def error(d: ExecutorDriver, message: String) {
-    logError("Error from Mesos: " + message)
-  }
-
-  override def killTask(d: ExecutorDriver, t: TaskID) {
-    logWarning("Mesos asked us to kill task " + t.getValue + "; ignoring (not yet implemented)")
-  }
-
-  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
-
-  override def disconnected(d: ExecutorDriver) {}
-
-  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
-
-  override def shutdown(d: ExecutorDriver) {}
-}
-
-/**
- * Entry point for Mesos executor.
- */
-private[spark] object MesosExecutorBackend {
-  def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
deleted file mode 100644
index b5fb6db..0000000
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.executor
-
-import java.nio.ByteBuffer
-
-import akka.actor.{ActorRef, Actor, Props, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-
-import spark.{Logging, Utils, SparkEnv}
-import spark.TaskState.TaskState
-import spark.scheduler.cluster.StandaloneClusterMessages._
-import spark.util.AkkaUtils
-
-
-private[spark] class StandaloneExecutorBackend(
-    driverUrl: String,
-    executorId: String,
-    hostPort: String,
-    cores: Int)
-  extends Actor
-  with ExecutorBackend
-  with Logging {
-
-  Utils.checkHostPort(hostPort, "Expected hostport")
-
-  var executor: Executor = null
-  var driver: ActorRef = null
-
-  override def preStart() {
-    logInfo("Connecting to driver: " + driverUrl)
-    driver = context.actorFor(driverUrl)
-    driver ! RegisterExecutor(executorId, hostPort, cores)
-    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-    context.watch(driver) // Doesn't work with remote actors, but useful for testing
-  }
-
-  override def receive = {
-    case RegisteredExecutor(sparkProperties) =>
-      logInfo("Successfully registered with driver")
-      // Make this host instead of hostPort ?
-      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
-
-    case RegisterExecutorFailed(message) =>
-      logError("Slave registration failed: " + message)
-      System.exit(1)
-
-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
-      if (executor == null) {
-        logError("Received launchTask but executor was null")
-        System.exit(1)
-      } else {
-        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
-      }
-
-    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
-      logError("Driver terminated or disconnected! Shutting down.")
-      System.exit(1)
-  }
-
-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
-    driver ! StatusUpdate(executorId, taskId, state, data)
-  }
-}
-
-private[spark] object StandaloneExecutorBackend {
-  def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
-    // Debug code
-    Utils.checkHost(hostname)
-
-    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-    // until we have set up all of our system properties, etc.
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
-    // Set spark.hostPort to the hostname plus the actual bound port
-    val sparkHostPort = hostname + ":" + boundPort
-    System.setProperty("spark.hostPort", sparkHostPort)
-    val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
-      name = "Executor")
-    actorSystem.awaitTermination()
-  }
-
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      // The reason we allow the last frameworkId argument is to make it easy to kill rogue executors
-      System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
-      System.exit(1)
-    }
-    run(args(0), args(1), args(2), args(3).toInt)
-  }
-}

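For context, the deleted StandaloneExecutorBackend above is started through its main method with the arguments shown in its usage string. A minimal sketch of an equivalent launch against the classes in this diff; the driver URL, executor id, hostname and core count are placeholders, and the object sits under the spark package only because StandaloneExecutorBackend is private[spark]:

    package spark.examples  // StandaloneExecutorBackend is private[spark]

    import spark.executor.StandaloneExecutorBackend

    object LaunchBackendSketch {
      def main(args: Array[String]) {
        // Placeholder values; in a real cluster the driver URL and executor id
        // are handed to the worker by the scheduler.
        val driverUrl  = "akka://spark@driver-host:7077/user/StandaloneScheduler"
        val executorId = "executor-1"
        val hostname   = "worker-host"
        val cores      = 4

        // Equivalent to: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores>
        // run() creates the "sparkExecutor" actor system and blocks in awaitTermination().
        StandaloneExecutorBackend.run(driverUrl, executorId, hostname, cores)
      }
    }
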
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
deleted file mode 100644
index 47b8890..0000000
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.executor
-
-class TaskMetrics extends Serializable {
-  /**
-   * Name of the host the task runs on
-   */
-  var hostname: String = _
-
-  /**
-   * Time taken on the executor to deserialize this task
-   */
-  var executorDeserializeTime: Int = _
-
-  /**
-   * Time the executor spends actually running the task (including fetching shuffle data)
-   */
-  var executorRunTime: Int = _
-
-  /**
-   * The number of bytes this task transmitted back to the driver as the TaskResult
-   */
-  var resultSize: Long = _
-
-  /**
-   * Amount of time the JVM spent in garbage collection while executing this task
-   */
-  var jvmGCTime: Long = _
-
-  /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
-   */
-  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
-
-  /**
-   * If this task writes to shuffle output, metrics on the written shuffle data will be collected here
-   */
-  var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
-}
-
-object TaskMetrics {
-  private[spark] def empty(): TaskMetrics = new TaskMetrics
-}
-
-
-class ShuffleReadMetrics extends Serializable {
-  /**
-   * Time when the shuffle finishes
-   */
-  var shuffleFinishTime: Long = _
-
-  /**
-   * Total number of blocks fetched in a shuffle (remote or local)
-   */
-  var totalBlocksFetched: Int = _
-
-  /**
-   * Number of remote blocks fetched in a shuffle
-   */
-  var remoteBlocksFetched: Int = _
-
-  /**
-   * Number of local blocks fetched in a shuffle
-   */
-  var localBlocksFetched: Int = _
-
-  /**
-   * Total time that is spent blocked waiting for shuffle to fetch data
-   */
-  var fetchWaitTime: Long = _
-
-  /**
-   * The total amount of time for all the shuffle fetches. This adds up time from overlapping
-   * shuffles, so it can be longer than the task time
-   */
-  var remoteFetchTime: Long = _
-
-  /**
-   * Total number of remote bytes read from a shuffle
-   */
-  var remoteBytesRead: Long = _
-}
-
-class ShuffleWriteMetrics extends Serializable {
-  /**
-   * Number of bytes written for a shuffle
-   */
-  var shuffleBytesWritten: Long = _
-}

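The TaskMetrics class above is a plain mutable holder that the executor fills in and ships back with each task result. A short illustrative sketch of populating it; the values are made up and not taken from this commit:

    import spark.executor.{ShuffleReadMetrics, TaskMetrics}

    object TaskMetricsSketch {
      def main(args: Array[String]) {
        val metrics = new TaskMetrics
        metrics.hostname = "worker-host"      // placeholder hostname
        metrics.executorDeserializeTime = 15  // illustrative values for the fields documented above
        metrics.executorRunTime = 1200
        metrics.resultSize = 2048L

        // Shuffle-read metrics are attached only if the task actually read shuffle output.
        val shuffleRead = new ShuffleReadMetrics
        shuffleRead.totalBlocksFetched = 10
        shuffleRead.remoteBlocksFetched = 4
        shuffleRead.localBlocksFetched = 6
        metrics.shuffleReadMetrics = Some(shuffleRead)
      }
    }
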
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/io/CompressionCodec.scala b/core/src/main/scala/spark/io/CompressionCodec.scala
deleted file mode 100644
index 0adebec..0000000
--- a/core/src/main/scala/spark/io/CompressionCodec.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.io
-
-import java.io.{InputStream, OutputStream}
-
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
-
-
-/**
- * CompressionCodec allows the customization of choosing different compression implementations
- * to be used in block storage.
- */
-trait CompressionCodec {
-
-  def compressedOutputStream(s: OutputStream): OutputStream
-
-  def compressedInputStream(s: InputStream): InputStream
-}
-
-
-private[spark] object CompressionCodec {
-
-  def createCodec(): CompressionCodec = {
-    // Set the default codec to Snappy since the LZF implementation initializes a pretty large
-    // buffer for every stream, which results in a lot of memory overhead when the number of
-    // shuffle reduce buckets are large.
-    createCodec(classOf[SnappyCompressionCodec].getName)
-  }
-
-  def createCodec(codecName: String): CompressionCodec = {
-    Class.forName(
-      System.getProperty("spark.io.compression.codec", codecName),
-      true,
-      Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec]
-  }
-}
-
-
-/**
- * LZF implementation of [[spark.io.CompressionCodec]].
- */
-class LZFCompressionCodec extends CompressionCodec {
-
-  override def compressedOutputStream(s: OutputStream): OutputStream = {
-    new LZFOutputStream(s).setFinishBlockOnFlush(true)
-  }
-
-  override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
-}
-
-
-/**
- * Snappy implementation of [[spark.io.CompressionCodec]].
- * Block size can be configured by spark.io.compression.snappy.block.size.
- */
-class SnappyCompressionCodec extends CompressionCodec {
-
-  override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
-    new SnappyOutputStream(s, blockSize)
-  }
-
-  override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
-}

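Codec selection above is driven by the spark.io.compression.codec system property, falling back to Snappy when it is unset. A round-trip sketch using the LZF codec from this same file; the payload and the property override are illustrative, and the object sits under the spark package because CompressionCodec's companion object is private[spark]:

    package spark.examples  // CompressionCodec's companion object is private[spark]

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import spark.io.CompressionCodec

    object CompressionCodecSketch {
      def main(args: Array[String]) {
        // Override the default (Snappy) codec; the named class must be on the classpath.
        System.setProperty("spark.io.compression.codec", "spark.io.LZFCompressionCodec")
        val codec = CompressionCodec.createCodec()

        // Compress a small payload into an in-memory buffer.
        val buffer = new ByteArrayOutputStream()
        val out = codec.compressedOutputStream(buffer)
        out.write("hello, spark".getBytes("UTF-8"))
        out.close()

        // Read it back through the matching decompressing stream.
        val in = codec.compressedInputStream(new ByteArrayInputStream(buffer.toByteArray))
        val decoded = Stream.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
        assert(new String(decoded, "UTF-8") == "hello, spark")
      }
    }
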
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
deleted file mode 100644
index d7fb537..0000000
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics
-
-import java.util.Properties
-import java.io.{File, FileInputStream, InputStream, IOException}
-
-import scala.collection.mutable
-import scala.util.matching.Regex
-
-import spark.Logging
-
-private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
-  initLogging()
-
-  val DEFAULT_PREFIX = "*"
-  val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
-  val METRICS_CONF = "metrics.properties"
-
-  val properties = new Properties()
-  var propertyCategories: mutable.HashMap[String, Properties] = null
-
-  private def setDefaultProperties(prop: Properties) {
-    prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
-    prop.setProperty("*.sink.servlet.uri", "/metrics/json")
-    prop.setProperty("*.sink.servlet.sample", "false")
-    prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
-    prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
-  }
-
-  def initialize() {
-    // Add default properties in case there's no properties file
-    setDefaultProperties(properties)
-
-    // If spark.metrics.conf is not set, try to get file in class path
-    var is: InputStream = null
-    try {
-      is = configFile match {
-        case Some(f) => new FileInputStream(f)
-        case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
-      }
-
-      if (is != null) {
-        properties.load(is)
-      }
-    } catch {
-      case e: Exception => logError("Error loading configuration file", e)
-    } finally {
-      if (is != null) is.close()
-    }
-
-    propertyCategories = subProperties(properties, INSTANCE_REGEX)
-    if (propertyCategories.contains(DEFAULT_PREFIX)) {
-      import scala.collection.JavaConversions._
-
-      val defaultProperty = propertyCategories(DEFAULT_PREFIX)
-      for { (inst, prop) <- propertyCategories
-            if (inst != DEFAULT_PREFIX)
-            (k, v) <- defaultProperty
-            if (prop.getProperty(k) == null) } {
-        prop.setProperty(k, v)
-      }
-    }
-  }
-
-  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
-    val subProperties = new mutable.HashMap[String, Properties]
-    import scala.collection.JavaConversions._
-    prop.foreach { kv =>
-      if (regex.findPrefixOf(kv._1) != None) {
-        val regex(prefix, suffix) = kv._1
-        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
-      }
-    }
-    subProperties
-  }
-
-  def getInstance(inst: String): Properties = {
-    propertyCategories.get(inst) match {
-      case Some(s) => s
-      case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
-    }
-  }
-}
-

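INSTANCE_REGEX above splits each metrics.properties key on its first dot to group properties per instance, and getInstance falls back to the "*" defaults when an instance has no section of its own. A sketch of exercising this directly; the file path is a placeholder, and the object sits under the spark package because MetricsConfig is private[spark]:

    package spark.examples  // MetricsConfig is private[spark]

    import spark.metrics.MetricsConfig

    object MetricsConfigSketch {
      def main(args: Array[String]) {
        // Passing None instead would look for metrics.properties on the classpath; either way
        // the servlet defaults from setDefaultProperties are always present.
        val conf = new MetricsConfig(Some("/path/to/metrics.properties"))  // placeholder path
        conf.initialize()

        // "worker" has no dedicated section in the defaults, so this resolves to the "*" section.
        val workerProps = conf.getInstance("worker")
        println(workerProps.getProperty("sink.servlet.uri"))  // "/metrics/json" by default
      }
    }
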
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
deleted file mode 100644
index 4e6c6b2..0000000
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics
-
-import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
-
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-
-import scala.collection.mutable
-
-import spark.Logging
-import spark.metrics.sink.{MetricsServlet, Sink}
-import spark.metrics.source.Source
-
-/**
- * The Spark metrics system is created for a specific "instance" and combines sources and sinks,
- * periodically polling metrics data from the sources and pushing it to the sink destinations.
- *
- * "instance" specifies "who" (which role) uses the metrics system. In Spark several roles,
- * such as master, worker, executor and client driver, create a metrics system for
- * monitoring, so an instance represents one of these roles. The instances implemented so
- * far are: master, worker, executor, driver and applications.
- *
- * "source" specifies where metrics data is collected from. There are two kinds of source:
- *   1. Spark internal sources, like MasterSource, WorkerSource, etc, which collect a Spark
- *   component's internal state. These sources are tied to an instance and are added after
- *   the specific metrics system is created.
- *   2. Common sources, like JvmSource, which collect low-level state; these are enabled
- *   through configuration and loaded via reflection.
- *
- * "sink" specifies where (to which destination) metrics data is output. Several sinks can
- * coexist, and metrics are flushed to all of them.
- *
- * The metrics configuration format is:
- * [instance].[sink|source].[name].[options] = xxxx
- *
- * [instance] can be "master", "worker", "executor", "driver" or "applications", meaning
- * that only the specified instance has this property.
- * The wildcard "*" can be used in place of an instance name, meaning that all instances
- * have this property.
- *
- * [sink|source] indicates whether the property belongs to a sink or a source; this field
- * can only be "sink" or "source".
- *
- * [name] specifies the name of the sink or source; it is custom defined.
- *
- * [options] is the specific property of this source or sink.
- */
-private[spark] class MetricsSystem private (val instance: String) extends Logging {
-  initLogging()
-
-  val confFile = System.getProperty("spark.metrics.conf")
-  val metricsConfig = new MetricsConfig(Option(confFile))
-
-  val sinks = new mutable.ArrayBuffer[Sink]
-  val sources = new mutable.ArrayBuffer[Source]
-  val registry = new MetricRegistry()
-
-  // Treat MetricsServlet as a special sink because its handlers need to be exposed to the web UI
-  private var metricsServlet: Option[MetricsServlet] = None
-
-  /** Get any UI handlers used by this metrics system. */
-  def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
-
-  metricsConfig.initialize()
-  registerSources()
-  registerSinks()
-
-  def start() {
-    sinks.foreach(_.start)
-  }
-
-  def stop() {
-    sinks.foreach(_.stop)
-  }
-
-  def registerSource(source: Source) {
-    sources += source
-    try {
-      registry.register(source.sourceName, source.metricRegistry)
-    } catch {
-      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
-    }
-  }
-
-  def removeSource(source: Source) {
-    sources -= source
-    registry.removeMatching(new MetricFilter {
-      def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
-    })
-  }
-
-  def registerSources() {
-    val instConfig = metricsConfig.getInstance(instance)
-    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
-
-    // Register all the sources related to instance
-    sourceConfigs.foreach { kv =>
-      val classPath = kv._2.getProperty("class")
-      try {
-        val source = Class.forName(classPath).newInstance()
-        registerSource(source.asInstanceOf[Source])
-      } catch {
-        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
-      }
-    }
-  }
-
-  def registerSinks() {
-    val instConfig = metricsConfig.getInstance(instance)
-    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
-
-    sinkConfigs.foreach { kv =>
-      val classPath = kv._2.getProperty("class")
-      try {
-        val sink = Class.forName(classPath)
-          .getConstructor(classOf[Properties], classOf[MetricRegistry])
-          .newInstance(kv._2, registry)
-        if (kv._1 == "servlet") {
-           metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
-        } else {
-          sinks += sink.asInstanceOf[Sink]
-        }
-      } catch {
-        case e: Exception => logError("Sink class " + classPath + " cannot be instantiated", e)
-      }
-    }
-  }
-}
-
-private[spark] object MetricsSystem {
-  val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
-  val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
-
-  val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
-  val MINIMAL_POLL_PERIOD = 1
-
-  def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
-    val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
-    if (period < MINIMAL_POLL_PERIOD) {
-      throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
-        " is below the minimal polling period")
-    }
-  }
-
-  def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
-}

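Tying the pieces together, registerSources and registerSinks above instantiate whatever classes the [instance].[sink|source].[name].[options] configuration names, while components can also register sources programmatically. A minimal sketch using classes from this same commit; the commented config path is a placeholder, and the object sits under the spark package because MetricsSystem is private[spark]:

    package spark.examples  // MetricsSystem is private[spark]

    import spark.metrics.MetricsSystem
    import spark.metrics.source.JvmSource

    object MetricsSystemSketch {
      def main(args: Array[String]) {
        // Optionally point at an explicit configuration file before the system is created.
        // System.setProperty("spark.metrics.conf", "/path/to/metrics.properties")  // placeholder

        val metricsSystem = MetricsSystem.createMetricsSystem("worker")

        // Spark-internal sources such as WorkerSource are registered the same way by their components.
        metricsSystem.registerSource(new JvmSource)

        metricsSystem.start()  // starts every sink picked up from the configuration
        // ... run for a while, then shut the sinks down ...
        metricsSystem.stop()
      }
    }
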
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
deleted file mode 100644
index 966ba37..0000000
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics.sink
-
-import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
-
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-
-import spark.metrics.MetricsSystem
-
-class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
-  val CONSOLE_DEFAULT_PERIOD = 10
-  val CONSOLE_DEFAULT_UNIT = "SECONDS"
-
-  val CONSOLE_KEY_PERIOD = "period"
-  val CONSOLE_KEY_UNIT = "unit"
-
-  val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
-    case Some(s) => s.toInt
-    case None => CONSOLE_DEFAULT_PERIOD
-  }
-
-  val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
-    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
-    case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
-  }
-
-  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
-
-  val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
-      .convertDurationsTo(TimeUnit.MILLISECONDS)
-      .convertRatesTo(TimeUnit.SECONDS)
-      .build()
-
-  override def start() {
-    reporter.start(pollPeriod, pollUnit)
-  }
-
-  override def stop() {
-    reporter.stop()
-  }
-}
-

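A metrics.properties sketch that would enable the console sink above for every instance, using the period and unit keys it reads; the sink name "console" is arbitrary, per the [name] field described in the MetricsSystem scaladoc, and the values are only examples:

    *.sink.console.class = spark.metrics.sink.ConsoleSink
    *.sink.console.period = 20
    *.sink.console.unit = seconds
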
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/sink/CsvSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
deleted file mode 100644
index cb990af..0000000
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics.sink
-
-import com.codahale.metrics.{CsvReporter, MetricRegistry}
-
-import java.io.File
-import java.util.{Locale, Properties}
-import java.util.concurrent.TimeUnit
-
-import spark.metrics.MetricsSystem
-
-class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
-  val CSV_KEY_PERIOD = "period"
-  val CSV_KEY_UNIT = "unit"
-  val CSV_KEY_DIR = "directory"
-
-  val CSV_DEFAULT_PERIOD = 10
-  val CSV_DEFAULT_UNIT = "SECONDS"
-  val CSV_DEFAULT_DIR = "/tmp/"
-
-  val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
-    case Some(s) => s.toInt
-    case None => CSV_DEFAULT_PERIOD
-  }
-
-  val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
-    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
-    case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
-  }
-  
-  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
-
-  val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
-    case Some(s) => s
-    case None => CSV_DEFAULT_DIR
-  }
-
-  val reporter: CsvReporter = CsvReporter.forRegistry(registry)
-      .formatFor(Locale.US)
-      .convertDurationsTo(TimeUnit.MILLISECONDS)
-      .convertRatesTo(TimeUnit.SECONDS)
-      .build(new File(pollDir))
-
-  override def start() {
-    reporter.start(pollPeriod, pollUnit)
-  }
-
-  override def stop() {
-    reporter.stop()
-  }
-}
-

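The matching metrics.properties sketch for the CSV sink above adds the directory key it reads; the directory path is a placeholder and must exist before the reporter writes to it:

    worker.sink.csv.class = spark.metrics.sink.CsvSink
    worker.sink.csv.period = 1
    worker.sink.csv.unit = minutes
    worker.sink.csv.directory = /tmp/spark-metrics/
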
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/sink/JmxSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
deleted file mode 100644
index ee04544..0000000
--- a/core/src/main/scala/spark/metrics/sink/JmxSink.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics.sink
-
-import com.codahale.metrics.{JmxReporter, MetricRegistry}
-
-import java.util.Properties
-
-class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
-  val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
-
-  override def start() {
-    reporter.start()
-  }
-
-  override def stop() {
-    reporter.stop()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
deleted file mode 100644
index 17432b1..0000000
--- a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics.sink
-
-import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.json.MetricsModule
-
-import com.fasterxml.jackson.databind.ObjectMapper
-
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.Handler
-
-import spark.ui.JettyUtils
-
-class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
-  val SERVLET_KEY_URI = "uri"
-  val SERVLET_KEY_SAMPLE = "sample"
-
-  val servletURI = property.getProperty(SERVLET_KEY_URI)
-
-  val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
-
-  val mapper = new ObjectMapper().registerModule(
-    new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
-
-  def getHandlers = Array[(String, Handler)](
-    (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
-  )
-
-  def getMetricsSnapshot(request: HttpServletRequest): String = {
-    mapper.writeValueAsString(registry)
-  }
-
-  override def start() { }
-
-  override def stop() { }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/sink/Sink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
deleted file mode 100644
index dad1a7f..0000000
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics.sink
-
-trait Sink {
-  def start: Unit
-  def stop: Unit
-}
\ No newline at end of file

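The Sink trait above is the whole contract, but MetricsSystem.registerSinks additionally instantiates sinks reflectively through a (Properties, MetricRegistry) constructor. A hypothetical no-op sink showing both requirements; the class and its package placement are illustrative only:

    package spark.metrics.sink  // hypothetical sink placed alongside the built-in ones

    import java.util.Properties

    import com.codahale.metrics.MetricRegistry

    // The (Properties, MetricRegistry) constructor shape is what registerSinks reflects on.
    class NoopSink(val property: Properties, val registry: MetricRegistry) extends Sink {
      override def start() {
        println("NoopSink starting; metrics currently registered: " + registry.getNames)
      }

      override def stop() {
        println("NoopSink stopping")
      }
    }

It would then be enabled with a line such as *.sink.noop.class = spark.metrics.sink.NoopSink.
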
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/source/JvmSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
deleted file mode 100644
index e771008..0000000
--- a/core/src/main/scala/spark/metrics/source/JvmSource.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics.source
-
-import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
-
-class JvmSource extends Source {
-  val sourceName = "jvm"
-  val metricRegistry = new MetricRegistry()
-
-  val gcMetricSet = new GarbageCollectorMetricSet
-  val memGaugeSet = new MemoryUsageGaugeSet
-
-  metricRegistry.registerAll(gcMetricSet)
-  metricRegistry.registerAll(memGaugeSet)
-}

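JvmSource above is the "common source" case from the MetricsSystem scaladoc: it is not tied to any one component, so it is enabled purely through configuration and loaded by reflection in registerSources. An illustrative metrics.properties line, with the source name "jvm" chosen arbitrarily:

    *.source.jvm.class = spark.metrics.source.JvmSource
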
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/metrics/source/Source.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
deleted file mode 100644
index 76199a0..0000000
--- a/core/src/main/scala/spark/metrics/source/Source.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics.source
-
-import com.codahale.metrics.MetricRegistry
-
-trait Source {
-  def sourceName: String
-  def metricRegistry: MetricRegistry
-}

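For completeness, a hypothetical source satisfying the two-member contract above; it registers a single gauge and could be passed to MetricsSystem.registerSource, or enabled via a source.<name>.class line provided it keeps a no-arg constructor, which the reflective loading requires:

    package spark.metrics.source  // hypothetical source placed alongside JvmSource

    import com.codahale.metrics.{Gauge, MetricRegistry}

    class UptimeSource extends Source {
      val sourceName = "uptime"
      val metricRegistry = new MetricRegistry()

      private val startedAt = System.currentTimeMillis()

      // A single gauge reporting how many milliseconds this source has been registered.
      metricRegistry.register(MetricRegistry.name("millis"), new Gauge[Long] {
        override def getValue: Long = System.currentTimeMillis() - startedAt
      })
    }
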
