spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject [1/2] spark git commit: [SPARK-4011] tighten the visibility of the members in Master/Worker class
Date Tue, 17 Mar 2015 11:18:35 GMT
Repository: spark
Updated Branches:
  refs/heads/master b2d8c0222 -> 25f35806e


http://git-wip-us.apache.org/repos/asf/spark/blob/25f35806/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f2e7418..c1b0a29 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 /**
   * @param masterAkkaUrls Each url should be a valid akka url.
   */
-private[spark] class Worker(
+private[worker] class Worker(
     host: String,
     port: Int,
     webUiPort: Int,
@@ -60,85 +60,90 @@ private[spark] class Worker(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
+  // For worker and executor IDs
+  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
+  private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
 
   // Model retries to connect to the master, after Hadoop's model.
   // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
   // Afterwards, the next 10 attempts are between 30 and 90 seconds.
   // A bit of randomness is introduced so that not all of the workers attempt to reconnect at
   // the same time.
-  val INITIAL_REGISTRATION_RETRIES = 6
-  val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
-  val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
-  val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
+  private val INITIAL_REGISTRATION_RETRIES = 6
+  private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
+  private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
+  private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
     val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
     randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
   }
-  val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 *
+  private val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 *
     REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
-  val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60
+  private val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60
     * REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
 
-  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
+  private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
   // How often worker will clean up old app folders
-  val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
+  private val CLEANUP_INTERVAL_MILLIS = 
+    conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
   // TTL for app folders/data;  after TTL expires it will be cleaned up
-  val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
-
-  val testing: Boolean = sys.props.contains("spark.testing")
-  var master: ActorSelection = null
-  var masterAddress: Address = null
-  var activeMasterUrl: String = ""
-  var activeMasterWebUiUrl : String = ""
-  val akkaUrl = AkkaUtils.address(
+  private val APP_DATA_RETENTION_SECS = 
+    conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+
+  private val testing: Boolean = sys.props.contains("spark.testing")
+  private var master: ActorSelection = null
+  private var masterAddress: Address = null
+  private var activeMasterUrl: String = ""
+  private[worker] var activeMasterWebUiUrl : String = ""
+  private val akkaUrl = AkkaUtils.address(
     AkkaUtils.protocol(context.system),
     actorSystemName,
     host,
     port,
     actorName)
-  @volatile var registered = false
-  @volatile var connected = false
-  val workerId = generateWorkerId()
-  val sparkHome =
+  @volatile private var registered = false
+  @volatile private var connected = false
+  private val workerId = generateWorkerId()
+  private val sparkHome =
     if (testing) {
       assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
       new File(sys.props("spark.test.home"))
     } else {
       new File(sys.env.get("SPARK_HOME").getOrElse("."))
     }
+  
   var workDir: File = null
-  val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
   val drivers = new HashMap[String, DriverRunner]
+  val executors = new HashMap[String, ExecutorRunner]
   val finishedDrivers = new HashMap[String, DriverRunner]
   val appDirectories = new HashMap[String, Seq[String]]
   val finishedApps = new HashSet[String]
 
   // The shuffle service is not actually started unless configured.
-  val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
+  private val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
 
-  val publicAddress = {
+  private val publicAddress = {
     val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
-  var webUi: WorkerWebUI = null
+  private var webUi: WorkerWebUI = null
 
-  var coresUsed = 0
-  var memoryUsed = 0
-  var connectionAttemptCount = 0
+  private var connectionAttemptCount = 0
 
-  val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
-  val workerSource = new WorkerSource(this)
+  private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
+  private val workerSource = new WorkerSource(this)
+  
+  private var registrationRetryTimer: Option[Cancellable] = None
 
-  var registrationRetryTimer: Option[Cancellable] = None
+  var coresUsed = 0
+  var memoryUsed = 0
 
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
-  def createWorkDir() {
+  private def createWorkDir() {
     workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
     try {
       // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
@@ -175,7 +180,7 @@ private[spark] class Worker(
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(url: String, uiUrl: String) {
     // activeMasterUrl it's a valid Spark url since we receive it from master.
     activeMasterUrl = url
     activeMasterWebUiUrl = uiUrl
@@ -252,7 +257,7 @@ private[spark] class Worker(
     }
   }
 
-  def registerWithMaster() {
+  private def registerWithMaster() {
     // DisassociatedEvent may be triggered multiple times, so don't attempt registration
     // if there are outstanding registration attempts scheduled.
     registrationRetryTimer match {
@@ -506,7 +511,7 @@ private[spark] class Worker(
     }
   }
 
-  def generateWorkerId(): String = {
+  private def generateWorkerId(): String = {
     "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
   }
 
@@ -521,7 +526,7 @@ private[spark] class Worker(
   }
 }
 
-private[spark] object Worker extends Logging {
+private[deploy] object Worker extends Logging {
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     val conf = new SparkConf
@@ -554,7 +559,7 @@ private[spark] object Worker extends Logging {
     (actorSystem, boundPort)
   }
 
-  private[spark] def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
+  def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
     val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
     val result = cmd.javaOpts.collectFirst {
       case pattern(_result) => _result.toBoolean
@@ -562,7 +567,7 @@ private[spark] object Worker extends Logging {
     result.getOrElse(false)
   }
 
-  private[spark] def maybeUpdateSSLSettings(cmd: Command, conf: SparkConf): Command = {
+  def maybeUpdateSSLSettings(cmd: Command, conf: SparkConf): Command = {
     val prefix = "spark.ssl."
     val useNLC = "spark.ssl.useNodeLocalConf"
     if (isUseLocalNodeSSLConfig(cmd)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/25f35806/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 019cd70..88f9d88 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
 /**
  * Command-line parser for the worker.
  */
-private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
+private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
   var host = Utils.localHostName()
   var port = 0
   var webUiPort = 8081
@@ -63,7 +63,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
 
   checkWorkerMemory()
 
-  def parse(args: List[String]): Unit = args match {
+  private def parse(args: List[String]): Unit = args match {
     case ("--ip" | "-i") :: value :: tail =>
       Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
       host = value

http://git-wip-us.apache.org/repos/asf/spark/blob/25f35806/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index df1e01b..b36023b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -21,7 +21,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
 
-private[spark] class WorkerSource(val worker: Worker) extends Source {
+private[worker] class WorkerSource(val worker: Worker) extends Source {
   override val sourceName = "worker"
   override val metricRegistry = new MetricRegistry()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/25f35806/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 63a8ac8..09d866f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -48,7 +48,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
   private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
   private def isWorker(address: Address) = address.hostPort == expectedHostPort
 
-  def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
+  private def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
 
   override def receiveWithLogging = {
     case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/25f35806/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index ecb358c..88170d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
 import org.apache.spark.Logging
 import org.apache.spark.util.logging.RollingFileAppender
 
-private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
+private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
   private val worker = parent.worker
   private val workDir = parent.workDir
 

http://git-wip-us.apache.org/repos/asf/spark/blob/25f35806/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
index 720f13b..9f9f27d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -31,10 +31,9 @@ import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
-private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
-  val workerActor = parent.worker.self
-  val worker = parent.worker
-  val timeout = parent.timeout
+private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
+  private val workerActor = parent.worker.self
+  private val timeout = parent.timeout
 
   override def renderJson(request: HttpServletRequest): JValue = {
     val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]

http://git-wip-us.apache.org/repos/asf/spark/blob/25f35806/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 7ac81a2..de6423b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.AkkaUtils
 /**
  * Web UI server for the standalone worker.
  */
-private[spark]
+private[worker]
 class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
@@ -38,7 +38,7 @@ class WorkerWebUI(
   extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {
 
-  val timeout = AkkaUtils.askTimeout(worker.conf)
+  private[ui] val timeout = AkkaUtils.askTimeout(worker.conf)
 
   initialize()
 
@@ -53,6 +53,6 @@ class WorkerWebUI(
   }
 }
 
-private[spark] object WorkerWebUI {
+private[ui] object WorkerWebUI {
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message