iota-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tonyfaust...@apache.org
Subject [02/10] incubator-iota git commit: FeyGenericActor Tests
Date Thu, 21 Jul 2016 23:37:53 GMT
FeyGenericActor Tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/c96d829f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/c96d829f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/c96d829f

Branch: refs/heads/master
Commit: c96d829f98bd8e40e784261c92aeba856f1e1588
Parents: 7c65386
Author: Barbara Gomes <barbaramaltagomes@gmail.com>
Authored: Tue Jul 19 10:52:19 2016 -0700
Committer: Barbara Gomes <barbaramaltagomes@gmail.com>
Committed: Tue Jul 19 10:52:19 2016 -0700

----------------------------------------------------------------------
 .../org/apache/iota/fey/FeyGenericActor.scala   |  21 ++-
 .../org/apache/iota/fey/EnsembleSpec.scala      |  24 +--
 .../apache/iota/fey/FeyGenericActorSpec.scala   | 188 +++++++++++++++++++
 .../apache/iota/fey/FeyGenericActorTest.scala   |  68 +++++++
 .../org/apache/iota/fey/OrchestrationSpec.scala |   2 +-
 5 files changed, 290 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/c96d829f/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index f1c94e1..9a6fb78 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -51,7 +51,7 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
     */
   @volatile private var scheduler: Cancellable = null
   @volatile private var endBackoff: Long = 0
-  private val monitoring_actor = FEY_MONITOR.actorRef
+  private[fey] val monitoring_actor = FEY_MONITOR.actorRef
 
   override final def receive: Receive = {
 
@@ -140,6 +140,25 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
   }
 
   /**
+    * Check state of scheduler
+    * @return true if scheduller is running
+    */
+  final def isShedulerRunning():Boolean = {
+    if(scheduler != null && !scheduler.isCancelled){
+      true
+    }else{
+      false
+    }
+  }
+
+  /**
+    * get endBackoff
+    * @return
+    */
+  final def getEndBackoff(): Long = {
+    endBackoff
+  }
+  /**
     * Called by the scheduler.
     */
   def execute(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/c96d829f/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
index 0b3ea56..f968804 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
@@ -37,7 +37,7 @@ class EnsembleSpec extends BaseAkkaSpec{
     override val monitoring_actor = monitor.ref
   }), parent.ref, (ensembleJson \ JSON_PATH.GUID).as[String])
 
-  val ensembleState = ensembleRef.underlyingActor
+  var ensembleState = ensembleRef.underlyingActor
 
   var simplePerformerRef: ActorRef = _
 
@@ -93,7 +93,7 @@ class EnsembleSpec extends BaseAkkaSpec{
     }
     s"result in Performer 'TEST-0004' restarted" in {
       val newPerformer = TestProbe().expectActor(s"${parent.ref.path}/${(ensembleJson \ JSON_PATH.GUID).as[String]}/TEST-0004")
-      newPerformer.compareTo(simplePerformerRef) should not be(0)
+      newPerformer should not equal(simplePerformerRef)
     }
     "result in two paths added to IdentifyFeyActors.actorsPath" in{
       globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
@@ -213,13 +213,14 @@ class EnsembleSpec extends BaseAkkaSpec{
       monitor.expectMsgClass(classOf[Monitor.STOP])
       monitor.expectMsgClass(classOf[Monitor.RESTART])
       monitor.expectMsgClass(classOf[Monitor.START])
+      ensembleState = ensembleRef.underlyingActor
     }
     "keep ensemble actorRef when restarted" in {
-      TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}").compareTo(advEnsembleRef)
should be(0)
+      TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}")
should equal(advEnsembleRef)
     }
     "stop and start the performer with a new reference" in{
-      TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-SCHEDULER").compareTo(scheduleRef)
should not be(0)
-      TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-PARAMS").compareTo(paramsRef)
should not be(0)
+      TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-SCHEDULER")
should not equal(scheduleRef)
+      TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-PARAMS")
should not equal(paramsRef)
       scheduleRef = TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-SCHEDULER")
       paramsRef = TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-PARAMS")
     }
