spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [26/50] [abbrv] git commit: Made akka capable of tolerating fatal exceptions and moving on.
Date Sat, 14 Dec 2013 08:42:10 GMT
Made akka capable of tolerating fatal exceptions and moving on.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5b11028a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5b11028a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5b11028a

Branch: refs/heads/master
Commit: 5b11028a0479623f41e95a41825a9bdfc944b323
Parents: 5618af6
Author: Prashant Sharma <prashant.s@imaginea.com>
Authored: Mon Dec 2 10:41:26 2013 +0530
Committer: Prashant Sharma <prashant.s@imaginea.com>
Committed: Mon Dec 2 10:47:39 2013 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/AkkaUtils.scala |   4 +-
 .../apache/spark/util/SparkActorSystem.scala    | 112 +++++++++++++++++++
 2 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5b11028a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 5df8213..407e9ff 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem}
 import com.typesafe.config.ConfigFactory
 import scala.concurrent.duration._
 import scala.concurrent.Await
@@ -70,7 +70,7 @@ private[spark] object AkkaUtils {
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
       """.stripMargin)
 
-    val actorSystem = ActorSystem(name, akkaConf)
+    val actorSystem = SparkActorSystem(name, akkaConf)
 
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
     val boundPort = provider.getDefaultAddress.port.get

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5b11028a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
new file mode 100644
index 0000000..461e7ab
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
@@ -0,0 +1,112 @@
+/**
+ *  Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package akka.actor
+
+import com.typesafe.config.Config
+import akka.util._
+import scala.util.control.{NonFatal, ControlThrowable}
+
+/**
+ * Factory for Spark's [[ActorSystem]]: builds a [[SparkActorSystemImpl]], which logs a fatal
+ * error from one of its threads and keeps running (instead of shutting down) when
+ * 'akka.jvm-exit-on-fatal-error' is disabled.
+ */
+object SparkActorSystem {
+
+  /**
+   * Creates and starts an actor system named `name` configured by `config`, using the class
+   * loader chosen by [[findClassLoader]].
+   */
+  def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader())
+
+  /** Creates and starts an actor system named `name` that loads classes through `classLoader`. */
+  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
+    new SparkActorSystemImpl(name, config, classLoader).start()
+
+  /**
+   * INTERNAL API
+   *
+   * Picks the class loader for a new actor system, in this order:
+   *  1. the current thread's context class loader, if it is non-null;
+   *  2. otherwise, the loader of the first class on the call stack that is not Akka/Scala
+   *     plumbing or this factory (walked via `Reflect.getCallerClass`, when available);
+   *  3. otherwise (no caller found), this object's own class loader.
+   *
+   * NOTE(review): the starting frame index (2) presumably depends on the exact call depth of
+   * this method, so keep its structure intact when editing.
+   */
+  private[akka] def findClassLoader(): ClassLoader = {
+    def findCaller(get: Int ⇒ Class[_]): ClassLoader =
+      Iterator.from(2 /*is the magic number, promise*/).map(get) dropWhile {
+        c ⇒
+          c != null &&
+            (c.getName.startsWith("akka.actor.ActorSystem") ||
+              // This factory is not covered by the "akka.actor.ActorSystem" prefix; without this
+              // check the walk stops on our own frames and returns Spark's loader, not the caller's.
+              c.getName.startsWith("akka.actor.SparkActorSystem") ||
+              c.getName.startsWith("scala.Option") ||
+              c.getName.startsWith("scala.collection.Iterator") ||
+              c.getName.startsWith("akka.util.Reflect"))
+      } next() match {
+        case null ⇒ getClass.getClassLoader
+        case c ⇒ c.getClassLoader
+      }
+
+    Option(Thread.currentThread.getContextClassLoader) orElse
+      (Reflect.getCallerClass map findCaller) getOrElse
+      getClass.getClassLoader
+  }
+}
+
+/**
+ * An [[ActorSystemImpl]] that tolerates fatal errors in its threads.
+ *
+ * When a thread dies with a fatal error and 'akka.jvm-exit-on-fatal-error' is disabled, the
+ * error is logged and the actor system keeps running. Every other case (non-fatal errors, or
+ * fatal errors with JVM exit enabled) is handed to the uncaught-exception handler inherited
+ * from [[ActorSystemImpl]].
+ *
+ * Lifecycle methods (`start`, `stop`, `/`, `toString`) are inherited rather than re-declared.
+ * NOTE(review): ActorSystemImpl's own `start` presumably performs startup steps beyond
+ * `provider.init(this)` that rely on members private to it, which a re-declared copy could not
+ * reproduce. Confirm against the Akka version in use.
+ */
+private[akka] class SparkActorSystemImpl(override val name: String,
+                                         applicationConfig: Config,
+                                         classLoader: ClassLoader)
+  extends ActorSystemImpl(name, applicationConfig, classLoader) {
+
+  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+    // Stock Akka handling, used for every case this class does not special-case.
+    val akkaHandler = super.uncaughtExceptionHandler
+    new Thread.UncaughtExceptionHandler() {
+      def uncaughtException(thread: Thread, cause: Throwable): Unit =
+        if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
+          // Tolerate the fatal error: log it and leave the actor system running.
+          log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
+            "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
+          // shutdown()  // TODO make it configurable
+        } else {
+          akkaHandler.uncaughtException(thread, cause)
+        }
+    }
+  }
+
+  /**
+   * Whether `cause` counts as fatal for this handler: anything that is not `NonFatal`, an
+   * `InterruptedException`, a `NotImplementedError` or a `ControlThrowable`.
+   */
+  private def isFatalError(cause: Throwable): Boolean = cause match {
+    case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable ⇒
+      false
+    case _ ⇒
+      true
+  }
+}


Mime
View raw message