iota-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tonyfaust...@apache.org
Subject [07/12] incubator-iota git commit: [IOTA-36] - First version of global performers. In this version, global performers can not connect to any other performer.
Date Tue, 31 Jan 2017 22:29:10 GMT
[IOTA-36] - First version of global performers. In this version, global performers can not
connect to any other performer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/580e3010
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/580e3010
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/580e3010

Branch: refs/heads/master
Commit: 580e30103d72d2448426a120eb6a39b544661e68
Parents: 7aae205
Author: Barbara Gomes <barbaramaltagomes@gmail.com>
Authored: Mon Jan 30 16:03:39 2017 -0800
Committer: Barbara Gomes <barbaramaltagomes@gmail.com>
Committed: Mon Jan 30 16:03:39 2017 -0800

----------------------------------------------------------------------
 .../resources/fey-json-schema-validator.json    |  95 ++++++++++
 .../scala/org/apache/iota/fey/Ensemble.scala    |  25 ++-
 .../scala/org/apache/iota/fey/FeyCore.scala     |  34 ++--
 .../org/apache/iota/fey/GlobalPerformer.scala   | 186 +++++++++++++++++++
 .../org/apache/iota/fey/Orchestration.scala     |  53 +++++-
 .../main/scala/org/apache/iota/fey/Utils.scala  |   3 +
 6 files changed, 372 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/resources/fey-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 79952dd..a86c09f 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -13,6 +13,101 @@
   "name":{
     "type":"string"
   },
