spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-9602] remove "Akka/Actor" words from comments
Date Tue, 04 Aug 2015 21:54:33 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 f771a83f4 -> 560b2da78


[SPARK-9602] remove "Akka/Actor" words from comments

https://issues.apache.org/jira/browse/SPARK-9602

Although we have hidden Akka behind the RPC interface, I found that Akka/Actor-related comments
are still scattered throughout the codebase. To keep things consistent, we should remove the
"actor"/"akka" wording from the comments...
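
For reference, a minimal sketch of the vocabulary the revised comments use: an RpcEndpoint
registered with an RpcEnv plays the role the old comments ascribed to an Akka actor. This is
illustrative only; the org.apache.spark.rpc API is private[spark], and the EchoEndpoint class
here is hypothetical:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

    object RpcVocabularySketch {
      // A hypothetical endpoint: handles the two interaction styles
      // the renamed comments refer to.
      class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
        override def receive: PartialFunction[Any, Unit] = {
          case msg => // fire-and-forget message, like an actor's receive block
        }
        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
          case msg => context.reply(msg) // reply through the supplied context
        }
      }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val rpcEnv = RpcEnv.create("sketch", "localhost", 0, conf, new SecurityManager(conf))
        val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
        ref.send("hello") // one-way send; ask[T] would expect a reply
        rpcEnv.shutdown()
        rpcEnv.awaitTermination()
      }
    }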

Author: CodingCat <zhunansjtu@gmail.com>

Closes #7936 from CodingCat/SPARK-9602 and squashes the following commits:

e8296a3 [CodingCat] remove actor words from comments

(cherry picked from commit 9d668b73687e697cad2ef7fd3c3ba405e9795593)
Signed-off-by: Reynold Xin <rxin@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/560b2da7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/560b2da7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/560b2da7

Branch: refs/heads/branch-1.5
Commit: 560b2da783bc25bd8767f6888665dadecac916d8
Parents: f771a83
Author: CodingCat <zhunansjtu@gmail.com>
Authored: Tue Aug 4 14:54:11 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Tue Aug 4 14:54:30 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala    | 2 +-
 .../scala/org/apache/spark/deploy/LocalSparkCluster.scala     | 4 ----
 .../main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +-
 .../org/apache/spark/deploy/master/LeaderElectionAgent.scala  | 6 +++---
 .../scala/org/apache/spark/deploy/master/MasterMessages.scala | 2 +-
 .../spark/deploy/master/ZooKeeperLeaderElectionAgent.scala    | 6 +++---
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala    | 7 ++++---
 .../scala/org/apache/spark/deploy/worker/WorkerWatcher.scala  | 4 ++--
 core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala | 2 +-
 .../org/apache/spark/scheduler/OutputCommitCoordinator.scala  | 2 +-
 .../org/apache/spark/scheduler/cluster/ExecutorData.scala     | 2 +-
 core/src/main/scala/org/apache/spark/util/IdGenerator.scala   | 6 +++---
 .../spark/deploy/master/CustomRecoveryModeFactory.scala       | 4 ++--
 .../org/apache/spark/deploy/worker/WorkerWatcherSuite.scala   | 5 ++---
 project/MimaExcludes.scala                                    | 2 +-
 .../src/main/scala/org/apache/spark/repl/SparkILoop.scala     | 2 +-
 16 files changed, 27 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 55e563e..2a56bf2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -794,7 +794,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
 
   /**
    * We try to reuse a single Socket to transfer accumulator updates, as they are all added
-   * by the DAGScheduler's single-threaded actor anyway.
+   * by the DAGScheduler's single-threaded RpcEndpoint anyway.
    */
   @transient var socket: Socket = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 53356ad..83ccaad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -73,12 +73,8 @@ class LocalSparkCluster(
   def stop() {
     logInfo("Shutting down local Spark cluster.")
     // Stop the workers before the master so they don't get upset that it disconnected
-    // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
-    //       This is unfortunate, but for now we just comment it out.
     workerRpcEnvs.foreach(_.shutdown())
-    // workerActorSystems.foreach(_.awaitTermination())
     masterRpcEnvs.foreach(_.shutdown())
-    // masterActorSystems.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 7576a29..25ea692 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -257,7 +257,7 @@ private[spark] class AppClient(
   }
 
   def start() {
-    // Just launch an actor; it will call back into the listener.
+    // Just launch an rpcEndpoint; it will call back into the listener.
     endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index cf77c86..70f21fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 trait LeaderElectionAgent {
-  val masterActor: LeaderElectable
+  val masterInstance: LeaderElectable
   def stop() {} // to avoid noops in implementations.
 }
 
@@ -37,7 +37,7 @@ trait LeaderElectable {
 }
 
 /** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
+private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
   extends LeaderElectionAgent {
-  masterActor.electedLeader()
+  masterInstance.electedLeader()
 }
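
As a hedged sketch of the renamed contract (LeaderElectable and MonarchyLeaderAgent are
private[spark]; the LoggingElectable class is hypothetical), the single-node agent elects its
masterInstance immediately on construction:

    import org.apache.spark.deploy.master.{LeaderElectable, MonarchyLeaderAgent}

    class LoggingElectable extends LeaderElectable {
      override def electedLeader(): Unit = println("elected leader")
      override def revokedLeadership(): Unit = println("leadership revoked")
    }

    object LeaderElectionSketch {
      def main(args: Array[String]): Unit = {
        // Prints "elected leader" as soon as the agent is constructed.
        val agent = new MonarchyLeaderAgent(new LoggingElectable)
        agent.stop() // no-op default from the LeaderElectionAgent trait
      }
    }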

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index 68c9371..a952cee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -38,5 +38,5 @@ private[master] object MasterMessages {
 
   case object BoundPortsRequest
 
-  case class BoundPortsResponse(actorPort: Int, webUIPort: Int, restPort: Option[Int])
+  case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 6fdff86..d317206 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
 import org.apache.spark.deploy.SparkCuratorUtil
 
-private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
+private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {
 
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
@@ -73,10 +73,10 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElecta
   private def updateLeadershipStatus(isLeader: Boolean) {
     if (isLeader && status == LeadershipStatus.NOT_LEADER) {
       status = LeadershipStatus.LEADER
-      masterActor.electedLeader()
+      masterInstance.electedLeader()
     } else if (!isLeader && status == LeadershipStatus.LEADER) {
       status = LeadershipStatus.NOT_LEADER
-      masterActor.revokedLeadership()
+      masterInstance.revokedLeadership()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c82a7cc..6792d33 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -228,7 +228,7 @@ private[deploy] class Worker(
   /**
    * Re-register with the master because a network failure or a master failure has occurred.
    * If the re-registration attempt threshold is exceeded, the worker exits with error.
-   * Note that for thread-safety this should only be called from the actor.
+   * Note that for thread-safety this should only be called from the rpcEndpoint.
    */
   private def reregisterWithMaster(): Unit = {
     Utils.tryOrExit {
@@ -365,7 +365,8 @@ private[deploy] class Worker(
       if (connected) { sendToMaster(Heartbeat(workerId, self)) }
 
     case WorkDirCleanup =>
-      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker
+      // rpcEndpoint.
       // Copy ids so that it can be used in the cleanup thread.
       val appIds = executors.values.map(_.appId).toSet
       val cleanupFuture = concurrent.future {
@@ -684,7 +685,7 @@ private[deploy] object Worker extends Logging {
       workerNumber: Option[Int] = None,
       conf: SparkConf = new SparkConf): RpcEnv = {
 
-    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
     val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
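
The WorkDirCleanup change above keeps the endpoint's single message loop free by pushing
blocking work onto a future. A minimal sketch of that pattern, with a hypothetical doCleanup
function standing in for the real directory walk:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object CleanupSketch {
      // Hypothetical stand-in for the real cleanup logic.
      def doCleanup(appIds: Set[String]): Unit = { /* delete stale app dirs */ }

      def main(args: Array[String]): Unit = {
        val appIds = Set("app-1", "app-2") // copied first, as the comment advises
        val cleanupFuture = Future { doCleanup(appIds) }
        cleanupFuture.onFailure {
          case e: Throwable => Console.err.println("App dir cleanup failed: " + e)
        }
      }
    }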

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index fae5640..735c4f0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -43,7 +43,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
   private[deploy] def setTesting(testing: Boolean) = isTesting = testing
   private var isTesting = false
 
-  // Lets us filter events only from the worker's actor system
+  // Lets us filter events only from the worker's rpc system
   private val expectedAddress = RpcAddress.fromURIString(workerUrl)
   private def isWorker(address: RpcAddress) = expectedAddress == address
 
@@ -62,7 +62,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
     if (isWorker(remoteAddress)) {
       // This log message will never be seen
-      logError(s"Lost connection to worker actor $workerUrl. Exiting.")
+      logError(s"Lost connection to worker rpc endpoint $workerUrl. Exiting.")
       exitNonZero()
     }
   }
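
The filtering the revised comment describes comes down to comparing RpcAddress values
(private[spark]); the real code derives the expected address from the worker URL via
RpcAddress.fromURIString. A sketch with hardcoded illustrative addresses:

    import org.apache.spark.rpc.RpcAddress

    object AddressFilterSketch {
      val expectedAddress = RpcAddress("1.2.3.4", 1234)
      def isWorker(address: RpcAddress): Boolean = expectedAddress == address

      def main(args: Array[String]): Unit = {
        // A disconnect from any other address is ignored, as the suite below checks.
        println(isWorker(RpcAddress("4.3.2.1", 1234))) // false
      }
    }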

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 6ae4789..7409ac8 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -100,7 +100,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
         val future = ask[T](message, timeout)
         val result = timeout.awaitResult(future)
         if (result == null) {
-          throw new SparkException("Actor returned null")
+          throw new SparkException("RpcEndpoint returned null")
         }
         return result
       } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 8321037..5d92637 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -162,7 +162,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
 
 private[spark] object OutputCommitCoordinator {
 
-  // This actor is used only for RPC
+  // This endpoint is used only for RPC
   private[spark] class OutputCommitCoordinatorEndpoint(
       override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
     extends RpcEndpoint with Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index 26e72c0..626a2b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -22,7 +22,7 @@ import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress}
 /**
  * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
  *
- * @param executorEndpoint The ActorRef representing this executor
+ * @param executorEndpoint The RpcEndpointRef representing this executor
  * @param executorAddress The network address of this executor
  * @param executorHost The hostname that this executor is running on
  * @param freeCores  The current number of cores available for work on the executor

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
index 17e55f7..53934ad 100644
--- a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
@@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger
 /**
  * A util used to get a unique generation ID. This is a wrapper around Java's
  * AtomicInteger. An example usage is in BlockManager, where each BlockManager
- * instance would start an Akka actor and we use this utility to assign the Akka
- * actors unique names.
+ * instance would start an RpcEndpoint and we use this utility to assign the
+ * RpcEndpoints unique names.
  */
 private[spark] class IdGenerator {
-  private var id = new AtomicInteger
+  private val id = new AtomicInteger
   def next: Int = id.incrementAndGet
 }
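
A usage sketch matching the revised scaladoc (IdGenerator is private[spark]; the endpoint name
prefix is illustrative):

    import org.apache.spark.util.IdGenerator

    object IdGeneratorSketch {
      private val idGenerator = new IdGenerator

      // Yields "BlockManagerEndpoint1", "BlockManagerEndpoint2", ... so each
      // endpoint registered with the RpcEnv gets a unique name.
      def nextEndpointName(): String = "BlockManagerEndpoint" + idGenerator.next

      def main(args: Array[String]): Unit = {
        println(nextEndpointName())
        println(nextEndpointName())
      }
    }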

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
index 8c96b0e..4b86da5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
@@ -99,7 +99,7 @@ object CustomPersistenceEngine {
   @volatile var lastInstance: Option[CustomPersistenceEngine] = None
 }
 
-class CustomLeaderElectionAgent(val masterActor: LeaderElectable) extends LeaderElectionAgent {
-  masterActor.electedLeader()
+class CustomLeaderElectionAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent {
+  masterInstance.electedLeader()
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
index cd24d79..e9034e3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
@@ -38,12 +38,11 @@ class WorkerWatcherSuite extends SparkFunSuite {
     val conf = new SparkConf()
     val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
     val targetWorkerUrl = rpcEnv.uriOf("test", RpcAddress("1.2.3.4", 1234), "Worker")
-    val otherAddress = "akka://test@4.3.2.1:1234/user/OtherActor"
-    val otherAkkaAddress = RpcAddress("4.3.2.1", 1234)
+    val otherRpcAddress = RpcAddress("4.3.2.1", 1234)
     val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl)
     workerWatcher.setTesting(testing = true)
     rpcEnv.setupEndpoint("worker-watcher", workerWatcher)
-    workerWatcher.onDisconnected(otherAkkaAddress)
+    workerWatcher.onDisconnected(otherRpcAddress)
     assert(!workerWatcher.isShutDown)
     rpcEnv.shutdown()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 280aac9..b60ae78 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -182,7 +182,7 @@ object MimaExcludes {
             ProblemFilters.exclude[IncompatibleResultTypeProblem](
               "org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast"),
             ProblemFilters.exclude[MissingClassProblem](
-              "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor")
+              "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint")
           ) ++ Seq(
            // SPARK-4655 - Making Stage an Abstract class broke binary compatibility even though
             // the stage class is defined as private[spark]

http://git-wip-us.apache.org/repos/asf/spark/blob/560b2da7/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 8130868..304b1e8 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -981,7 +981,7 @@ class SparkILoop(
     // which spins off a separate thread, then print the prompt and try
     // our best to look ready.  The interlocking lazy vals tend to
     // inter-deadlock, so we break the cycle with a single asynchronous
-    // message to an actor.
+    // message to an rpcEndpoint.
     if (isAsync) {
       intp initialize initializedCallback()
       createAsyncListener() // listens for signal to run postInitialization


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

