iota-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tonyfaust...@apache.org
Subject [03/12] incubator-iota git commit: Refactoring MONITOR actor for active better tests results
Date Mon, 18 Jul 2016 20:43:26 GMT
Refactoring MONITOR actor for active better tests results

- Avoid instantiation of FeyCore and io.spray when actors being tested send message to static
actor Monitor


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/7202f0f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/7202f0f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/7202f0f2

Branch: refs/heads/master
Commit: 7202f0f209e14847d1d9dda192b5718d6ffa719a
Parents: c845630
Author: Barbara Gomes <barbaramaltagomes@gmail.com>
Authored: Fri Jul 15 11:22:30 2016 -0700
Committer: Barbara Gomes <barbaramaltagomes@gmail.com>
Committed: Fri Jul 15 11:22:30 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/iota/fey/Application.scala | 14 ++++++++++---
 .../scala/org/apache/iota/fey/Ensemble.scala    |  9 +++++----
 .../scala/org/apache/iota/fey/FeyCore.scala     | 10 ++++++----
 .../org/apache/iota/fey/FeyGenericActor.scala   | 21 ++++++++++----------
 .../org/apache/iota/fey/JsonReceiverActor.scala |  7 ++++---
 .../org/apache/iota/fey/Orchestration.scala     | 10 +++++-----
 6 files changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/7202f0f2/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
index 1e5ee69..635bb09 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -34,18 +34,26 @@ object Application extends App {
 
 }
 
+object FEY_SYSTEM{
+  implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM")
+}
+
 object SYSTEM_ACTORS{
 
-  implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM")
+  import FEY_SYSTEM._
 
   val fey = system.actorOf(FeyCore.props, name = "FEY-CORE")
   fey ! FeyCore.START
 
   val service = system.actorOf(Props[MyServiceActor], name = "FEY_REST_API")
 
-  val monitoring = system.actorOf(Props[Monitor], "FEY-MONITOR")
-
   implicit val timeout = Timeout(800.seconds)
   IO(Http) ? Http.Bind(SYSTEM_ACTORS.service, interface = "0.0.0.0", port = 16666)
 
 }