+  "global-performers":{
+    "type":"array",
+    "items": {
+      "guid": {
+        "type": "string"
+      },
+      "controlAware": {
+        "type": "boolean"
+      },
+      "dispatcher": {
+        "type": "string"
+      },
+      "autoScale": {
+        "type": "object",
+        "lowerBound": {
+          "type": "integer",
+          "minimum": 1
+        },
+        "upperBound": {
+          "type": "integer",
+          "minimum": 1
+        },
+        "backoffThreshold": {
+          "type": "number",
+          "minimum": 0.0
+        },
+        "roundRobin": {
+          "type": "boolean"
+        },
+        "required": [
+          "lowerBound",
+          "upperBound"
+        ]
+      },
+      "schedule": {
+        "type": "integer",
+        "minimum": 0
+      },
+      "backoff": {
+        "type": "integer",
+        "minimum": 0
+      },
+      "source": {
+        "type": "object",
+        "name": {
+          "type": "string",
+          "pattern": "\\w+(\\.jar)"
+        },
+        "location": {
+          "type": "object",
+          "url": {
+            "type": "string",
+            "pattern": "(?i)(^(http|https|file)):\/\/"
+          },
+          "credentials": {
+            "user": {
+              "type": "string"
+            },
+            "password": {
+              "type": "string"
+            },
+            "required": [
+              "user",
+              "password"
+            ]
+          },
+          "required": [
+            "url"
+          ]
+        },
+        "classPath": {
+          "type": "string",
+          "pattern": "\\w+"
+        },
+        "parameters": {
+          "patternProperties": {
+            ".*": {
+              "type": "string"
+            }
+          }
+        },
+        "required": [
+          "name",
+          "classPath",
+          "parameters"
+        ]
+      },
+      "required":[
+        "guid",
+        "schedule",
+        "backoff",
+        "source"
+      ]
+    }
+  },
   "ensembles":{
     "type":"array",
     "items":{

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 4db5f98..15b6ead 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -162,7 +162,16 @@ protected class Ensemble(val orchestrationID: String,
     * @return (performerID, ActorRef of the performer)
     */
   private def createFeyActor(performerID: String, connectionIDs: Array[String], tmpActors:HashMap[String,
ActorRef]):(String, ActorRef) = {
-    if(!tmpActors.contains(performerID)){
+    // Performer is a global performer and is already created
+    if(GlobalPerformer.activeGlobalPerformers.contains(orchestrationID)
+      && GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.contains(performerID)){
+      (performerID, GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.get(performerID).get)
+    }
+      // performer was already created
+    else if(tmpActors.contains(performerID)){
+      (performerID, tmpActors.get(performerID).get)
+    }
+    else{
       val performerInfo = performers_metadata.get(performerID)
       if (performerInfo.isDefined) {
         val connections: Map[String, ActorRef] = connectionIDs.map(connID => {
@@ -178,12 +187,12 @@ protected class Ensemble(val orchestrationID: String,
 
           val strategy =
             if(performerInfo.get.isRoundRobin) {
-               log.info(s"Using Round Robin for performer ${performerID}")
-               RoundRobinPool(1, Some(resizer))
-             } else {
-               log.info(s"Using Smallest mailbox for performer ${performerID}")
-               SmallestMailboxPool(1, Some(resizer))
-             }
+              log.info(s"Using Round Robin for performer ${performerID}")
+              RoundRobinPool(1, Some(resizer))
+            } else {
+              log.info(s"Using Smallest mailbox for performer ${performerID}")
+              SmallestMailboxPool(1, Some(resizer))
+            }
 
           actor = context.actorOf(strategy.props(actorProps), name = performerID)
 
@@ -197,8 +206,6 @@ protected class Ensemble(val orchestrationID: String,
       }else{
         throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the
JSON")
       }
-    }else{
-      (performerID, tmpActors.get(performerID).get)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 9c2b61d..99dc7c4 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -24,7 +24,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill,
 import akka.routing.GetRoutees
 import com.eclipsesource.schema._
 import org.apache.iota.fey.JSON_PATH._
-import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES}
+import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES,
DELETE_ENSEMBLES, UPDATE_ENSEMBLES}
 import org.apache.iota.fey.Utils._
 import play.api.libs.json._
 
@@ -101,6 +101,7 @@ protected class FeyCore extends Actor with ActorLogging{
     log.info(s"TERMINATED ${actorRef.path.name}")
     FEY_CACHE.activeOrchestrations.remove(actorRef.path.name)
     ORCHESTRATION_CACHE.orchestration_metadata.remove(actorRef.path.name)
+    ORCHESTRATION_CACHE.orchestration_globals.remove(actorRef.path.name)
     if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) {
       checkForOrchestrationWaitingForTermination(actorRef.path.name)
     }
@@ -155,10 +156,12 @@ protected class FeyCore extends Actor with ActorLogging{
     val orchestrationCommand = (orchestrationJSON \ COMMAND).as[String].toUpperCase()
     val orchestrationTimestamp = (orchestrationJSON \ ORCHESTRATION_TIMESTAMP).as[String]
     val ensembles = (orchestrationJSON \ ENSEMBLES).as[List[JsObject]]
+    val globalPerformers = (orchestrationJSON \ GLOBAL_PERFORMERS).asOpt[List[JsObject]]
+
     orchestrationCommand match {
-      case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName,
orchestrationTimestamp)
-      case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName,
orchestrationTimestamp)
-      case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName,
orchestrationTimestamp)
+      case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName,
orchestrationTimestamp, globalPerformers)
+      case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName,
orchestrationTimestamp, globalPerformers)
+      case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName,
orchestrationTimestamp, globalPerformers)
       case "DELETE" => deleteOrchestration(orchestrationID,true)
       case x => throw new CommandNotRecognized(s"Command: $x")
     }
@@ -177,13 +180,13 @@ protected class FeyCore extends Actor with ActorLogging{
     * @return
     */
   private def recreateOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
-                              orchestrationName: String, orchestrationTimestamp: String)
= {
+                              orchestrationName: String, orchestrationTimestamp: String,
globalPerformers:Option[List[JsObject]]) = {
     FEY_CACHE.activeOrchestrations.get(orchestrationID) match {
       case Some(orchestration) =>
         try{
           // If timestamp is greater than the last timestamp
           if(orchestration._1 != orchestrationTimestamp){
-            val orchestrationInfo = new OrchestrationInformation(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp)
+            val orchestrationInfo = new OrchestrationInformation(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp,
globalPerformers)
             FEY_CACHE.orchestrationsAwaitingTermination.put(orchestrationID, orchestrationInfo)
             deleteOrchestration(orchestrationID, true)
           }else{
@@ -192,7 +195,7 @@ protected class FeyCore extends Actor with ActorLogging{
         }catch{
           case e: Exception =>
         }
-      case None => createOrchestration(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp)
+      case None => createOrchestration(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp,
globalPerformers)
     }
   }
 
@@ -206,7 +209,7 @@ protected class FeyCore extends Actor with ActorLogging{
       case Some(orchestrationAwaiting) =>
         FEY_CACHE.orchestrationsAwaitingTermination.remove(terminatedOrchestrationName)
         createOrchestration(orchestrationAwaiting.ensembleSpecJson, orchestrationAwaiting.orchestrationID,
-          orchestrationAwaiting.orchestrationName, orchestrationAwaiting.orchestrationTimestamp)
+          orchestrationAwaiting.orchestrationName, orchestrationAwaiting.orchestrationTimestamp,
orchestrationAwaiting.globalPerformers)
       case None =>
     }
   }
@@ -222,7 +225,7 @@ protected class FeyCore extends Actor with ActorLogging{
     * @param orchestrationTimestamp
     */
   private def createOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
-                            orchestrationName: String, orchestrationTimestamp: String) =
{
+                            orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]])
= {
     try{
       if(!FEY_CACHE.activeOrchestrations.contains(orchestrationID)) {
         val orchestration = context.actorOf(
@@ -230,7 +233,13 @@ protected class FeyCore extends Actor with ActorLogging{
           name = orchestrationID)
         FEY_CACHE.activeOrchestrations.put(orchestrationID, (orchestrationTimestamp, orchestration))
         context.watch(orchestration)
-        orchestration ! CREATE_ENSEMBLES(ensemblesSpecJson)
+
+        if(globalPerformers.isDefined && globalPerformers.get.size > 0){
+          orchestration ! CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalPerformers.get, ensemblesSpecJson)
+        }else {
+          orchestration ! CREATE_ENSEMBLES(ensemblesSpecJson)
+        }
+
       }else{
         log.error(s"Orchestration $orchestrationID is already defined in the network.")
       }
@@ -270,8 +279,9 @@ protected class FeyCore extends Actor with ActorLogging{
     }
   }
 
+  // TODO: Check out how to manage global performers for updating
   private def updateOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
-                            orchestrationName: String, orchestrationTimestamp: String) =
{
+                            orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]])
= {
     FEY_CACHE.activeOrchestrations.get(orchestrationID) match {
       case None => log.warning(s"Orchestration not update. No active Orchestration $orchestrationID.")
       case Some(orchestration) => {
@@ -368,4 +378,4 @@ private object FEY_CACHE{
 }
 
 sealed case class OrchestrationInformation(ensembleSpecJson: List[JsObject], orchestrationID:
String,
-                                     orchestrationName: String, orchestrationTimestamp: String)
+                                     orchestrationName: String, orchestrationTimestamp: String,
globalPerformers:Option[List[JsObject]])

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
new file mode 100644
index 0000000..e343f2e
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.SupervisorStrategy.Restart
+import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated}
+import akka.routing._
+import play.api.libs.json.JsObject
+
+import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
+
+/**
+  * Manager actor that owns the global performers of one orchestration. On start it creates the
+  * performers described by `globalPerformers`, publishes their ActorRefs in
+  * `GlobalPerformer.activeGlobalPerformers` and asks its parent to create `ensemblesSpec`.
+  */
+protected class GlobalPerformer(val orchestrationID: String,
+                                val orchestrationName: String,
+                                val globalPerformers: List[JsObject],
+                                val ensemblesSpec :  List[JsObject]) extends Actor with ActorLogging{
+
+  val monitoring_actor = FEY_MONITOR.actorRef
+  var global_metadata: Map[String, Performer] = Map.empty[String, Performer]
+  var global_performer: Map[String,ActorRef] = Map.empty[String,ActorRef]
+
+  override def receive: Receive = {
+    case GlobalPerformer.PRINT_GLOBAL =>
+      context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
+
+    case Terminated(actor) =>
+      monitoring_actor  ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
+      log.error(s"DEAD Global Performers ${actor.path.name}")
+      context.children.foreach{ child =>
+        context.unwatch(child)
+        context.stop(child)
+      }
+      throw new RestartGlobalPerformers(s"DEAD Global Performer ${actor.path.name}")
+
+    case GetRoutees => //Discard
+
+    case x => log.warning(s"Message $x not treated by Global Performers")
+  }
+
+  /**
+    * If any of the global performers dies, this actor tries to restart it. If it cannot be
+    * restarted, the Terminated message is received and this actor throws RestartGlobalPerformers
+    * so that the orchestration can be restarted, which stops all of its children.
+    */
+  override val supervisorStrategy =
+  OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
+    case e: Exception => Restart
+  }
+
+  /**
+    * Uses the json spec to create the performers
+    */
+  override def preStart() : Unit = {
+    monitoring_actor ! Monitor.START(Utils.getTimestamp)
+    global_metadata = Ensemble.extractPerformers(globalPerformers)
+    createGlobalPerformers()
+  }
+
+  /**
+    * Reports the stop to the monitor and drops this orchestration's entry from the shared
+    * registry, so that lookups no longer return the ActorRefs of this manager's performers.
+    */
+  override def postStop() : Unit = {
+    monitoring_actor  ! Monitor.STOP(Utils.getTimestamp)
+    GlobalPerformer.activeGlobalPerformers.remove(orchestrationID)
+  }
+
+  override def postRestart(reason: Throwable): Unit = {
+    monitoring_actor  ! Monitor.RESTART(reason, Utils.getTimestamp)
+    preStart()
+  }
+
+  /** Creates every global performer and then asks the parent to create the ensembles. */
+  private def createGlobalPerformers() = {
+    try {
+      global_metadata.foreach { case (performerID, performerInfo) =>
+        createFeyActor(performerID, performerInfo)
+      }
+      context.parent ! Orchestration.CREATE_ENSEMBLES(ensemblesSpec)
+    } catch {
+      /* if the creation fails, it will stop the orchestration */
+      case e: Exception =>
+        log.error(e,"During Global Manager creation")
+        throw new RestartGlobalPerformers("Could not create global performer")
+    }
+  }
+
+  /** Creates one performer (pooled when autoScale is set), watches it and registers its ActorRef. */
+  private def createFeyActor(performerID: String, performerInfo: Performer) = {
+    val actorProps = getPerformer(performerInfo)
+    val actor: ActorRef =
+      if (performerInfo.autoScale) {
+        val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
+          messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
+
+        val strategy =
+          if (performerInfo.isRoundRobin) {
+            log.info(s"Using Round Robin for performer ${performerID}")
+            RoundRobinPool(1, Some(resizer))
+          } else {
+            log.info(s"Using Smallest mailbox for performer ${performerID}")
+            SmallestMailboxPool(1, Some(resizer))
+          }
+
+        context.actorOf(strategy.props(actorProps), name = performerID)
+      } else {
+        context.actorOf(actorProps, name = performerID)
+      }
+
+    context.watch(actor)
+    // Add the new performer to the orchestration's entry, creating the entry on first use
+    val registered = GlobalPerformer.activeGlobalPerformers.getOrElse(orchestrationID, Map.empty[String, ActorRef])
+    GlobalPerformer.activeGlobalPerformers.put(orchestrationID, registered + (performerID -> actor))
+  }
+
+  /**
+    * Creates actor props based on JSON configuration
+    *
+    * @param performerInfo Performer object
+    * @return Props of actor based on JSON config
+    */
+  private def getPerformer(performerInfo: Performer): Props = {
+    val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
+    val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
+
+    val actorProps = Props(clazz,
+      performerInfo.parameters, performerInfo.backoff, Map.empty, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
+
+    // dispatcher has higher priority than controlAware. That means that if both are defined
+    // then the custom dispatcher will be used
+    if(dispatcher != ""){
+      log.info(s"Using dispatcher: $dispatcher")
+      actorProps.withDispatcher(dispatcher)
+    }
+    else if(performerInfo.controlAware){
+      actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+    }else{
+      actorProps
+    }
+  }
+
+  /**
+    * Load a clazz instance of FeyGenericActor from a jar
+    *
+    * @param classPath class path
+    * @param jarLocation Full path where to load the jar from
+    * @param jarName name of the jar file
+    * @return clazz instance of FeyGenericActor
+    */
+  private def loadClazzFromJar(classPath: String, jarLocation: String, jarName: String):Class[FeyGenericActor] = {
+    try {
+      Utils.loadActorClassFromJar(jarLocation,classPath,jarName)
+    }catch {
+      case e: Exception =>
+        log.error(e,s"Could not load class $classPath from jar $jarLocation. Please, check the Jar repository path as well the jar name")
+        throw e
+    }
+  }
+
+}
+
+object GlobalPerformer{
+  // Registered global performers, keyed by orchestration ID and then by performer ID
+  val activeGlobalPerformers:HashMap[String, Map[String, ActorRef]] = HashMap.empty
+
+  // Message that makes the manager send FeyGenericActor.PRINT_PATH to its children
+  case object PRINT_GLOBAL
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
index 1e47a80..0b9152b 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
@@ -44,6 +44,7 @@ protected class Orchestration(val name: String,
     case CREATE_ENSEMBLES(ensemblesJsonSpec) => createEnsembles(ensemblesJsonSpec)
     case DELETE_ENSEMBLES(ensemblesJsonSpec) => deleteEnsembles(ensemblesJsonSpec)
     case UPDATE_ENSEMBLES(ensemblesJsonSpec) => updateEnsembles(ensemblesJsonSpec)
+    case CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalSpec, ensembleSpec) => createGlobalsAndEnsembles(globalSpec,
ensembleSpec)
 
     case PRINT_PATH =>
       log.info(s"** ${self.path} **")
@@ -64,6 +65,7 @@ protected class Orchestration(val name: String,
 
   override val supervisorStrategy =
     OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
+      case e: RestartGlobalPerformers => akka.actor.SupervisorStrategy.Escalate
       case e: RestartEnsemble => Restart
       case e: Exception => Restart
     }
@@ -103,9 +105,18 @@ protected class Orchestration(val name: String,
     */
   private def replayOrchestrationState() = {
     val ensemblesSpec = ORCHESTRATION_CACHE.orchestration_metadata.get(guid).get.map(_._2).toList
-    ORCHESTRATION_CACHE.orchestration_metadata.remove(guid)
-    ORCHESTRATION_CACHE.orchestration_name.remove(guid)
-    self ! CREATE_ENSEMBLES(ensemblesSpec)
+    val global = ORCHESTRATION_CACHE.orchestration_globals.get(guid)
+    if(global.isDefined){
+      val globalSpec = global.get.map(_._2).toList
+      ORCHESTRATION_CACHE.orchestration_metadata.remove(guid)
+      ORCHESTRATION_CACHE.orchestration_name.remove(guid)
+      ORCHESTRATION_CACHE.orchestration_globals.remove(guid)
+      self ! CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalSpec, ensemblesSpec)
+    }else{
+      ORCHESTRATION_CACHE.orchestration_metadata.remove(guid)
+      ORCHESTRATION_CACHE.orchestration_name.remove(guid)
+      self ! CREATE_ENSEMBLES(ensemblesSpec)
+    }
   }
 
   /**
@@ -168,6 +179,40 @@ protected class Orchestration(val name: String,
     * @param ensemblesJsonSpec
     * @return
     */
+  private def createGlobalsAndEnsembles(globalSpec: List[JsObject], ensemblesJsonSpec: List[JsObject]) = {
+    // Starts the GLOBAL_MANAGER child for globalSpec and caches the specs in orchestration_globals.
+    log.info(s"Creating global performers: ${globalSpec}")
+    try{
+      // The manager sends CREATE_ENSEMBLES back to this orchestration once the global performers
+      // are created, which is why ensemblesJsonSpec is handed to it instead of being used here.
+      val global_manager = context.actorOf(Props(classOf[GlobalPerformer], guid, name, globalSpec, ensemblesJsonSpec), name = "GLOBAL_MANAGER")
+      context.watch(global_manager)
+    }catch{
+      case e: Exception =>
+        // Pass the cause to the logger so the reason for the failure is not lost
+        log.error(e, s"Could not create Global Performers manager actor for orchestration $guid")
+        throw new RestartOrchestration(s"Could not create global actors")
+    }
+
+    // Index the new global performer specs by their own GUID
+    val newGlobals: Map[String, JsObject] =
+      globalSpec.map(global => ((global \ GUID).as[String], global)).toMap
+
+    // Merge with whatever is already cached for this orchestration (nothing on first creation).
+    // The merged map must go back into orchestration_globals: writing it to
+    // orchestration_metadata would replace the cached ensemble specs with global performer
+    // specs and leave the globals cache stale.
+    val cachedGlobals = ORCHESTRATION_CACHE.orchestration_globals.getOrElse(guid, Map.empty[String, JsObject])
+    ORCHESTRATION_CACHE.orchestration_globals.put(guid, cachedGlobals ++ newGlobals)
+  }
+
+  /**
+    * Creates Ensembles from the json specification and make it
+    * a member of the orchestration Ensembles
+    *
+    * @param ensemblesJsonSpec
+    * @return
+    */
   private def createEnsembles(ensemblesJsonSpec: List[JsObject]) = {
     log.info(s"Creating Ensembles: ${ensemblesJsonSpec}")
     val newEnsembles = ensemblesJsonSpec.map(ensembleSpec => {
@@ -236,6 +281,7 @@ protected object Orchestration{
   case class CREATE_ENSEMBLES(ensemblesJsonSpec: List[JsObject])
   case class DELETE_ENSEMBLES(ensemblesJsonSpec: List[JsObject])
   case class UPDATE_ENSEMBLES(ensemblesJsonSpec: List[JsObject])
+  case class CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalPerformersSpec: List[JsObject],
ensemblesJsonSpec: List[JsObject])
   case object PRINT_PATH
 }
 
@@ -249,5 +295,6 @@ protected object ORCHESTRATION_CACHE{
     * Value = Map[Ensemble GUID, JsObject of the ensemble]
     */
   val orchestration_metadata: HashMap[String, Map[String,JsObject]] = HashMap.empty[String,
Map[String,JsObject]]
+  val orchestration_globals : HashMap[String, Map[String,JsObject]] = HashMap.empty[String,
Map[String,JsObject]]
   val orchestration_name: HashMap[String, String] = HashMap.empty
 }

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index fda1a30..3bf0eb8 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -201,6 +201,7 @@ object JSON_PATH{
   val JAR_CRED_USER = "user"
   val JAR_CRED_PASSWORD = "password"
   val PERFORMER_DISPATCHER = "dispatcher"
+  val GLOBAL_PERFORMERS = "global-performers"
 }
 
 object CONFIG{
@@ -329,3 +330,5 @@ case class IllegalPerformerCreation(message:String)  extends Exception(message)
 case class NetworkNotDefined(message:String)  extends Exception(message)
 case class CommandNotRecognized(message:String)  extends Exception(message)
 case class RestartEnsemble(message:String)  extends Exception(message)
+case class RestartGlobalPerformers(message: String) extends  Exception(message)
+case class RestartOrchestration(message: String) extends  Exception(message)
\ No newline at end of file


Mime
View raw message