iota-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tonyfaust...@apache.org
Subject incubator-iota git commit: [IOTA-32] Fey - Define minimum number of actors when autoscaling
Date Sat, 08 Oct 2016 03:13:14 GMT
Repository: incubator-iota
Updated Branches:
  refs/heads/master 4a79b5255 -> 76cf4616f


[IOTA-32] Fey - Define minimum number of actors when autoscaling


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/76cf4616
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/76cf4616
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/76cf4616

Branch: refs/heads/master
Commit: 76cf4616fa111ed5b1c9321984a0555a70edec0d
Parents: 4a79b52
Author: Barbara Gomes <barbaramaltagomes@gmail.com>
Authored: Thu Oct 6 16:35:09 2016 -0700
Committer: Barbara Gomes <barbaramaltagomes@gmail.com>
Committed: Thu Oct 6 16:35:09 2016 -0700

----------------------------------------------------------------------
 .../resources/fey-json-schema-validator.json    | 22 +++++-
 .../scala/org/apache/iota/fey/Ensemble.scala    | 72 +++++++++++++-------
 .../main/scala/org/apache/iota/fey/Utils.scala  |  4 ++
 .../org/apache/iota/fey/EnsembleSpec.scala      | 20 ++++--
 .../org/apache/iota/fey/Utils_JSONTest.scala    |  5 +-
 5 files changed, 93 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/76cf4616/fey-core/src/main/resources/fey-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 579fbca..79952dd 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -36,8 +36,26 @@
             "type": "string"
           },
           "autoScale":{
-            "type":"integer",
-            "minimum":0
+            "type": "object",
+            "lowerBound" : {
+              "type":"integer",
+              "minimum":1
+            },
+            "upperBound" : {
+              "type":"integer",
+              "minimum":1
+            },
+            "backoffThreshold" :{
+              "type":"number",
+              "minimum":0.0
+            },
+            "roundRobin":{
+              "type": "boolean"
+            },
+            "required":[
+              "lowerBound",
+              "upperBound"
+            ]
           },
           "schedule":{
             "type":"integer",

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/76cf4616/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index afa39f2..a606217 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -19,7 +19,7 @@ package org.apache.iota.fey
 
 import akka.actor.SupervisorStrategy.Restart
 import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
-import akka.routing.{ActorRefRoutee, DefaultResizer, GetRoutees, SmallestMailboxPool}
+import akka.routing._
 import org.apache.iota.fey.JSON_PATH._
 import play.api.libs.json.JsObject
 
@@ -171,13 +171,21 @@ protected class Ensemble(val orchestrationID: String,
 
         var actor:ActorRef = null
         val actorProps = getPerformer(performerInfo, connections)
-        if(performerInfo.autoScale > 0) {
+        if(performerInfo.autoScale) {
 
-          val resizer = DefaultResizer(lowerBound = 1, upperBound = performerInfo.autoScale,
-            messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = 0.4)
-          val smallestMailBox = SmallestMailboxPool(1, Some(resizer))
+          val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
+            messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
 
-          actor = context.actorOf(smallestMailBox.props(actorProps), name = performerID)
+          val strategy =
+            if(performerInfo.isRoundRobin) {
+               log.info(s"Using Round Robin for performer ${performerID}")
+               RoundRobinPool(1, Some(resizer))
+             } else {
+               log.info(s"Using Smallest mailbox for performer ${performerID}")
+               SmallestMailboxPool(1, Some(resizer))
+             }
+
+          actor = context.actorOf(strategy.props(actorProps), name = performerID)
 
         }else{
           actor = context.actorOf(actorProps, name = performerID)
@@ -204,11 +212,10 @@ protected class Ensemble(val orchestrationID: String,
 
     val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
 
-    val autoScale = if(performerInfo.autoScale > 0) true else false
     val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
 
     val actorProps = Props(clazz,
-      performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, autoScale)
+      performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
 
     // dispatcher has higher priority than controlAware. That means that if both are defined
     // then the custom dispatcher will be used
@@ -282,7 +289,19 @@ object Ensemble {
       val id: String= (performer \ GUID).as[String]
       val schedule: Int = (performer \ SCHEDULE).as[Int]
       val backoff: Int = (performer \ BACKOFF).as[Int]
-      val autoScale: Int = if (performer.keys.contains(PERFORMER_AUTO_SCALE)) (performer \ PERFORMER_AUTO_SCALE).as[Int] else 0
+
+      val autoScale: Boolean = if (performer.keys.contains(PERFORMER_AUTO_SCALE)) true else false
+      val lowerBound: Int = if (autoScale) (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_LOWER_BOUND).as[Int] else 0
+      val upperBound: Int = if (autoScale) (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_UPPER_BOUND).as[Int] else 0
+      if(lowerBound > upperBound){
+        throw new IllegalArgumentException(" Could not define performer. Autoscale param: Lower bound greater than upper bound")
+      }
+      val threshold: Double = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_BACKOFF_THRESHOLD))
+        (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_BACKOFF_THRESHOLD).as[Double] else 0.3
+      val roundRobin: Boolean = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_ROUND_ROBIN))
+        (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_ROUND_ROBIN).as[Boolean] else false
+
+
       val jarName: String = (performer \ SOURCE \ SOURCE_NAME).as[String]
       val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
       val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
@@ -290,7 +309,8 @@ object Ensemble {
       val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY
       val dispatcher: String = if (performer.keys.contains(PERFORMER_DISPATCHER)) (performer \ PERFORMER_DISPATCHER).as[String] else ""
 
-      (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond, autoScale,controlAware, location, dispatcher))
+      (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond,
+        autoScale, lowerBound, upperBound,threshold, roundRobin,controlAware, location, dispatcher))
     }).toMap
   }
 
