spark-commits mailing list archives

From a...@apache.org
Subject git commit: SPARK-3093: masterLock in Worker is no longer needed
Date Mon, 18 Aug 2014 16:34:45 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9306b8c6c -> c0cbbdeaf


SPARK-3093: masterLock in Worker is no longer needed

There is no need to use masterLock in Worker now, since all communications happen within the Akka actor.
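
For context, the lock removal is safe because of the Akka actor model: an actor processes at most one message at a time, so mutable state that is read and written only inside the actor's message handler never sees concurrent access. The sketch below is illustrative only, using a hypothetical MasterTracker actor with simplified messages (it is not the Worker code itself), and assumes the Akka 2.x akka.actor API:

    import akka.actor.{Actor, ActorSelection}

    // Hypothetical actor, for illustration only. Akka delivers messages to
    // receive one at a time, so these vars are never touched concurrently
    // and need no masterLock-style synchronization.
    class MasterTracker extends Actor {
      var master: ActorSelection = null
      var connected: Boolean = false

      def receive = {
        case ("changeMaster", url: String) =>
          master = context.actorSelection(url)   // mutate state inside receive only
          connected = true
        case "heartbeat" =>
          if (connected) { master ! "ping" }     // send without holding any lock
      }
    }

The same reasoning applies to every masterLock.synchronized block removed in the diff below: each guarded statement runs inside Worker's receive handler, so the actor mailbox already serializes it.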

Author: CrazyJvm <crazyjvm@gmail.com>

Closes #2008 from CrazyJvm/no-need-master-lock and squashes the following commits:

dd39e20 [CrazyJvm] fix format
58e7fa5 [CrazyJvm] there's no need to use masterLock now since all communications are within Akka actor


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0cbbdea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0cbbdea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0cbbdea

Branch: refs/heads/master
Commit: c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6
Parents: 9306b8c
Author: CrazyJvm <crazyjvm@gmail.com>
Authored: Mon Aug 18 09:34:36 2014 -0700
Committer: Aaron Davidson <aaron@databricks.com>
Committed: Mon Aug 18 09:34:36 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/worker/Worker.scala | 41 +++++++-------------
 1 file changed, 14 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c0cbbdea/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 80fde7e..81400af 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -72,7 +72,6 @@ private[spark] class Worker(
   val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
   val testing: Boolean = sys.props.contains("spark.testing")
-  val masterLock: Object = new Object()
   var master: ActorSelection = null
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@ private[spark] class Worker(
   }
 
   def changeMaster(url: String, uiUrl: String) {
-    masterLock.synchronized {
-      activeMasterUrl = url
-      activeMasterWebUiUrl = uiUrl
-      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = activeMasterUrl match {
-        case Master.sparkUrlRegex(_host, _port) =>
-          Address("akka.tcp", Master.systemName, _host, _port.toInt)
-        case x =>
-          throw new SparkException("Invalid spark URL: " + x)
-      }
-      connected = true
+    activeMasterUrl = url
+    activeMasterWebUiUrl = uiUrl
+    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+    masterAddress = activeMasterUrl match {
+      case Master.sparkUrlRegex(_host, _port) =>
+        Address("akka.tcp", Master.systemName, _host, _port.toInt)
+      case x =>
+        throw new SparkException("Invalid spark URL: " + x)
     }
+    connected = true
   }
 
   def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@ private[spark] class Worker(
       }
 
     case SendHeartbeat =>
-      masterLock.synchronized {
-        if (connected) { master ! Heartbeat(workerId) }
-      }
+      if (connected) { master ! Heartbeat(workerId) }
 
     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,9 +239,7 @@ private[spark] class Worker(
           manager.start()
           coresUsed += cores_
           memoryUsed += memory_
-          masterLock.synchronized {
-            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
-          }
+          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
         } catch {
           case e: Exception => {
             logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -254,17 +247,13 @@ private[spark] class Worker(
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
             }
-            masterLock.synchronized {
-              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
-            }
+            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
           }
         }
       }
 
     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      masterLock.synchronized {
-        master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
-      }
+      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
       val fullId = appId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         executors.get(fullId) match {
@@ -330,9 +319,7 @@ private[spark] class Worker(
         case _ =>
           logDebug(s"Driver $driverId changed state to $state")
       }
-      masterLock.synchronized {
-        master ! DriverStateChanged(driverId, state, exception)
-      }
+      master ! DriverStateChanged(driverId, state, exception)
       val driver = drivers.remove(driverId).get
       finishedDrivers(driverId) = driver
       memoryUsed -= driver.driverDesc.mem


