iota-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tonyfaust...@apache.org
Subject [01/10] incubator-iota git commit: [IOTA-28] Preparing for Receiver actor
Date Wed, 28 Sep 2016 01:19:11 GMT
Repository: incubator-iota
Updated Branches:
  refs/heads/master 2acc7fc87 -> 9b619625d


[IOTA-28] Preparing for Receiver actor

 - Creating separate object for fey_core ActorRef
 - Updating ORCHESTRATION_RECEIVED to accept Option[File]


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/000ca914
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/000ca914
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/000ca914

Branch: refs/heads/master
Commit: 000ca914beee95dd728c47c8e0bfe10deb977574
Parents: 2acc7fc
Author: Barbara Gomes <barbaramaltagomes@gmail.com>
Authored: Fri Jul 22 11:53:26 2016 -0700
Committer: Barbara Gomes <barbaramaltagomes@gmail.com>
Committed: Fri Jul 22 11:53:26 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/iota/fey/Application.scala | 10 ++++-
 .../scala/org/apache/iota/fey/FeyCore.scala     | 42 +++++++++++++++-----
 .../org/apache/iota/fey/FeyGenericActor.scala   |  1 +
 .../org/apache/iota/fey/JsonReceiverActor.scala |  9 +++--
 .../scala/org/apache/iota/fey/MyService.scala   |  2 +-
 .../scala/org/apache/iota/fey/FeyCoreSpec.scala | 12 +++---
 6 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/000ca914/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
