iota-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tonyfaust...@apache.org
Subject [1/2] incubator-iota git commit: [IOTA-24] Offer control-aware mailbox to Performers
Date Fri, 08 Jul 2016 01:05:30 GMT
Repository: incubator-iota
Updated Branches:
  refs/heads/master df7428188 -> 40f6adb25


[IOTA-24] Offer control-aware mailbox to Performers

Adds a JSON configuration option ("controlAware") so that the user can ask Fey to start a Performer using a control-aware mailbox.


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/9126d531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/9126d531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/9126d531

Branch: refs/heads/master
Commit: 9126d5318c5b862e9b0cefd9d84c101706e2ad01
Parents: df74281
Author: Barbara Gomes <barbaramaltagomes@gmail.com>
Authored: Wed Jul 6 14:43:01 2016 -0700
Committer: Barbara Gomes <barbaramaltagomes@gmail.com>
Committed: Wed Jul 6 14:43:01 2016 -0700

----------------------------------------------------------------------
 fey-core/src/main/resources/application.conf    |  6 +++
 .../resources/fey-json-schema-validator.json    |  3 ++
 .../scala/org/apache/iota/fey/Ensemble.scala    | 41 +++++++++++++++-----
 .../main/scala/org/apache/iota/fey/Utils.scala  |  2 +
 4 files changed, 42 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/9126d531/fey-core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf
index 28992f4..26fa9a9 100644
--- a/fey-core/src/main/resources/application.conf
+++ b/fey-core/src/main/resources/application.conf
@@ -63,4 +63,10 @@ akka {
   loggers = ["akka.event.slf4j.Slf4jLogger"]
   loglevel = "DEBUG"
   logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+
+  fey-dispatchers {
+    control-aware-dispatcher {
+      mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/9126d531/fey-core/src/main/resources/fey-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 83dca5e..8f12b24 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -29,6 +29,9 @@
           "guid":{
             "type":"string"
           },
+          "controlAware":{
+            "type":"boolean"
+          },
           "autoScale":{
             "type":"integer",
             "minimum":0

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/9126d531/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 87fbe7f..73d1b59 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -158,20 +158,17 @@ protected class Ensemble(val orchestrationID: String,
           createFeyActor(connID, connectors.getOrElse(connID,Array.empty),tmpActors)
         }).toMap
 
-        val clazz = loadClazzFromJar(performerInfo.classPath, performerInfo.jarName)
 
         var actor:ActorRef = null
+        val actorProps = getPerformer(performerInfo, connections)
         if(performerInfo.autoScale > 0) {
           val resizer = DefaultResizer(lowerBound = 1, upperBound = performerInfo.autoScale, messagesPerResize = 200, backoffThreshold = 0.4)
           val smallestMailBox = SmallestMailboxPool(1, Some(resizer))
-          actor = context.actorOf(
-            smallestMailBox.props(Props(clazz,
-              performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, true)),
-            name = performerID)
+
+          actor = context.actorOf(smallestMailBox.props(actorProps), name = performerID)
+
         }else{
-          actor = context.actorOf(Props(clazz, performerInfo.parameters,
-            performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, false),
-            name = performerID)
+          actor = context.actorOf(actorProps, name = performerID)
         }
 
         context.watch(actor)
@@ -186,6 +183,29 @@ protected class Ensemble(val orchestrationID: String,
   }
 
   /**
+    * Creates actor props based on JSON configuration
+    * @param performerInfo Performer object
+    * @param connections connections
+    * @return Props of actor based on JSON config
+    */
+  private def getPerformer(performerInfo: Performer, connections: Map[String, ActorRef]): Props = {
+
+    val clazz = loadClazzFromJar(performerInfo.classPath, performerInfo.jarName)
+
+    val autoScale = if(performerInfo.autoScale > 0) true else false
+
+    val actorProps = Props(clazz,
+      performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, autoScale)
+
+    if(performerInfo.controlAware){
+      actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+    }else{
+      actorProps
+    }
+
+  }
+
+  /**
     * Load a clazz instance of FeyGenericActor from a jar
     *
     * @param classPath class path
@@ -248,7 +268,8 @@ object Ensemble {
       val jarName: String = (performer \ SOURCE \ SOURCE_NAME).as[String]
       val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
       val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
-      (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale))
+      val controlAware:Boolean = if (performer.keys.contains(CONTROL_AWARE)) (performer \ CONTROL_AWARE).as[Boolean] else false
+      (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware))
     }).toMap
   }
 
@@ -284,4 +305,4 @@ object Ensemble {
 case class Performer(uid: String, jarName: String,
                 classPath: String, parameters: Map[String,String],
                 schedule: FiniteDuration, backoff: FiniteDuration,
-                autoScale: Int)
+                autoScale: Int, controlAware: Boolean)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/9126d531/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index dab47bd..bdc420c 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -175,6 +175,7 @@ object JSON_PATH{
   val ORCHESTRATION_NAME = "name"
   val ORCHESTRATION_TIMESTAMP = "timestamp"
   val PERFORMER_AUTO_SCALE = "autoScale"
+  val CONTROL_AWARE = "controlAware"
 }
 
 object CONFIG{
@@ -183,6 +184,7 @@ object CONFIG{
 
   val FILE_APPENDER = "FEY-FILE"
   val CONSOLE_APPENDER = "FEY-CONSOLE"
+  val CONTROL_AWARE_MAILBOX = "akka.fey-dispatchers.control-aware-dispatcher"
 
   var CHECKPOINT_DIR = ""
   var JSON_REPOSITORY = ""


Mime
View raw message