Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 30277200C0D for ; Tue, 31 Jan 2017 23:29:13 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2EC27160B52; Tue, 31 Jan 2017 22:29:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D57AB160B36 for ; Tue, 31 Jan 2017 23:29:11 +0100 (CET) Received: (qmail 81771 invoked by uid 500); 31 Jan 2017 22:29:11 -0000 Mailing-List: contact commits-help@iota.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@iota.incubator.apache.org Delivered-To: mailing list commits@iota.incubator.apache.org Received: (qmail 81762 invoked by uid 99); 31 Jan 2017 22:29:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Jan 2017 22:29:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 98EC8180031 for ; Tue, 31 Jan 2017 22:29:10 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id OlzMLkfB7T23 for ; Tue, 31 Jan 2017 22:29:06 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 24FB55FE3E for ; Tue, 31 Jan 2017 22:29:05 +0000 (UTC) Received: (qmail 81216 invoked by uid 99); 31 Jan 2017 22:29:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Jan 2017 22:29:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8A13BDFD71; Tue, 31 Jan 2017 22:29:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tonyfaustini@apache.org To: commits@iota.incubator.apache.org Date: Tue, 31 Jan 2017 22:29:10 -0000 Message-Id: In-Reply-To: <694fcacef80f4c54bffc949061f61547@git.apache.org> References: <694fcacef80f4c54bffc949061f61547@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/12] incubator-iota git commit: [IOTA-36] - First version of global performers. In this version, global performers can not connect to any other performer. archived-at: Tue, 31 Jan 2017 22:29:13 -0000 [IOTA-36] - First version of global performers. In this version, global performers can not connect to any other performer. Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/580e3010 Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/580e3010 Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/580e3010 Branch: refs/heads/master Commit: 580e30103d72d2448426a120eb6a39b544661e68 Parents: 7aae205 Author: Barbara Gomes Authored: Mon Jan 30 16:03:39 2017 -0800 Committer: Barbara Gomes Committed: Mon Jan 30 16:03:39 2017 -0800 ---------------------------------------------------------------------- .../resources/fey-json-schema-validator.json | 95 ++++++++++ .../scala/org/apache/iota/fey/Ensemble.scala | 25 ++- .../scala/org/apache/iota/fey/FeyCore.scala | 34 ++-- .../org/apache/iota/fey/GlobalPerformer.scala | 186 +++++++++++++++++++ .../org/apache/iota/fey/Orchestration.scala | 53 +++++- .../main/scala/org/apache/iota/fey/Utils.scala | 3 + 6 files changed, 372 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/resources/fey-json-schema-validator.json ---------------------------------------------------------------------- diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json index 79952dd..a86c09f 100644 --- a/fey-core/src/main/resources/fey-json-schema-validator.json +++ b/fey-core/src/main/resources/fey-json-schema-validator.json @@ -13,6 +13,101 @@ "name":{ "type":"string" }, + "global-performers":{ + "type":"array", + "items": { + "guid": { + "type": "string" + }, + "controlAware": { + "type": "boolean" + }, + "dispatcher": { + "type": "string" + }, + "autoScale": { + "type": "object", + "lowerBound": { + "type": "integer", + "minimum": 1 + }, + "upperBound": { + "type": "integer", + "minimum": 1 + }, + "backoffThreshold": { + "type": "number", + "minimum": 0.0 + }, + "roundRobin": { + "type": "boolean" + }, + "required": [ + "lowerBound", + "upperBound" + ] + }, + "schedule": { + "type": "integer", + "minimum": 0 + }, + "backoff": { + "type": "integer", + "minimum": 0 + }, + "source": { + "type": "object", + "name": { + "type": "string", + "pattern": "\\w+(\\.jar)" + }, + "location": { + "type": "object", + "url": { + "type": "string", + "pattern": "(?i)(^(http|https|file)):\/\/" + }, + "credentials": { + "user": { + "type": "string" + }, + "password": { + "type": "string" + }, + "required": [ + "user", + "password" + ] + }, + "required": [ + "url" + ] + }, + "classPath": { + "type": "string", + "pattern": "\\w+" + }, + "parameters": { + "patternProperties": { + ".*": { + "type": "string" + } + } + }, + "required": [ + "name", + "classPath", + "parameters" + ] + }, + "required":[ + "guid", + "schedule", + "backoff", + "source" + ] + } + }, "ensembles":{ "type":"array", "items":{ http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala ---------------------------------------------------------------------- diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala index 4db5f98..15b6ead 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala @@ -162,7 +162,16 @@ protected class Ensemble(val orchestrationID: String, * @return (performerID, ActorRef of the performer) */ private def createFeyActor(performerID: String, connectionIDs: Array[String], tmpActors:HashMap[String, ActorRef]):(String, ActorRef) = { - if(!tmpActors.contains(performerID)){ + // Performer is a global performer and is already created + if(GlobalPerformer.activeGlobalPerformers.contains(orchestrationID) + && GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.contains(performerID)){ + (performerID, GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.get(performerID).get) + } + // performer was already created + else if(tmpActors.contains(performerID)){ + (performerID, tmpActors.get(performerID).get) + } + else{ val performerInfo = performers_metadata.get(performerID) if (performerInfo.isDefined) { val connections: Map[String, ActorRef] = connectionIDs.map(connID => { @@ -178,12 +187,12 @@ protected class Ensemble(val orchestrationID: String, val strategy = if(performerInfo.get.isRoundRobin) { - log.info(s"Using Round Robin for performer ${performerID}") - RoundRobinPool(1, Some(resizer)) - } else { - log.info(s"Using Smallest mailbox for performer ${performerID}") - SmallestMailboxPool(1, Some(resizer)) - } + log.info(s"Using Round Robin for performer ${performerID}") + RoundRobinPool(1, Some(resizer)) + } else { + log.info(s"Using Smallest mailbox for performer ${performerID}") + SmallestMailboxPool(1, Some(resizer)) + } actor = context.actorOf(strategy.props(actorProps), name = performerID) @@ -197,8 +206,6 @@ protected class Ensemble(val orchestrationID: String, }else{ throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the JSON") } - }else{ - (performerID, tmpActors.get(performerID).get) } } http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala ---------------------------------------------------------------------- diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala index 9c2b61d..99dc7c4 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala @@ -24,7 +24,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, import akka.routing.GetRoutees import com.eclipsesource.schema._ import org.apache.iota.fey.JSON_PATH._ -import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES} +import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES} import org.apache.iota.fey.Utils._ import play.api.libs.json._ @@ -101,6 +101,7 @@ protected class FeyCore extends Actor with ActorLogging{ log.info(s"TERMINATED ${actorRef.path.name}") FEY_CACHE.activeOrchestrations.remove(actorRef.path.name) ORCHESTRATION_CACHE.orchestration_metadata.remove(actorRef.path.name) + ORCHESTRATION_CACHE.orchestration_globals.remove(actorRef.path.name) if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) { checkForOrchestrationWaitingForTermination(actorRef.path.name) } @@ -155,10 +156,12 @@ protected class FeyCore extends Actor with ActorLogging{ val orchestrationCommand = (orchestrationJSON \ COMMAND).as[String].toUpperCase() val orchestrationTimestamp = (orchestrationJSON \ ORCHESTRATION_TIMESTAMP).as[String] val ensembles = (orchestrationJSON \ ENSEMBLES).as[List[JsObject]] + val globalPerformers = (orchestrationJSON \ GLOBAL_PERFORMERS).asOpt[List[JsObject]] + orchestrationCommand match { - case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp) - case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp) - case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp) + case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp, globalPerformers) + case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp, globalPerformers) + case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp, globalPerformers) case "DELETE" => deleteOrchestration(orchestrationID,true) case x => throw new CommandNotRecognized(s"Command: $x") } @@ -177,13 +180,13 @@ protected class FeyCore extends Actor with ActorLogging{ * @return */ private def recreateOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String, - orchestrationName: String, orchestrationTimestamp: String) = { + orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]]) = { FEY_CACHE.activeOrchestrations.get(orchestrationID) match { case Some(orchestration) => try{ // If timestamp is greater than the last timestamp if(orchestration._1 != orchestrationTimestamp){ - val orchestrationInfo = new OrchestrationInformation(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp) + val orchestrationInfo = new OrchestrationInformation(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp, globalPerformers) FEY_CACHE.orchestrationsAwaitingTermination.put(orchestrationID, orchestrationInfo) deleteOrchestration(orchestrationID, true) }else{ @@ -192,7 +195,7 @@ protected class FeyCore extends Actor with ActorLogging{ }catch{ case e: Exception => } - case None => createOrchestration(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp) + case None => createOrchestration(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp, globalPerformers) } } @@ -206,7 +209,7 @@ protected class FeyCore extends Actor with ActorLogging{ case Some(orchestrationAwaiting) => FEY_CACHE.orchestrationsAwaitingTermination.remove(terminatedOrchestrationName) createOrchestration(orchestrationAwaiting.ensembleSpecJson, orchestrationAwaiting.orchestrationID, - orchestrationAwaiting.orchestrationName, orchestrationAwaiting.orchestrationTimestamp) + orchestrationAwaiting.orchestrationName, orchestrationAwaiting.orchestrationTimestamp, orchestrationAwaiting.globalPerformers) case None => } } @@ -222,7 +225,7 @@ protected class FeyCore extends Actor with ActorLogging{ * @param orchestrationTimestamp */ private def createOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String, - orchestrationName: String, orchestrationTimestamp: String) = { + orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]]) = { try{ if(!FEY_CACHE.activeOrchestrations.contains(orchestrationID)) { val orchestration = context.actorOf( @@ -230,7 +233,13 @@ protected class FeyCore extends Actor with ActorLogging{ name = orchestrationID) FEY_CACHE.activeOrchestrations.put(orchestrationID, (orchestrationTimestamp, orchestration)) context.watch(orchestration) - orchestration ! CREATE_ENSEMBLES(ensemblesSpecJson) + + if(globalPerformers.isDefined && globalPerformers.get.size > 0){ + orchestration ! CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalPerformers.get, ensemblesSpecJson) + }else { + orchestration ! CREATE_ENSEMBLES(ensemblesSpecJson) + } + }else{ log.error(s"Orchestration $orchestrationID is already defined in the network.") } @@ -270,8 +279,9 @@ protected class FeyCore extends Actor with ActorLogging{ } } + // TODO: Check out how to manage global performers for updating private def updateOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String, - orchestrationName: String, orchestrationTimestamp: String) = { + orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]]) = { FEY_CACHE.activeOrchestrations.get(orchestrationID) match { case None => log.warning(s"Orchestration not update. No active Orchestration $orchestrationID.") case Some(orchestration) => { @@ -368,4 +378,4 @@ private object FEY_CACHE{ } sealed case class OrchestrationInformation(ensembleSpecJson: List[JsObject], orchestrationID: String, - orchestrationName: String, orchestrationTimestamp: String) + orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]]) http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala ---------------------------------------------------------------------- diff --git a/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala new file mode 100644 index 0000000..e343f2e --- /dev/null +++ b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iota.fey + +import akka.actor.SupervisorStrategy.Restart +import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated} +import akka.routing._ +import play.api.libs.json.JsObject + +import scala.collection.mutable.HashMap +import scala.concurrent.duration._ + +protected class GlobalPerformer(val orchestrationID: String, + val orchestrationName: String, + val globalPerformers: List[JsObject], + val ensemblesSpec : List[JsObject]) extends Actor with ActorLogging{ + + val monitoring_actor = FEY_MONITOR.actorRef + var global_metadata: Map[String, Performer] = Map.empty[String, Performer] + var global_performer: Map[String,ActorRef] = Map.empty[String,ActorRef] + + override def receive: Receive = { + + case GlobalPerformer.PRINT_GLOBAL => + context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH + + case Terminated(actor) => + monitoring_actor ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp) + log.error(s"DEAD Global Performers ${actor.path.name}") + context.children.foreach{ child => + context.unwatch(child) + context.stop(child) + } + throw new RestartGlobalPerformers(s"DEAD Global Performer ${actor.path.name}") + + case GetRoutees => //Discard + + case x => log.warning(s"Message $x not treated by Global Performers") + } + + /** + * If any of the global performer dies, it tries to restart it. + * If we could not be restarted, then the terminated message will be received + * and Global Performer is going to throw an Exception to its orchestration + * asking it to Restart all the entire orchestration. The restart will then stop all of its + * children when call the preStart. + */ + override val supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) { + case e: Exception => Restart + } + + /** + * Uses the json spec to create the performers + */ + override def preStart() : Unit = { + + monitoring_actor ! Monitor.START(Utils.getTimestamp) + + global_metadata = Ensemble.extractPerformers(globalPerformers) + + createGlobalPerformers() + + } + + override def postStop() : Unit = { + monitoring_actor ! Monitor.STOP(Utils.getTimestamp) + } + + override def postRestart(reason: Throwable): Unit = { + monitoring_actor ! Monitor.RESTART(reason, Utils.getTimestamp) + preStart() + } + + private def createGlobalPerformers() = { + try { + global_metadata.foreach((global_performer) => { + createFeyActor(global_performer._1, global_performer._2) + }) + context.parent ! Orchestration.CREATE_ENSEMBLES(ensemblesSpec) + } catch { + /* if the creation fails, it will stop the orchestration */ + case e: Exception => + log.error(e,"During Global Manager creation") + throw new RestartGlobalPerformers("Could not create global performer") + } + } + + private def createFeyActor(performerID: String, performerInfo: Performer) = { + val actor: ActorRef = { + val actorProps = getPerformer(performerInfo) + if (performerInfo.autoScale) { + + val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound, + messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1) + + val strategy = + if (performerInfo.isRoundRobin) { + log.info(s"Using Round Robin for performer ${performerID}") + RoundRobinPool(1, Some(resizer)) + } else { + log.info(s"Using Smallest mailbox for performer ${performerID}") + SmallestMailboxPool(1, Some(resizer)) + } + + context.actorOf(strategy.props(actorProps), name = performerID) + } else { + context.actorOf(actorProps, name = performerID) + } + } + + context.watch(actor) + GlobalPerformer.activeGlobalPerformers.get(orchestrationID) match { + case Some(globals) => GlobalPerformer.activeGlobalPerformers.put(orchestrationID, (globals ++ Map(performerID -> actor))) + case None => GlobalPerformer.activeGlobalPerformers.put(orchestrationID, Map(performerID -> actor)) + } + } + + /** + * Creates actor props based on JSON configuration + * + * @param performerInfo Performer object + * @return Props of actor based on JSON config + */ + private def getPerformer(performerInfo: Performer): Props = { + + val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName) + + val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else "" + + val actorProps = Props(clazz, + performerInfo.parameters, performerInfo.backoff, Map.empty, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale) + + // dispatcher has higher priority than controlAware. That means that if both are defined + // then the custom dispatcher will be used + if(dispatcher != ""){ + log.info(s"Using dispatcher: $dispatcher") + actorProps.withDispatcher(dispatcher) + } + else if(performerInfo.controlAware){ + actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX) + }else{ + actorProps + } + } + + /** + * Load a clazz instance of FeyGenericActor from a jar + * + * @param classPath class path + * @param jarLocation Full path where to load the jar from + * @return clazz instance of FeyGenericActor + */ + private def loadClazzFromJar(classPath: String, jarLocation: String, jarName: String):Class[FeyGenericActor] = { + try { + Utils.loadActorClassFromJar(jarLocation,classPath,jarName) + }catch { + case e: Exception => + log.error(e,s"Could not load class $classPath from jar $jarLocation. Please, check the Jar repository path as well the jar name") + throw e + } + } + +} + +object GlobalPerformer{ + + val activeGlobalPerformers:HashMap[String, Map[String, ActorRef]] = HashMap.empty[String, Map[String, ActorRef]] + + case object PRINT_GLOBAL +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala ---------------------------------------------------------------------- diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala index 1e47a80..0b9152b 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala @@ -44,6 +44,7 @@ protected class Orchestration(val name: String, case CREATE_ENSEMBLES(ensemblesJsonSpec) => createEnsembles(ensemblesJsonSpec) case DELETE_ENSEMBLES(ensemblesJsonSpec) => deleteEnsembles(ensemblesJsonSpec) case UPDATE_ENSEMBLES(ensemblesJsonSpec) => updateEnsembles(ensemblesJsonSpec) + case CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalSpec, ensembleSpec) => createGlobalsAndEnsembles(globalSpec, ensembleSpec) case PRINT_PATH => log.info(s"** ${self.path} **") @@ -64,6 +65,7 @@ protected class Orchestration(val name: String, override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) { + case e: RestartGlobalPerformers => akka.actor.SupervisorStrategy.Escalate case e: RestartEnsemble => Restart case e: Exception => Restart } @@ -103,9 +105,18 @@ protected class Orchestration(val name: String, */ private def replayOrchestrationState() = { val ensemblesSpec = ORCHESTRATION_CACHE.orchestration_metadata.get(guid).get.map(_._2).toList - ORCHESTRATION_CACHE.orchestration_metadata.remove(guid) - ORCHESTRATION_CACHE.orchestration_name.remove(guid) - self ! CREATE_ENSEMBLES(ensemblesSpec) + val global = ORCHESTRATION_CACHE.orchestration_globals.get(guid) + if(global.isDefined){ + val globalSpec = global.get.map(_._2).toList + ORCHESTRATION_CACHE.orchestration_metadata.remove(guid) + ORCHESTRATION_CACHE.orchestration_name.remove(guid) + ORCHESTRATION_CACHE.orchestration_globals.remove(guid) + self ! CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalSpec, ensemblesSpec) + }else{ + ORCHESTRATION_CACHE.orchestration_metadata.remove(guid) + ORCHESTRATION_CACHE.orchestration_name.remove(guid) + self ! CREATE_ENSEMBLES(ensemblesSpec) + } } /** @@ -168,6 +179,40 @@ protected class Orchestration(val name: String, * @param ensemblesJsonSpec * @return */ + private def createGlobalsAndEnsembles(globalSpec: List[JsObject], ensemblesJsonSpec: List[JsObject]) = { + log.info(s"Creating global performers: ${globalSpec}") + try{ + // Actor will send message to orchestration to create ensembles once global performers are created + val global_manager = context.actorOf(Props(classOf[GlobalPerformer], guid, name, globalSpec, ensemblesJsonSpec), name = "GLOBAL_MANAGER") + context.watch(global_manager) + }catch{ + case e: Exception => + log.error(s"Could not create Global Performers manager actor for orchestration $guid") + throw new RestartOrchestration(s"Could not create global actors") + } + + //Fill orchestration_globals + ORCHESTRATION_CACHE.orchestration_globals.get(guid) match { + case None => + ORCHESTRATION_CACHE.orchestration_globals.put(guid, (globalSpec.map(global => { + val guid = (global \ GUID).as[String] + (guid, global) + }).toMap)) + case Some(cachedGlobals) => + ORCHESTRATION_CACHE.orchestration_metadata.put(guid, cachedGlobals ++ (globalSpec.map(global => { + val guid = (global \ GUID).as[String] + (guid, global) + }).toMap)) + } + } + + /** + * Creates Ensembles from the json specification and make it + * a member of the orchestration Ensembles + * + * @param ensemblesJsonSpec + * @return + */ private def createEnsembles(ensemblesJsonSpec: List[JsObject]) = { log.info(s"Creating Ensembles: ${ensemblesJsonSpec}") val newEnsembles = ensemblesJsonSpec.map(ensembleSpec => { @@ -236,6 +281,7 @@ protected object Orchestration{ case class CREATE_ENSEMBLES(ensemblesJsonSpec: List[JsObject]) case class DELETE_ENSEMBLES(ensemblesJsonSpec: List[JsObject]) case class UPDATE_ENSEMBLES(ensemblesJsonSpec: List[JsObject]) + case class CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalPerformersSpec: List[JsObject], ensemblesJsonSpec: List[JsObject]) case object PRINT_PATH } @@ -249,5 +295,6 @@ protected object ORCHESTRATION_CACHE{ * Value = Map[Ensemble GUID, JsObject of the ensemble] */ val orchestration_metadata: HashMap[String, Map[String,JsObject]] = HashMap.empty[String, Map[String,JsObject]] + val orchestration_globals : HashMap[String, Map[String,JsObject]] = HashMap.empty[String, Map[String,JsObject]] val orchestration_name: HashMap[String, String] = HashMap.empty } http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala ---------------------------------------------------------------------- diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala index fda1a30..3bf0eb8 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala @@ -201,6 +201,7 @@ object JSON_PATH{ val JAR_CRED_USER = "user" val JAR_CRED_PASSWORD = "password" val PERFORMER_DISPATCHER = "dispatcher" + val GLOBAL_PERFORMERS = "global-performers" } object CONFIG{ @@ -329,3 +330,5 @@ case class IllegalPerformerCreation(message:String) extends Exception(message) case class NetworkNotDefined(message:String) extends Exception(message) case class CommandNotRecognized(message:String) extends Exception(message) case class RestartEnsemble(message:String) extends Exception(message) +case class RestartGlobalPerformers(message: String) extends Exception(message) +case class RestartOrchestration(message: String) extends Exception(message) \ No newline at end of file