index eb08d63..637fbf0 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -42,8 +42,9 @@ object SYSTEM_ACTORS{
 
   import FEY_SYSTEM._
 
-  val fey = system.actorOf(FeyCore.props, name = "FEY-CORE")
-  fey ! FeyCore.START
+  FEY_CORE_ACTOR
+
+  FEY_CORE_ACTOR.actorRef ! FeyCore.START
 
   val service = system.actorOf(Props[MyServiceActor], name = "FEY_REST_API")
 
@@ -52,6 +53,11 @@ object SYSTEM_ACTORS{
 
 }
 
+object FEY_CORE_ACTOR{
+  import FEY_SYSTEM._
+  val actorRef = system.actorOf(FeyCore.props, name = "FEY-CORE")
+}
+
 object FEY_MONITOR{
   import FEY_SYSTEM._
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/000ca914/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 8028f05..4909a0b 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -52,17 +52,15 @@ protected class FeyCore extends Actor with ActorLogging{
       val jsonReceiverActor: ActorRef = context.actorOf(Props[JsonReceiverActor], name =
JSON_RECEIVER_NAME)
       context.watch(jsonReceiverActor)
 
-    case ORCHESTRATION_RECEIVED(orchestrationJson, file) =>
-      log.info(s"NEW FILE ${file.getAbsolutePath}")
-      try{
-        processJson(orchestrationJson)
-        renameProcessedFile(file, "processed")
-      }catch {
-        case e: Exception =>
-          renameProcessedFile(file, "failed")
-          log.error(e, s"JSON not processed ${file.getAbsolutePath}")
+    case ORCHESTRATION_RECEIVED(orchestrationJson, optionFile) =>
+      optionFile match {
+        case Some(file) =>
+          orchestrationReceivedWithFile(orchestrationJson, file)
+        case None =>
+          orchestrationReceivedNoFile(orchestrationJson)
       }
 
+
     case STOP_EMPTY_ORCHESTRATION(orchID) =>
       log.warning(s"Deleting Empty Orchestration $orchID")
       deleteOrchestration(orchID)
@@ -76,6 +74,29 @@ protected class FeyCore extends Actor with ActorLogging{
 
   }
 
+  private def orchestrationReceivedNoFile(json: JsValue) = {
+    val orchGUID = (json \ GUID).as[String]
+    log.info(s"Orchestration $orchGUID received")
+    try{
+      processJson(json)
+    }catch {
+      case e: Exception =>
+        log.error(e, s"JSON for orchestration $orchGUID could not be processed")
+    }
+  }
+
+  private def orchestrationReceivedWithFile(json: JsValue, file: File) = {
+    log.info(s"NEW FILE ${file.getAbsolutePath}")
+    try{
+      processJson(json)
+      renameProcessedFile(file, "processed")
+    }catch {
+      case e: Exception =>
+        renameProcessedFile(file, "failed")
+        log.error(e, s"JSON not processed ${file.getAbsolutePath}")
+    }
+  }
+
   private def processTerminatedMessage(actorRef: ActorRef) = {
     monitoring_actor ! Monitor.TERMINATE(actorRef.path.toString, Utils.getTimestamp)
     log.info(s"TERMINATED ${actorRef.path.name}")
@@ -306,7 +327,8 @@ protected object FeyCore{
     * @param json
     * @param file
     */
-  case class ORCHESTRATION_RECEIVED(json: JsValue, file: File)
+  case class ORCHESTRATION_RECEIVED(json: JsValue, file: Option[File])
+
 
   case class STOP_EMPTY_ORCHESTRATION(orchID: String)
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/000ca914/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 9a6fb78..36f39fa 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/000ca914/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
index 1664478..3fb5330 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
@@ -29,13 +29,16 @@ class JsonReceiverActor extends Actor with ActorLogging {
   import JsonReceiverActor._
 
   val monitoring_actor = FEY_MONITOR.actorRef
-  val watchFileTask = new WatchServiceReceiver(self)
-  var watchThread = new Thread(watchFileTask, GLOBAL_DEFINITIONS.WATCH_SERVICE_THREAD)
+  var watchFileTask: WatchServiceReceiver = _
+  var watchThread: Thread = _
 
   override def preStart() {
     prepareDynamicJarRepo()
     processCheckpointFiles()
 
+    watchFileTask = new WatchServiceReceiver(self)
+    watchThread = new Thread(watchFileTask, GLOBAL_DEFINITIONS.WATCH_SERVICE_THREAD)
+
     monitoring_actor  ! Monitor.START(Utils.getTimestamp)
     watchThread.setDaemon(true)
     watchThread.start()
@@ -74,7 +77,7 @@ class JsonReceiverActor extends Actor with ActorLogging {
   override def receive: Receive = {
     case JSON_RECEIVED(json, file) =>
       log.info(s"JSON RECEIVED => ${Json.stringify(json)}")
-      context.parent ! FeyCore.ORCHESTRATION_RECEIVED(json, file)
+      context.parent ! FeyCore.ORCHESTRATION_RECEIVED(json, Some(file))
 
     case _ =>
   }

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/000ca914/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
index 41a6982..0935c5b 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
@@ -49,7 +49,7 @@ sealed trait MyService extends HttpService {
         get{
           respondWithMediaType(`text/html`) {
             complete {
-              SYSTEM_ACTORS.fey ! JSON_TREE
+              FEY_CORE_ACTOR.actorRef ! JSON_TREE
               Thread.sleep(2000)
               val json = IdentifyFeyActors.generateTreeJson()
               IdentifyFeyActors.getHTMLTree(json)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/000ca914/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
index b5a2ca9..c616821 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
@@ -62,7 +62,7 @@ class FeyCoreSpec extends BaseAkkaSpec  {
   val orchestration_name = "TEST-ACTOR"
 
   "Sending FeyCore.ORCHESTRATION_RECEIVED with CREATE command to FeyCore" should {
-    feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test),
new File("/tmp/fey/test/json"))
+    feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test),
None)
     s"result in creating an Orchestration child actor with the name '$orchestration_name'"
in {
       orchestrationref = TestProbe().expectActor(s"$feyPath/$orchestration_name")
     }
@@ -85,14 +85,14 @@ class FeyCoreSpec extends BaseAkkaSpec  {
 
   "Sending FeyCore.ORCHESTRATION_RECEIVED with UPDATE command to FeyCore" should {
     s"result in creating a new Performer child actor with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0002'"
in {
-      feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_json_test),
new File("/tmp/fey/test/json"))
+      feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_json_test),
None)
       ensemble1Test2ref = TestProbe().expectActor(s"$feyPath/$orchestration_name/MY-ENSEMBLE-0001/TEST-0002")
     }
   }
 
   "Sending FeyCore.ORCHESTRATION_RECEIVED with UPDATE command and DELETE ensemble to FeyCore"
should {
     s"result in termination of Ensemble with the name '$orchestration_name/MY-ENSEMBLE-0001'"
in {
-      feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_delete_json_test),
new File("/tmp/fey/test/json"))
+      feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_delete_json_test),
None)
       TestProbe().verifyActorTermination(ensemble1ref)
     }
     s"result in termination of Performer with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0001'"
in {
@@ -106,7 +106,7 @@ class FeyCoreSpec extends BaseAkkaSpec  {
   "Sending FeyCore.ORCHESTRATION_RECEIVED with RECREATE command and same Timestamp to FeyCore"
should {
     s"result in logging a 'not recreated' message at Warn " in {
       EventFilter.warning(pattern = s".*$orchestration_name not recreated.*", occurrences
= 1) intercept {
-        feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.recreate_timestamp_json_test),
new File("/tmp/fey/test/json"))
+        feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.recreate_timestamp_json_test),
None)
       }
     }
   }
@@ -121,7 +121,7 @@ class FeyCoreSpec extends BaseAkkaSpec  {
 
   "Sending FeyCore.ORCHESTRATION_RECEIVED with DELETE command to FeyCore" should {
     s"result in termination of Orchestration with the name '$orchestration_name'" in {
-      feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.delete_json_test),
new File("/tmp/fey/test/json"))
+      feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.delete_json_test),
None)
       TestProbe().verifyActorTermination(orchestrationref)
     }
     "result in sending TERMINATE message to Monitor actor" in {
@@ -140,7 +140,7 @@ class FeyCoreSpec extends BaseAkkaSpec  {
 
   "Sending FeyCore.STOP_EMPTY_ORCHESTRATION to FeyCore" should {
     s"result in termination of 'TEST-ORCH-2'" in {
-      feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.orchestration_test_json),
new File("/tmp/fey/test/json"))
+      feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.orchestration_test_json),
None)
       val ref = TestProbe().expectActor(s"$feyPath/TEST-ORCH-2")
       FEY_CACHE.activeOrchestrations should have size(1)
       FEY_CACHE.activeOrchestrations should contain key("TEST-ORCH-2")


Mime
View raw message