@@ -266,6 +267,7 @@ class EnsembleSpec extends BaseAkkaSpec{
   var backParamsRef: ActorRef = _
   var backScheduleRef: ActorRef = _
   val backprocessParamsTB = TestProbe("BACKOFF")
+  val routee = """$a"""
 
   s"creating Ensemble with Backoff performer" should {
     s"result in creation of Ensemble actor " in {
@@ -292,8 +294,8 @@ class EnsembleSpec extends BaseAkkaSpec{
    "not process messages during the backoff period" in{
      backScheduleRef ! ((schedulerScheduleTB.ref, system.deadLetters, generalScheduleTB.ref))
      backParamsRef ! ((system.deadLetters, backprocessParamsTB.ref, generalParamsTB.ref))
-     backprocessParamsTB.expectMsg("PROCESS - EXECUTE - akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$a")
-     backprocessParamsTB.expectNoMsg(1000.milliseconds)
+     backprocessParamsTB.expectMsg(s"PROCESS - EXECUTE - ${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee")
+     backprocessParamsTB.expectNoMsg(990.milliseconds)
    }
   }
 
@@ -302,10 +304,10 @@ class EnsembleSpec extends BaseAkkaSpec{
       globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
       Thread.sleep(500)
       IdentifyFeyActors.actorsPath should have size(4)
-      IdentifyFeyActors.actorsPath should contain("akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005")
-      IdentifyFeyActors.actorsPath should contain("akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005/PERFORMER-PARAMS")
-      IdentifyFeyActors.actorsPath should contain("akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER")
-      IdentifyFeyActors.actorsPath should contain("akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$a")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-PARAMS")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/c96d829f/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorSpec.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorSpec.scala
new file mode 100644
index 0000000..3b5a117
--- /dev/null
+++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorSpec.scala
@@ -0,0 +1,188 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.{ActorRef, Props}
+import akka.testkit.{EventFilter, TestActorRef, TestProbe}
+
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+
+class FeyGenericActorSpec extends BaseAkkaSpec{
+
+  val parent = TestProbe("GENERIC-PARENT")
+  val monitor = TestProbe("MONITOR-GENERIC")
+  val connectTB = TestProbe("CONNECT_TO")
+  val genericRef: TestActorRef[FeyGenericActorTest] = TestActorRef[FeyGenericActorTest](
Props(new FeyGenericActorTest(Map.empty, 1.seconds,
+    Map("CONNECTED" -> connectTB.ref),300.milliseconds,"MY-ORCH", "MY-ORCH", false){
+    override private[fey] val monitoring_actor = monitor.ref
+  }),parent.ref, "GENERIC-TEST")
+  var genericState:FeyGenericActorTest = genericRef.underlyingActor
+  val path =  genericRef.path.toString
+  var latestBackoff: Long = 0
+
+  "Creating a GenericActor with Schedule time defined" should {
+    "result in scheduler started" in{
+      genericState.isShedulerRunning() should be(true)
+    }
+    "result in onStart method called" in {
+      genericState.started should be(true)
+    }
+    "result in START message sent to Monitor" in{
+      monitor.expectMsgClass(classOf[Monitor.START])
+    }
+    "result in one active actor" in {
+      globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+      Thread.sleep(500)
+      IdentifyFeyActors.actorsPath should have size(1)
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/GENERIC-TEST")
+    }
+  }
+
+  "Backoff of GenericActor" should {
+    "be zero until the first PROCESS message" in {
+      genericState.getEndBackoff() should equal(0)
+    }
+    "change when first PROCESS message was received" in{
+      genericRef ! FeyGenericActor.PROCESS("TEST1")
+      latestBackoff = genericState.getEndBackoff()
+      latestBackoff should not equal(0)
+      connectTB.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+    }
+  }
+
+  "Sending PROCESS message to GenericActor" should{
+    "call processMessage method" in{
+      genericState.processed should be(true)
+    }
+  }
+
+  "customReceive method" should {
+    "process any non treated message" in{
+      genericRef ! "TEST_CUSTOM"
+      genericState.count should equal(1)
+    }
+  }
+
+  "Sending PROCESS message to GenericActor" should {
+    "be discarded when backoff is enabled" in{
+      genericRef ! FeyGenericActor.PROCESS("DISCARDED")
+      latestBackoff should equal(genericState.getEndBackoff())
+      genericRef ! FeyGenericActor.PROCESS("DISCARDED2")
+      connectTB.expectNoMsg(100.milliseconds)
+    }
+    "be processed when backoff has finished" in{
+      while(latestBackoff >= System.nanoTime()){}
+      genericRef ! FeyGenericActor.PROCESS("SHOULD BE PROCESSED")
+      connectTB.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+      latestBackoff = genericState.getEndBackoff()
+    }
+  }
+
+  "Calling startBackoff" should {
+    "set endBackoff with time now" in {
+      genericState.getEndBackoff() > latestBackoff
+    }
+  }
+
+  "Calling propagateMessage" should {
+    "send message to connectTo actors" in {
+      genericState.propagateMessage("PROPAGATE")
+      connectTB.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+    }
+  }
+
+  "Scheduler component" should{
+    "call execute() method" in {
+      genericState.executing should be(true)
+    }
+  }
+
+  "Sending EXCEPTION(IllegalArgumentException) message to GenericActor" should{
+    "Throw IllegalArgumentException" in{
+      EventFilter[IllegalArgumentException](occurrences = 1) intercept {
+        genericRef ! FeyGenericActor.EXCEPTION(new IllegalArgumentException("Testing"))
+      }
+    }
+    "Result in restart of the actor with sequence of Monitoring: STOP -> RESTART ->
START" in{
+      monitor.expectMsgClass(classOf[Monitor.STOP])
+      monitor.expectMsgClass(classOf[Monitor.RESTART])
+      //Restart does not change the actorRef but does change the object inside the ActorReference
+      genericState = genericRef.underlyingActor
+      monitor.expectMsgClass(classOf[Monitor.START])
+    }
+    "call onStart method" in {
+      genericState.started should be(true)
+    }
+    "call onRestart method" in {
+      Thread.sleep(100)
+      genericState.restarted should be(true)
+    }
+    "restart scheduler" in {
+      genericState.isShedulerRunning() should be(true)
+    }
+  }
+
+  "Sending STOP to GenericActor" should{
+    "terminate GenericActor" in{
+      genericRef ! FeyGenericActor.STOP
+      TestProbe().verifyActorTermination(genericRef)
+      TestProbe().notExpectActor(path)
+    }
+    "call onStop method" in {
+      genericState.stopped should be(true)
+    }
+    "cancel scheduler" in{
+      genericState.isShedulerRunning() should be(false)
+    }
+    "send STOP - TERMINATE message to Monitor" in{
+      monitor.expectMsgClass(classOf[Monitor.STOP])
+    }
+    "result in no active actors" in {
+      globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+      Thread.sleep(500)
+      IdentifyFeyActors.actorsPath shouldBe empty
+    }
+  }
+
+
+  val connectTB2 = TestProbe("CONNECT2_TO")
+  var generic2Ref: TestActorRef[FeyGenericActorTest] = _
+  var generic2State:FeyGenericActorTest = _
+
+  "Creating GenericActor with schedule anc backoff equal to zero" should{
+    "not start a scheduler" in{
+      generic2Ref = TestActorRef[FeyGenericActorTest]( Props(new FeyGenericActorTest(Map.empty,
0.seconds,
+        Map("CONNECTED" -> connectTB2.ref),0.milliseconds,"MY-ORCH-2", "MY-ORCH-2", false)),parent.ref,
"GENERIC-TEST-2")
+      generic2State = generic2Ref.underlyingActor
+      generic2State.isShedulerRunning() should be(false)
+    }
+    "result in one active actor" in {
+      globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+      Thread.sleep(500)
+      IdentifyFeyActors.actorsPath should have size(1)
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/GENERIC-TEST-2")
+    }
+    "result in no discarded PROCESS messages" in {
+      generic2Ref ! FeyGenericActor.PROCESS("NOT-DISCARDED")
+      connectTB2.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+      generic2Ref ! FeyGenericActor.PROCESS("NOT-DISCARDED2")
+      connectTB2.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/c96d829f/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorTest.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorTest.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorTest.scala
new file mode 100644
index 0000000..4e248ec
--- /dev/null
+++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorTest.scala
@@ -0,0 +1,68 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.ActorRef
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.concurrent.duration.FiniteDuration
+
+class FeyGenericActorTest(override val params: Map[String,String] = Map.empty,
+               override val backoff: FiniteDuration = 1.minutes,
+               override val connectTo: Map[String,ActorRef] = Map.empty,
+               override val schedulerTimeInterval: FiniteDuration = 2.seconds,
+               override val orchestrationName: String = "",
+               override val orchestrationID: String = "",
+               override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  var count = 0
+  var started = false
+  var processed = false
+  var executing = false
+  var stopped = false
+  var restarted = false
+
+  override def onStart(): Unit = {
+    started = true
+  }
+
+  override def processMessage[T](message: T, sender: ActorRef): Unit = {
+    processed = true
+    log.info(s"Processing message ${message.toString}")
+    propagateMessage(s"PROPAGATING FROM ${self.path.name} - Message: ${message.toString}")
+    startBackoff()
+  }
+
+  override def execute(): Unit = {
+    log.info(s"Executing action in ${self.path.name}")
+    executing = true
+  }
+
+  override def customReceive: Receive = {
+    case "TEST_CUSTOM" => count+=1
+  }
+
+  override def onStop(): Unit = {
+    log.info(s"Actor ${self.path.name} stopped.")
+    stopped = true
+  }
+
+  override def onRestart(reason: Throwable): Unit = {
+    restarted = true
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/c96d829f/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala
index 8719f36..fff4bab 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala
@@ -68,7 +68,7 @@ class OrchestrationSpec extends BaseAkkaSpec{
       TestProbe().expectActor(s"${orchRef.path}/${(ensemble2 \ JSON_PATH.GUID).as[String]}/TEST-0001")
       TestProbe().expectActor(s"${orchRef.path}/${(ensemble1 \ JSON_PATH.GUID).as[String]}/TEST-0001")
     }
-    s"result in two entries in Orchestration.ensembles matching the craeted ensembles" in
{
+    s"result in two entries in Orchestration.ensembles matching the created ensembles" in
{
       orchState.ensembles should have size(2)
       orchState.ensembles should contain key((ensemble1 \ JSON_PATH.GUID).as[String])
       orchState.ensembles should contain key((ensemble2 \ JSON_PATH.GUID).as[String])


Mime
View raw message