@@ -316,18 +336,24 @@ object Ensemble {
 /**
   * Holds the performer information
   *
-  * @param uid performer uid
-  * @param jarName performer jar name
-  * @param classPath performer class path
-  * @param parameters performer params
-  * @param schedule performer schedule interval
-  * @param backoff performer backoff interval
-  * @param autoScale if actor was started as a router and can autoscala
-  * @param controlAware if the actor uses a controlAware mailbox
-  * @param jarLocation download jar
-  * @param dispatcher Akka dispatcher that the actor is using
+  * @param uid
+  * @param jarName
+  * @param classPath
+  * @param parameters
+  * @param schedule
+  * @param backoff
+  * @param autoScale
+  * @param lowerBound
+  * @param upperBound
+  * @param backoffThreshold
+  * @param isRoundRobin
+  * @param controlAware
+  * @param jarLocation
+  * @param dispatcher
   */
 case class Performer(uid: String, jarName: String,
-                classPath: String, parameters: Map[String,String],
-                schedule: FiniteDuration, backoff: FiniteDuration,
-                autoScale: Int, controlAware: Boolean, jarLocation: String, dispatcher: String)
+                     classPath: String, parameters: Map[String,String],
+                     schedule: FiniteDuration, backoff: FiniteDuration,
+                     autoScale: Boolean, lowerBound: Int, upperBound: Int,
+                     backoffThreshold: Double, isRoundRobin: Boolean, controlAware: Boolean,
+                     jarLocation: String, dispatcher: String)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/76cf4616/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index d4f3210..14c2a0c 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -190,6 +190,10 @@ object JSON_PATH{
   val ORCHESTRATION_NAME = "name"
   val ORCHESTRATION_TIMESTAMP = "timestamp"
   val PERFORMER_AUTO_SCALE = "autoScale"
+  val PERFORMER_LOWER_BOUND = "lowerBound"
+  val PERFORMER_UPPER_BOUND = "upperBound"
+  val PERFORMER_BACKOFF_THRESHOLD = "backoffThreshold"
+  val PERFORMER_ROUND_ROBIN = "roundRobin"
   val CONTROL_AWARE = "controlAware"
   val JAR_LOCATION = "location"
   val JAR_LOCATION_URL = "url"

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/76cf4616/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
index e7e3554..ae7ce50 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
@@ -68,7 +68,11 @@ class EnsembleSpec extends BaseAkkaSpec{
       performer.controlAware should equal(false)
       performer.jarName should equal((performerSpec \ SOURCE \ SOURCE_NAME).as[String])
       performer.jarLocation should equal(CONFIG.JAR_REPOSITORY)
-      performer.autoScale should equal(0)
+      performer.autoScale should equal(false)
+      performer.lowerBound should equal(0)
+      performer.upperBound should equal(0)
+      performer.isRoundRobin should equal(false)
+      performer.backoffThreshold should equal(0.3)
       performer.backoff should equal((performerSpec \ BACKOFF).as[Int].millisecond)
       performer.classPath should equal((performerSpec \ SOURCE \ SOURCE_CLASSPATH).as[String])
       performer.uid should equal((performerSpec \ GUID).as[String])
@@ -268,6 +272,9 @@ class EnsembleSpec extends BaseAkkaSpec{
   var backScheduleRef: ActorRef = _
   val backprocessParamsTB = TestProbe("BACKOFF")
   val routee = """$a"""
+  val routee2 = """$b"""
+  val routee3 = """$c"""
+  val routee4 = """$d"""
 
   s"creating Ensemble with Backoff performer" should {
     s"result in creation of Ensemble actor " in {
@@ -286,8 +293,10 @@ class EnsembleSpec extends BaseAkkaSpec{
     s"create 'PERFORMER-PARAMS' with backoff time equal to 1 second" in{
       backEnsembleState.performers_metadata.get("PERFORMER-PARAMS").get.backoff should  equal(1000.millisecond)
     }
-    s"create 'PERFORMER-SCHEDUKE' with autoScale equal to true" in{
-      backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.autoScale should
 equal(2)
+    s"create 'PERFORMER-SCHEDULER' with autoScale equal to true" in {
+      backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.autoScale should equal(true)
+      backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.lowerBound should equal(4)
+      backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.upperBound should equal(6)
     }
   }
   s"Performer with backoff enabled" should {
@@ -303,11 +312,14 @@ class EnsembleSpec extends BaseAkkaSpec{
     "result in router and routees created" in {
       globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
       Thread.sleep(500)
-      IdentifyFeyActors.actorsPath should have size(4)
+      IdentifyFeyActors.actorsPath should have size(7)
       IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005")
       IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-PARAMS")
       IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER")
       IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee2")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee3")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee4")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/76cf4616/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
index 70d3ddc..106a406 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
@@ -333,7 +333,10 @@ object Utils_JSONTest {
            "guid": "PERFORMER-SCHEDULER",
            "schedule": 200,
            "backoff": 0,
-           "autoScale": 2,
+           "autoScale": {
+            "lowerBound" : 4,
+            "upperBound" : 6
+           },
            "source": {
              "name": "fey-test-actor.jar",
              "classPath": "org.apache.iota.fey.TestActor",


Mime
View raw message