+
+object FEY_MONITOR{
+  import FEY_SYSTEM._
+
+  val actorRef = system.actorOf(Props[Monitor], "FEY-MONITOR")
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/7202f0f2/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index da3c0bb..cc7640c 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -32,6 +32,7 @@ protected class Ensemble(val orchestrationID: String,
 
   import Ensemble._
 
+  val monitoring_actor = FEY_MONITOR.actorRef
   var performers_metadata: Map[String, Performer] = Map.empty[String, Performer]
   var connectors: Map[String,Array[String]] = Map.empty[String,Array[String]]
   var performer: Map[String,ActorRef] = Map.empty[String,ActorRef]
@@ -50,7 +51,7 @@ protected class Ensemble(val orchestrationID: String,
       context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
 
     case Terminated(actor) =>
-      SYSTEM_ACTORS.monitoring  ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
+      monitoring_actor  ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
       log.error(s"DEAD nPerformers ${actor.path.name}")
       context.children.foreach{ child =>
         context.unwatch(child)
@@ -80,7 +81,7 @@ protected class Ensemble(val orchestrationID: String,
     */
   override def preStart() : Unit = {
 
-    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
+    monitoring_actor ! Monitor.START(Utils.getTimestamp)
 
     val connectors_js = (ensembleSpec \ CONNECTIONS).as[List[JsObject]]
     val performers_js = (ensembleSpec \ PERFORMERS).as[List[JsObject]]
@@ -100,11 +101,11 @@ protected class Ensemble(val orchestrationID: String,
   }
 
   override def postStop() : Unit = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp)
+    monitoring_actor  ! Monitor.STOP(Utils.getTimestamp)
   }
 
   override def postRestart(reason: Throwable): Unit = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
+    monitoring_actor  ! Monitor.RESTART(reason, Utils.getTimestamp)
     preStart()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/7202f0f2/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 8f0b44f..8028f05 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -38,6 +38,8 @@ protected class FeyCore extends Actor with ActorLogging{
   import FeyCore._
   import CONFIG._
 
+  val monitoring_actor = FEY_MONITOR.actorRef
+
   val identifier: ActorRef = context.actorOf(Props(classOf[IdentifyFeyActors]), name = IDENTIFIER_NAME)
   context.watch(identifier)
 
@@ -75,7 +77,7 @@ protected class FeyCore extends Actor with ActorLogging{
   }
 
   private def processTerminatedMessage(actorRef: ActorRef) = {
-    SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actorRef.path.toString, Utils.getTimestamp)
+    monitoring_actor ! Monitor.TERMINATE(actorRef.path.toString, Utils.getTimestamp)
     log.info(s"TERMINATED ${actorRef.path.name}")
     FEY_CACHE.activeOrchestrations.remove(actorRef.path.name)
     if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) {
@@ -87,19 +89,19 @@ protected class FeyCore extends Actor with ActorLogging{
     * Clean up Fey Cache
     */
   override def postStop(): Unit = {
-    SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
+    monitoring_actor ! Monitor.STOP(Utils.getTimestamp)
     FEY_CACHE.activeOrchestrations.clear()
     FEY_CACHE.orchestrationsAwaitingTermination.clear()
     ORCHESTRATION_CACHE.orchestration_metadata.clear()
   }
 
   override def preStart(): Unit = {
-    SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
+    monitoring_actor ! Monitor.START(Utils.getTimestamp)
     log.info("Starting Fey Core")
   }
 
   override def postRestart(reason: Throwable): Unit = {
-    SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
+    monitoring_actor ! Monitor.RESTART(reason, Utils.getTimestamp)
     preStart()
     self ! START
   }

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/7202f0f2/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 9c6f5c5..f1c94e1 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -51,6 +51,7 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
     */
   @volatile private var scheduler: Cancellable = null
   @volatile private var endBackoff: Long = 0
+  private val monitoring_actor = FEY_MONITOR.actorRef
 
   override final def receive: Receive = {
 
@@ -81,27 +82,27 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
     postStop()
   }
 
-  override final def preStart() = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp, startMonitorInfo)
+  override final def preStart(): Unit = {
+    monitoring_actor  ! Monitor.START(Utils.getTimestamp, startMonitorInfo)
     onStart()
     startScheduler()
   }
 
-  override final def postStop() = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp, stopMonitorInfo)
+  override final def postStop(): Unit = {
+    monitoring_actor  ! Monitor.STOP(Utils.getTimestamp, stopMonitorInfo)
     log.info(s"STOPPED actor ${self.path.name}")
     stopScheduler()
     onStop()
   }
 
-  override final def postRestart(reason: Throwable) = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
+  override final def postRestart(reason: Throwable): Unit = {
+    monitoring_actor  ! Monitor.RESTART(reason, Utils.getTimestamp)
     log.info(s"RESTARTED Actor ${self.path.name}")
     preStart()
     onRestart(reason)
   }
 
-  def onRestart(reason: Throwable) = {
+  def onRestart(reason: Throwable): Unit = {
     log.info("RESTARTED method")
   }
 
@@ -118,7 +119,7 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
     * Enables the backoff.
     * Actor will drop the PROCESS messages that are sent during the backoff period time.
     */
-  final def startBackoff() = {
+  final def startBackoff(): Unit = {
     this.endBackoff = System.nanoTime() + this.backoff.toNanos
   }
 
@@ -141,7 +142,7 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
   /**
     * Called by the scheduler.
     */
-  def execute() = {
+  def execute(): Unit = {
     log.info(s"Executing action in ${self.path.name}")
   }
 
@@ -166,7 +167,7 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
     * @param message message to be propagated
     * @tparam T Any
     */
-  final def propagateMessage[T](message: T) = {
+  final def propagateMessage[T](message: T): Unit = {
     connectTo.foreach(linkedActor => {
       linkedActor._2 ! PROCESS(message)
     })

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/7202f0f2/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
index 2a05710..6170524 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
@@ -28,6 +28,7 @@ class JsonReceiverActor extends Actor with ActorLogging {
 
   import JsonReceiverActor._
 
+  val monitoring_actor = FEY_MONITOR.actorRef
   val watchFileTask = new WatchServiceReceiver(self)
   var watchThread = new Thread(watchFileTask, "WatchService")
 
@@ -35,7 +36,7 @@ class JsonReceiverActor extends Actor with ActorLogging {
     prepareDynamicJarRepo()
     processCheckpointFiles()
 
-    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
+    monitoring_actor  ! Monitor.START(Utils.getTimestamp)
     watchThread.setDaemon(true)
     watchThread.start()
 
@@ -60,13 +61,13 @@ class JsonReceiverActor extends Actor with ActorLogging {
   }
 
   override def postStop() {
-    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp)
+    monitoring_actor  ! Monitor.STOP(Utils.getTimestamp)
     watchThread.interrupt()
     watchThread.join()
   }
 
   override def postRestart(reason: Throwable): Unit = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
+    monitoring_actor  ! Monitor.RESTART(reason, Utils.getTimestamp)
     preStart()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/7202f0f2/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
index 5ec7588..ad61a37 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
@@ -36,8 +36,8 @@ protected class Orchestration(val name: String,
     * List of map of Ensembles = [EnsembleID, Ensemble]
     */
   private val ensembles:HashMap[String, ActorRef] = HashMap.empty[String, ActorRef]
-
   private val awaitingTermination:HashMap[String, JsObject] = HashMap.empty[String, JsObject]
+  private val monitoring_actor = FEY_MONITOR.actorRef
 
   override def receive: Receive = {
 
@@ -50,7 +50,7 @@ protected class Orchestration(val name: String,
       context.actorSelection(s"*") ! Ensemble.PRINT_ENSEMBLE
 
     case Terminated(actor) =>
-      SYSTEM_ACTORS.monitoring  ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
+      monitoring_actor  ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
       context.unwatch(actor)
       log.warning(s"ACTOR DEAD ${actor.path}")
       ensembles.remove(actor.path.name)
@@ -69,19 +69,19 @@ protected class Orchestration(val name: String,
     }
 
   override def preStart(): Unit = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
+    monitoring_actor  ! Monitor.START(Utils.getTimestamp)
     if (ORCHESTRATION_CACHE.orchestration_metadata.contains(guid)){
       replayOrchestrationState()
     }
   }
 
   override def postStop() = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp)
+    monitoring_actor  ! Monitor.STOP(Utils.getTimestamp)
     log.info(s"STOPPED ${self.path.name}")
   }
 
   override def postRestart(reason: Throwable): Unit = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
+    monitoring_actor  ! Monitor.RESTART(reason, Utils.getTimestamp)
     log.info(s"RESTARTED ${self.path}")
     preStart()
   }


Mime
View raw message