eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [08/11] incubator-eagle git commit: [EAGLE-382][EAGLE-385] Monitoring Application Framework Core
Date Thu, 21 Jul 2016 12:28:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
new file mode 100644
index 0000000..6b9c033
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+import org.slf4j.{LoggerFactory, Logger}
+
+import scala.collection.mutable
+
+
+object ExecutionPlatformFactory {
+  private val LOG: Logger = LoggerFactory.getLogger(ExecutionPlatformFactory.getClass)
+
+  var managerCache = new mutable.HashMap[String, ExecutionPlatform] with
+    mutable.SynchronizedMap[String, ExecutionPlatform]
+
+  def getApplicationManager(managerType: String): ExecutionPlatform = {
+    if(managerCache.contains(managerType)) {
+      managerCache.get(managerType).get
+    } else {
+      managerType match {
+        case AppManagerConstants.EAGLE_CLUSTER_STORM =>
+          val instance = new StormExecutionPlatform
+          managerCache.put(managerType, instance)
+          instance
+        case _ =>
+          throw new Exception(s"Invalid managerType $managerType")
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
new file mode 100644
index 0000000..07737ac
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import org.codehaus.jackson.annotate.JsonIgnore
+
+class TaskExecutor(runnable: Runnable) extends Thread(runnable) {
+
+  @JsonIgnore override def getContextClassLoader: ClassLoader = {
+    return super.getContextClassLoader
+  }
+
+  @JsonIgnore override def getUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+    return super.getUncaughtExceptionHandler
+  }
+
+  def shutdown {
+    this.interrupt
+  }
+
+  def restart {
+    this.interrupt
+    this.start
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
new file mode 100644
index 0000000..7d52649
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.impl
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.ExecutionEnvironments
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+import org.apache.eagle.stream.application.AbstractDynamicApplication
+import org.slf4j.LoggerFactory
+
+
+object StormDynamicTopology extends AbstractDynamicApplication {
+  val LOG = LoggerFactory.getLogger(classOf[AbstractDynamicApplication])
+
+  override def submit(application: String, config: Config) {
+    val stream = compileStream(application, config)
+    var ret = true
+
+    try {
+      val stormEnv = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](stream.getConfig)
+      stream.submit(stormEnv)
+    } catch {
+      case e: Throwable =>
+        ret = false
+        LOG.error(e.toString)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
new file mode 100644
index 0000000..af4cafa
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.impl
+
+import java.net.URLDecoder
+import java.nio.file.{Files, Paths}
+
+import backtype.storm.generated.InvalidTopologyException
+import backtype.storm.utils.{NimbusClient, Utils}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus}
+import org.apache.eagle.stream.application.{ApplicationManager, ApplicationManagerUtils, ExecutionPlatform, TopologyFactory}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions
+
+object StormExecutionPlatform {
+  val ACTIVE: String = "ACTIVE"
+  val INACTIVE: String = "INACTIVE"
+  val KILLED: String = "KILLED"
+  val REBALANCING: String = "REBALANCING"
+}
+
+class StormExecutionPlatform extends ExecutionPlatform {
+  val LOG = LoggerFactory.getLogger(classOf[StormExecutionPlatform])
+
+  private def getNimbusClient(appConfig: com.typesafe.config.Config): NimbusClient = {
+    val conf = Utils.readStormConfig().asInstanceOf[java.util.HashMap[String, Object]]
+    conf.putAll(Utils.readCommandLineOpts().asInstanceOf[java.util.HashMap[String, Object]])
+
+    if(appConfig.hasPath("envContextConfig.nimbusHost")) {
+      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${appConfig.getString("envContextConfig.nimbusHost")}")
+      conf.put(backtype.storm.Config.NIMBUS_HOST, appConfig.getString("envContextConfig.nimbusHost"))
+    }
+
+    if(appConfig.hasPath("envContextConfig.nimbusThriftPort")) {
+      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${appConfig.getString("envContextConfig.nimbusThriftPort")}")
+      conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, appConfig.getNumber("envContextConfig.nimbusThriftPort"))
+    }
+    NimbusClient.getConfiguredClient(conf)
+  }
+
+  def startLocal(topologyName: String, topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val worker: Thread = ApplicationManager.submit(topologyName, new Runnable {
+      override def run(): Unit = {
+        try {
+          val topologyType = topology.getType.toUpperCase()
+          topologyType match {
+            case TopologyDescriptionEntity.TYPE.CLASS =>
+              TopologyFactory.submit(topology.getExeClass, config)
+            case TopologyDescriptionEntity.TYPE.DYNAMIC =>
+              StormDynamicTopology.submit(topology.getExeClass, config)
+            case m@_ =>
+              LOG.error("Unsupported topology type: " + topology.getType)
+          }
+        } catch {
+          case ex: Throwable =>
+            LOG.error(s"topology $topologyName in local mode is interrupted with ${ex.toString}")
+        }
+      }
+    })
+    topologyExecution.setFullName(topologyName)
+    topologyExecution.setStatus(ApplicationManager.getWorkerStatus(worker.getState))
+    topologyExecution.setDescription("Running inside " + worker.toString + " in local mode")
+  }
+
+  override def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val stormJarPath: String = URLDecoder.decode(classOf[ExecutionPlatform].getProtectionDomain.getCodeSource.getLocation.getPath, "UTF-8")
+    if (stormJarPath == null || !Files.exists(Paths.get(stormJarPath)) || !stormJarPath.endsWith(".jar")) {
+      val errMsg = s"storm jar file $stormJarPath does not exists, or is a invalid jar file"
+      LOG.error(errMsg)
+      throw new Exception(errMsg)
+    }
+    LOG.info(s"Detected a storm.jar location at: $stormJarPath")
+    System.setProperty("storm.jar", stormJarPath)
+
+    val fullName = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+    val extConfigStr = "envContextConfig.topologyName=%s".format(fullName)
+    val extConfig = ConfigFactory.parseString(extConfigStr)
+    val newConfig = extConfig.withFallback(config)
+
+    val mode = if(config.hasPath(AppManagerConstants.RUNNING_MODE)) config.getString(AppManagerConstants.RUNNING_MODE) else EagleConfigConstants.LOCAL_MODE
+    topologyExecution.setMode(mode)
+    if (topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      startLocal(fullName, topology, topologyExecution, newConfig)
+      return
+    }
+
+    val topologyType = topology.getType.toUpperCase()
+    topologyType match {
+      case TopologyDescriptionEntity.TYPE.CLASS =>
+        TopologyFactory.submit(topology.getExeClass, newConfig)
+      case TopologyDescriptionEntity.TYPE.DYNAMIC =>
+        StormDynamicTopology.submit(topology.getExeClass, newConfig)
+      case m@_ =>
+        throw new InvalidTopologyException("Unsupported topology type: " + topology.getType)
+    }
+    topologyExecution.setFullName(fullName)
+    //topologyExecution.setStatus(TopologyExecutionStatus.STARTED)
+  }
+
+  override def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+
+    if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      stopLocal(name, topologyExecution)
+    } else {
+      getNimbusClient(config).getClient.killTopology(name)
+      topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
+      //topologyExecution.setDescription("")
+    }
+  }
+
+  def stopLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
+      val taskWorker = ApplicationManager.stop(name)
+      topologyExecution.setStatus(ApplicationManager.getWorkerStatus(taskWorker.getState))
+      topologyExecution.setDescription(s"topology status is ${taskWorker.getState}")
+      /*try{
+        ApplicationManager.remove(name)
+      } catch {
+        case ex: IllegalArgumentException =>
+          LOG.warn(s"ApplicationManager.remove($name) failed as it has been removed")
+      }*/
+  }
+
+
+  def getTopology(topologyName: String, config: Config) = {
+    val topologySummery = getNimbusClient(config).getClient.getClusterInfo.get_topologies
+    JavaConversions.collectionAsScalaIterable(topologySummery).find { t => t.get_name.equals(topologyName) }
+    match {
+      case Some(t) => Some(t)
+      case None    => None
+    }
+  }
+
+  override def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+
+    if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      statusLocal(name, topologyExecution)
+    } else {
+      val topology = getTopology(name, config)
+      topology match {
+        case Some(topology) =>
+          topologyExecution.setStatus(ApplicationManager.getTopologyStatus(topology.get_status()))
+          topologyExecution.setUrl(ApplicationManagerUtils.buildStormTopologyURL(config, topology.get_id()))
+          topologyExecution.setDescription(topology.toString)
+        case None =>
+          topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
+          topologyExecution.setUrl("")
+          topologyExecution.setDescription("")
+      }
+    }
+  }
+
+  def statusLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
+    try {
+      val currentStatus = topologyExecution.getStatus()
+      val newStatus = ApplicationManager.getWorkerStatus(ApplicationManager.get(name).getState())
+      if (!currentStatus.equals(newStatus)) {
+        LOG.info("Status of topology: %s changed from %s to %s".format(topologyExecution.getFullName, currentStatus, newStatus))
+        topologyExecution.setStatus(newStatus)
+        topologyExecution.setDescription(String.format("Status of topology: %s changed from %s to %s", name, currentStatus, newStatus))
+      } else if(currentStatus.equalsIgnoreCase(TopologyExecutionStatus.STOPPED)) {
+        ApplicationManager.remove(name)
+      }
+    }catch {
+      case ex: Throwable =>
+        topologyExecution.setDescription("")
+        topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
+    }
+  }
+
+  override def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit = {
+    JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach {
+      topologyExecution => status(topologyExecution, config)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
new file mode 100644
index 0000000..8fbf60d
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import java.util.concurrent.Callable
+
+import akka.actor.{Actor, ActorLogging}
+import akka.dispatch.Futures
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigSyntax}
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
+import org.apache.eagle.stream.application.{ApplicationSchedulerAsyncDAO, ExecutionPlatformFactory}
+
+import scala.collection.JavaConversions
+import scala.util.{Failure, Success}
+
+
+private[scheduler] class AppCommandExecutor extends Actor with ActorLogging {
+  @volatile var _config: Config = _
+  @volatile var _dao: ApplicationSchedulerAsyncDAO = _
+
+  import context.dispatcher
+
+  def start(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
+    val options: ConfigParseOptions = ConfigParseOptions.defaults.setSyntax(ConfigSyntax.PROPERTIES).setAllowMissing(false)
+    _dao.loadTopologyDescriptionByName(topologyOperation.getSite, topologyOperation.getApplication, topologyOperation.getTopology) onComplete {
+      case Success(topology) =>
+        val topologyConfig: Config = ConfigFactory.parseString(topology.getContext, options)
+
+        if(!topologyConfig.hasPath(EagleConfigConstants.APP_CONFIG)) {
+          topologyOperation.setMessage("Fail to detect topology configuration")
+          topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+          _dao.updateOperationStatus(topologyOperation)
+        } else {
+          val config = topologyConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(_config)
+          val clusterType = if(config.hasPath(AppManagerConstants.CLUSTER_ENV)) config.getString(AppManagerConstants.CLUSTER_ENV) else AppManagerConstants.EAGLE_CLUSTER_STORM
+          topologyExecution.setEnvironment(clusterType)
+
+          Futures.future(new Callable[TopologyExecutionEntity]{
+            override def call(): TopologyExecutionEntity = {
+              topologyExecution.setStatus(TopologyExecutionStatus.STARTING)
+              _dao.updateTopologyExecutionStatus(topologyExecution)
+              ExecutionPlatformFactory.getApplicationManager(clusterType).start(topology, topologyExecution, config)
+              topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+              topologyExecution
+            }
+          }, context.dispatcher) onComplete {
+            case Success(topologyExecutionEntity) =>
+              topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+              updateStatus(topologyExecution, topologyOperation)
+            case Failure(ex) =>
+              topologyOperation.setMessage(ex.getMessage)
+              topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+              _dao.updateOperationStatus(topologyOperation)
+          }
+        }
+
+      case Failure(ex) =>
+        topologyOperation.setMessage(ex.getMessage)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+        _dao.updateOperationStatus(topologyOperation)
+    }
+  }
+
+  def stop(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
+    val clusterType = topologyExecution.getEnvironment
+
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = {
+        topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
+        _dao.updateTopologyExecutionStatus(topologyExecution)
+        ExecutionPlatformFactory.getApplicationManager(clusterType).stop(topologyExecution, _config)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+        topologyExecution
+      }
+    }, context.dispatcher) onComplete {
+      case Success(topologyExecutionEntity) =>
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+        updateStatus(topologyExecution, topologyOperation)
+      case Failure(ex) =>
+        topologyOperation.setMessage(ex.toString)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+        _dao.updateOperationStatus(topologyOperation)
+    }
+  }
+
+  def status(topologyExecution: TopologyExecutionEntity) = {
+    val clusterType = topologyExecution.getEnvironment
+
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = {
+        ExecutionPlatformFactory.getApplicationManager(clusterType).status(topologyExecution, _config)
+        topologyExecution
+      }
+    }, context.dispatcher) onComplete {
+      case _ =>
+        _dao.updateTopologyExecutionStatus(topologyExecution)
+    }
+  }
+
+  def updateStatus(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
+    _dao.updateOperationStatus(topologyOperation)
+    _dao.updateTopologyExecutionStatus(topologyExecution)
+  }
+
+  def execute(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
+    try {
+      topologyOperation.getOperation match {
+        case OPERATION.START =>
+          start(topologyExecution, topologyOperation)
+        case OPERATION.STOP =>
+          stop(topologyExecution, topologyOperation)
+        case m@_ =>
+          log.warning("Unsupported operation: " + topologyOperation)
+          throw new Exception(s"Unsupported operation: ${topologyOperation.getOperation}, possible values are START/STOP")
+      }
+    } catch {
+      case e: Throwable =>
+        topologyOperation.setMessage(e.getMessage)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+        _dao.updateOperationStatus(topologyOperation)
+    }
+  }
+
+  override def receive = {
+    case InitializationEvent(config: Config) =>
+      _config = config
+      _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
+    case SchedulerCommand(topologyExecution, topologyOperation) =>
+      execute(topologyExecution, topologyOperation)
+    case HealthCheckerEvent =>
+      _dao.loadAllTopologyExecutionEntities() onComplete {
+        case Success(topologyExecutions) =>
+          log.info(s"Load ${topologyExecutions.size()} topologies in execution")
+          JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach { topologyExecution =>
+            try{
+              status(topologyExecution)
+            } catch {
+              case ex: Throwable =>
+                log.error(ex.getMessage)
+            }
+          }
+        case Failure(ex) =>
+          log.error(s"Fail to load any topologyExecutionEntity due to Exception: ${ex.getMessage}")
+      }
+    case TerminatedEvent =>
+      context.stop(self)
+    case m@_ =>
+      log.warning("Unsupported operation $m")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
new file mode 100644
index 0000000..c731846
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{Actor, ActorLogging}
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION_STATUS
+import org.apache.eagle.stream.application.ApplicationSchedulerAsyncDAO
+
+import scala.collection.JavaConversions
+import scala.util.{Failure, Success}
+
+
+private[scheduler] class AppCommandLoader extends Actor with ActorLogging {
+  @volatile var _config: Config = null
+  @volatile var _dao: ApplicationSchedulerAsyncDAO = null
+
+  import context.dispatcher
+
+  override def receive = {
+    case InitializationEvent(config: Config) =>
+      _config = config
+      _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
+    case ClearPendingOperation =>
+      if(_dao == null) _dao = new ApplicationSchedulerAsyncDAO(_config, context.dispatcher)
+      _dao.clearPendingOperations()
+    case CommandLoaderEvent => {
+      val _sender = sender()
+      _dao.readOperationsByStatus(OPERATION_STATUS.INITIALIZED) onComplete {
+        case Success(commands) => {
+          log.info(s"Load ${commands.size()} new commands")
+          JavaConversions.collectionAsScalaIterable(commands) foreach { command =>
+            command.setStatus(OPERATION_STATUS.PENDING)
+            _dao.updateOperationStatus(command) onComplete {
+              case Success(response) =>
+                _dao.loadTopologyExecutionByName(command.getSite, command.getApplication, command.getTopology) onComplete {
+                  case Success(topologyExecution) => {
+                    _sender ! SchedulerCommand(topologyExecution, command)
+                  }
+                  case Failure(ex) =>
+                    log.error(ex.getMessage)
+                    command.setMessage(ex.getMessage)
+                    command.setStatus(OPERATION_STATUS.FAILED)
+                    _dao.updateOperationStatus(command)
+                }
+              case Failure(ex) =>
+                log.error(s"Got an exception to update command status $command: ${ex.getMessage}")
+                command.setMessage(ex.getMessage)
+                command.setStatus(OPERATION_STATUS.FAILED)
+                _dao.updateOperationStatus(command)
+            }
+          }
+        }
+        case Failure(ex) =>
+          log.error(s"Failed to get commands due to exception ${ex.getMessage}")
+      }
+    }
+    case TerminatedEvent =>
+      context.stop(self)
+    case m@_ => throw new UnsupportedOperationException(s"Event is not supported $m")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
new file mode 100644
index 0000000..476a3fb
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyOperationEntity}
+import org.apache.eagle.stream.application.ApplicationManager
+
+import scala.concurrent.duration._
+
+
+private[scheduler] class ScheduleEvent
+private[scheduler] case class InitializationEvent(config: Config) extends ScheduleEvent
+private[scheduler] case class TerminatedEvent() extends ScheduleEvent
+private[scheduler] case class CommandLoaderEvent() extends ScheduleEvent
+private[scheduler] case class HealthCheckerEvent() extends ScheduleEvent
+private[scheduler] case class ClearPendingOperation() extends ScheduleEvent
+private[scheduler] case class SchedulerCommand(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) extends ScheduleEvent
+
+case class EagleServiceUnavailableException(message:String) extends Exception(message)
+case class DuplicatedDefinitionException(message:String) extends Exception(message)
+case class LoadTopologyFailureException(message:String) extends Exception(message)
+
+
+/**
+ * 1. Sync command from eagle service
+ * 2. Coordinate command to different actor
+ * 3. Actor execute command as requested
+ */
+class ApplicationScheduler {
+  //val config = ConfigFactory.load()
+  val DEFAULT_COMMAND_LOADER_INTERVAL_SECS = 2
+  val DEFAULT_HEALTH_CHECK_INTERVAL_SECS = 5
+
+  def start(config: Config) = {
+    val system = ActorSystem("application-manager-scheduler", config)
+    system.log.info(s"Started actor system: $system")
+
+    import system.dispatcher
+
+    val commandLoaderIntervalSecs: Long = if(config.hasPath(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS)) config.getLong(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS) else DEFAULT_COMMAND_LOADER_INTERVAL_SECS
+    val healthCheckIntervalSecs: Long = if(config.hasPath(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS)) config.getLong(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS) else DEFAULT_HEALTH_CHECK_INTERVAL_SECS
+
+    val coordinator = system.actorOf(Props[StreamAppCoordinator])
+    system.scheduler.scheduleOnce(0 seconds, coordinator, InitializationEvent(config))
+    system.scheduler.scheduleOnce(1 seconds, coordinator, ClearPendingOperation)
+    system.scheduler.schedule(2.seconds, commandLoaderIntervalSecs.seconds, coordinator, CommandLoaderEvent)
+    system.scheduler.schedule(10.seconds, healthCheckIntervalSecs.seconds, coordinator, HealthCheckerEvent)
+
+    /*
+     registerOnTermination is called when you have shut down the ActorSystem (system.shutdown),
+     and the callbacks will be executed after all actors have been stopped.
+     */
+    system.registerOnTermination(new Runnable {
+      override def run(): Unit = {
+        coordinator ! TerminatedEvent
+        ApplicationManager.stopAll()
+      }
+    })
+    system
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
new file mode 100644
index 0000000..17006ee
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{Actor, ActorLogging, ActorRef, Props}
+
+private[scheduler] class StreamAppCoordinator extends Actor with ActorLogging {
+  var commandLoader: ActorRef = null
+  var commandExecutor: ActorRef = null
+
+
+  override def preStart(): Unit = {
+    commandLoader = context.actorOf(Props[AppCommandLoader], "command-loader")
+    commandExecutor = context.actorOf(Props[AppCommandExecutor], "command-worker")
+  }
+
+  override def receive = {
+    case InitializationEvent(config) => {
+      log.info(s"Config updated: $config")
+      commandLoader ! InitializationEvent(config)
+      commandExecutor ! InitializationEvent(config)
+    }
+    case ClearPendingOperation =>
+      commandLoader ! ClearPendingOperation
+    case CommandLoaderEvent =>
+      commandLoader ! CommandLoaderEvent
+    case command: SchedulerCommand =>
+      log.info(s"Executing command: $SchedulerCommand")
+      commandExecutor ! command
+    case HealthCheckerEvent =>
+      commandExecutor ! HealthCheckerEvent
+    case TerminatedEvent =>
+      log.info("Coordinator exit ...")
+      context.stop(self)
+    case m@_ =>
+      log.warning(s"Coordinator Unsupported message: $m")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
new file mode 100644
index 0000000..4c21a7c
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+### scheduler propertise
+appCommandLoaderIntervalSecs = 1
+appHealthCheckIntervalSecs = 5
+
+### execution platform properties
+envContextConfig.env = "storm"
+envContextConfig.url = "http://sandbox.hortonworks.com:8744"
+envContextConfig.nimbusHost = "sandbox.hortonworks.com"
+envContextConfig.nimbusThriftPort = 6627
+envContextConfig.jarFile = "/dir-to-jar/eagle-topology-0.3.0-incubating-assembly.jar"
+
+### default topology properties
+eagleProps.mailHost = "mailHost.com"
+eagleProps.mailSmtpPort = "25"
+eagleProps.mailDebug = "true"
+eagleProps.eagleService.host = "localhost"
+eagleProps.eagleService.port = 9099
+eagleProps.eagleService.username = "admin"
+eagleProps.eagleService.password = "secret"
+eagleProps.dataJoinPollIntervalSec = 30
+
+dynamicConfigSource.enabled = true
+dynamicConfigSource.initDelayMillis = 0
+dynamicConfigSource.delayMillis = 30000
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
new file mode 100644
index 0000000..25331ab
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+ eagle.log.dir=../logs
+ eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
new file mode 100644
index 0000000..e87ee92
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import com.typesafe.config.Config
+import org.apache.eagle.stream.application.TopologyExecutable
+import org.slf4j.LoggerFactory
+
+class MockTopology extends TopologyExecutable {
+  private val LOG = LoggerFactory.getLogger(classOf[MockTopology])
+  override def submit(topology: String, config: Config): Unit = {
+    LOG.info(s"$topology is running")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
new file mode 100644
index 0000000..1cad3a7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
@@ -0,0 +1,40 @@
+package org.apache.eagle.stream.application.scheduler
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.stream.application.ExecutionPlatform
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+
+object StormApplicationManagerSpec extends App {
+  val manager: ExecutionPlatform = new StormExecutionPlatform
+  val baseConfig = ConfigFactory.load()
+  val topoConfigStr: String = "webConfig{\"hbase.zookeeper.property.clientPort\":\"2181\", \"hbase.zookeeper.quorum\":\"localhost\"}\nappConfig{\n  \"envContextConfig\" : {\n    \"env\" : \"storm\",\n    \"mode\" : \"cluster\",\n    \"topologyName\" : \"sandbox-hbaseSecurityLog-topology\",\n    \"stormConfigFile\" : \"security-auditlog-storm.yaml\",\n    \"parallelismConfig\" : {\n      \"kafkaMsgConsumer\" : 1,\n      \"hbaseSecurityLogAlertExecutor*\" : 1\n    }\n  },\n  \"dataSourceConfig\": {\n    \"topic\" : \"sandbox_hbase_security_log\",\n    \"zkConnection\" : \"127.0.0.1:2181\",\n    \"zkConnectionTimeoutMS\" : 15000,\n    \"brokerZkPath\" : \"/brokers\",\n    \"fetchSize\" : 1048586,\n    \"deserializerClass\" : \"org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer\",\n    \"transactionZKServers\" : \"127.0.0.1\",\n    \"transactionZKPort\" : 2181,\n    \"transactionZKRoot\" : \"/consumers\",\n    \"consumerGroupId\" : \"eagle.hbasesecurity.consumer\",\n  
   \"transactionStateUpdateMS\" : 2000\n  },\n  \"alertExecutorConfigs\" : {\n     \"hbaseSecurityLogAlertExecutor\" : {\n       \"parallelism\" : 1,\n       \"partitioner\" : \"org.apache.eagle.policy.DefaultPolicyPartitioner\"\n       \"needValidation\" : \"true\"\n     }\n  },\n  \"eagleProps\" : {\n    \"site\" : \"sandbox\",\n    \"application\": \"hbaseSecurityLog\",\n    \"dataJoinPollIntervalSec\" : 30,\n    \"mailHost\" : \"mailHost.com\",\n    \"mailSmtpPort\":\"25\",\n    \"mailDebug\" : \"true\",\n    \"eagleService\": {\n      \"host\": \"localhost\",\n      \"port\": 9099\n      \"username\": \"admin\",\n      \"password\": \"secret\"\n    }\n  },\n  \"dynamicConfigSource\" : {\n    \"enabled\" : true,\n    \"initDelayMillis\" : 0,\n    \"delayMillis\" : 30000\n  }\n}"
+
+  val topoConfig = ConfigFactory.parseString(topoConfigStr)
+  val conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(baseConfig)
+
+  //val (ret, nextState) = manager.execute("START", topologyDescModel, null, conf)
+  //println(s"Result: ret=$ret, nextState=$nextState")
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
new file mode 100644
index 0000000..3db2d67
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{TestActorRef, TestKit}
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{Ignore, BeforeAndAfterAll, MustMatchers, WordSpecLike}
+
+@Ignore
+class TestSchedulerSpec extends TestKit(ActorSystem("stream-app-scheduler"))
+with WordSpecLike with MustMatchers with BeforeAndAfterAll {
+
+  "A Scheduler actor" must {
+    "Forward a message it receives" in {
+      val coordinator = TestActorRef[StreamAppCoordinator]
+      coordinator ! CommandLoaderEvent
+      expectNoMsg()
+    }
+  }
+
+  "A Integrated test" must {
+    "run end-to-end" in {
+      val coordinator = system.actorOf(Props[StreamAppCoordinator])
+      coordinator ! CommandLoaderEvent
+      expectNoMsg()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    system.shutdown()
+  }
+}
+
+@Ignore
+object TestStreamAppScheduler extends App {
+  val conf: String = """
+                          akka.loglevel = "DEBUG"
+                          akka.actor.debug {
+                            receive = on
+                            lifecycle = on
+                          }
+                     """
+  new ApplicationScheduler().start(ConfigFactory.parseString(conf))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/pom.xml b/eagle-core/eagle-app/pom.xml
new file mode 100644
index 0000000..6f3069c
--- /dev/null
+++ b/eagle-core/eagle-app/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-core</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>eagle-app-parent</artifactId>
+    <packaging>pom</packaging>
+    <description>Eagle Applications</description>
+
+    <modules>
+        <module>eagle-app-base</module>
+        <module>eagle-stream-application-manager</module>
+        <module>eagle-application-service</module>
+    </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/pom.xml b/eagle-core/eagle-application-management/eagle-application-service/pom.xml
deleted file mode 100644
index 377ead5..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/pom.xml
+++ /dev/null
@@ -1,54 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~  Unless required by applicable law or agreed to in writing, software
-  ~  distributed under the License is distributed on an "AS IS" BASIS,
-  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~  See the License for the specific language governing permissions and
-  ~  limitations under the License.
-  ~
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>eagle-application-management</artifactId>
-        <groupId>org.apache.eagle</groupId>
-        <version>0.5.0-incubating-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>eagle-application-service</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-policy-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-service-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <skipTests>true</skipTests>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/AppManagerConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/AppManagerConstants.java b/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/AppManagerConstants.java
deleted file mode 100644
index 3aa3579..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/AppManagerConstants.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.application;
-
-
-public class AppManagerConstants {
-    public final static String SITE_TAG = "site";
-    public final static String APPLICATION_TAG = "application";
-    public final static String OPERATION_TAG = "operation";
-    public final static String OPERATION_ID_TAG = "operationID";
-    public final static String TOPOLOGY_TAG = "topology";
-    public final static String FULLNAME = "fullName";
-    public final static String APPLICATION_ID = "id";
-
-    public final static String CLUSTER_ENV = "envContextConfig.env";
-    public final static String CLUSTER_URL = "envContextConfig.url";
-    public final static String DEFAULT_CLUSTER_URL = "http://sandbox.hortonworks.com:8744";
-
-    public final static String RUNNING_MODE = "envContextConfig.mode";
-    public final static String EAGLE_CLUSTER_STORM = "storm";
-    public final static String EAGLE_CLUSTER_SPARK = "spark";
-
-    public final static String APP_COMMAND_LOADER_ENABLED = "appCommandLoaderEnabled";
-    public final static String APP_COMMAND_LOADER_INTERVAL_SECS = "appCommandLoaderIntervalSecs";
-    public final static String APP_HEALTH_CHECK_INTERVAL_SECS = "appHealthCheckIntervalSecs";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java b/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
deleted file mode 100644
index 6e4521d..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.application;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.service.application.dao.ApplicationManagerDAO;
-import org.apache.eagle.service.application.dao.ApplicationManagerDaoImpl;
-import org.apache.eagle.service.application.entity.TopologyExecutionStatus;
-import org.apache.eagle.service.application.entity.TopologyOperationEntity;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.type.TypeFactory;
-
-import javax.ws.rs.*;
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-
-@Path(ApplicationManagementResource.ROOT_PATH)
-public class ApplicationManagementResource {
-    private final static ApplicationManagerDAO dao = new ApplicationManagerDaoImpl();
-    public final static String ROOT_PATH = "/app";
-
-    @Path("operation")
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public GenericServiceAPIResponseEntity createOperation(InputStream inputStream) {
-        GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity<>();
-        List<TopologyOperationEntity> operations = new LinkedList<>();
-        try {
-            List<TopologyOperationEntity> entities = (List<TopologyOperationEntity>) unmarshalOperationEntities(inputStream);
-            if (entities == null) {
-                throw new IllegalArgumentException("inputStream cannot convert to TopologyOperationEntity");
-            }
-            for (TopologyOperationEntity entity : entities) {
-                String status = dao.loadTopologyExecutionStatus(entity.getSite(), entity.getApplication(), entity.getTopology());
-                if(status == null) {
-                    throw new Exception(String.format("Fail to fetch the topology execution status by site=%s, application=%s, topology=%s", entity.getSite(), entity.getApplication(), entity.getTopology()));
-                }
-                int operationsInRunning = dao.loadTopologyOperationsInRunning(entity.getSite(), entity.getApplication(), entity.getTopology());
-                if(operationsInRunning !=0) {
-                    throw new Exception(operationsInRunning + "operations are running, please wait for a minute");
-                }
-                if (validateOperation(entity.getOperation(), status)) {
-                    Map<String, String> tags = entity.getTags();
-                    tags.put(AppManagerConstants.OPERATION_ID_TAG, UUID.randomUUID().toString());
-                    entity.setTags(tags);
-                    entity.setLastModifiedDate(System.currentTimeMillis());
-                    entity.setTimestamp(System.currentTimeMillis());
-                    operations.add(entity);
-                } else {
-                    throw new Exception(String.format("%s is an invalid operation, as the topology's current status is %s", entity.getOperation(), status));
-                }
-            }
-            response = dao.createOperation(operations);
-        } catch (Exception e) {
-            response.setSuccess(false);
-            response.setException(e);
-        }
-        return response;
-    }
-
-    private boolean validateOperation(String operation, String status) {
-        boolean ret = false;
-        switch (operation) {
-            case TopologyOperationEntity.OPERATION.START:
-                return TopologyExecutionStatus.isReadyToStart(status);
-            case TopologyOperationEntity.OPERATION.STOP:
-                return TopologyExecutionStatus.isReadyToStop(status);
-            default: break;
-        }
-        return ret;
-    }
-
-    private List<? extends TaggedLogAPIEntity> unmarshalOperationEntities(InputStream inputStream) throws IllegalAccessException, InstantiationException, IOException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, TopologyOperationEntity.class));
-    }
-
-    @Path("topology")
-    @DELETE
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public GenericServiceAPIResponseEntity deleteTopology(@QueryParam("topology") String topology) {
-        return dao.deleteTopology(topology);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDAO.java b/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDAO.java
deleted file mode 100644
index dfa261b..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDAO.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.application.dao;
-
-
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.application.entity.TopologyExecutionStatus;
-import org.apache.eagle.service.application.entity.TopologyOperationEntity;
-
-import java.util.List;
-
-public interface ApplicationManagerDAO {
-    String loadTopologyExecutionStatus(String site, String application, String topology);
-    int loadTopologyOperationsInRunning(String site, String application, String topology) throws Exception;
-    GenericServiceAPIResponseEntity createOperation(List<TopologyOperationEntity> entities) throws Exception;
-    GenericServiceAPIResponseEntity deleteTopology(String topology);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDaoImpl.java b/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDaoImpl.java
deleted file mode 100644
index 4881cf4..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDaoImpl.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.application.dao;
-
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.service.application.entity.TopologyExecutionEntity;
-import org.apache.eagle.service.application.entity.TopologyOperationEntity;
-import org.apache.eagle.service.generic.GenericEntityServiceResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class ApplicationManagerDaoImpl implements ApplicationManagerDAO {
-    private static Logger LOG = LoggerFactory.getLogger(ApplicationManagerDaoImpl.class);
-    GenericEntityServiceResource resource = new GenericEntityServiceResource();
-
-    @Override
-    public String loadTopologyExecutionStatus(String site, String application, String topology) {
-        String query = String.format("%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\"]{*}", Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, site, application, topology);
-        GenericServiceAPIResponseEntity<TopologyExecutionEntity> response = resource.search(query,  null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
-        if(!response.isSuccess()) {
-            LOG.error(response.getException());
-            return null;
-        }
-        List<TopologyExecutionEntity> list = response.getObj();
-        if(list == null || list.size() != 1) {
-            LOG.error("ERROR: fetching 0 or more than 1 topology execution entities");
-            return null;
-        }
-        return list.get(0).getStatus();
-    }
-
-    @Override
-    public int loadTopologyOperationsInRunning(String site, String application, String topology) throws Exception {
-        int ret = 0;
-        String query = String.format("%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\" AND (@status=\"%s\" OR @status=\"%s\")]{*}", Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, site, application, topology, TopologyOperationEntity.OPERATION_STATUS.INITIALIZED, TopologyOperationEntity.OPERATION_STATUS.PENDING);
-        GenericServiceAPIResponseEntity<TopologyExecutionEntity> response = resource.search(query, null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
-        if(!response.isSuccess()) {
-            throw new Exception(response.getException());
-        }
-        if(response.getObj() != null && response.getObj().size() != 0) {
-            ret = response.getObj().size();
-        }
-        return ret;
-    }
-
-    @Override
-    public GenericServiceAPIResponseEntity createOperation(List<TopologyOperationEntity> entities) throws Exception {
-        if(entities.size() == 0) {
-            LOG.info("TopologyOperationEntity set is empty.");
-        }
-        GenericServiceAPIResponseEntity response = resource.updateEntities(entities, Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME);
-        return response;
-    }
-
-    @Override
-    public GenericServiceAPIResponseEntity deleteTopology(String topology) {
-        String topologyQuery = Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@topology=\"" + topology + "\"]{*}";
-        String executionQuery = Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME + "[@topology=\"" + topology + "\"]{*}";
-        int pageSize = Integer.MAX_VALUE;
-
-        GenericServiceAPIResponseEntity response = resource.deleteByQuery(topologyQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
-        if(response.isSuccess()) {
-            response = resource.deleteByQuery(executionQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
-        }
-        if(!response.isSuccess()) {
-            LOG.error(response.getException());
-        }
-        return response;
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/ApplicationEntityRepo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/ApplicationEntityRepo.java b/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/ApplicationEntityRepo.java
deleted file mode 100644
index 3226650..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/ApplicationEntityRepo.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.application.entity;
-
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class ApplicationEntityRepo  extends EntityRepository {
-    public ApplicationEntityRepo() {
-        this.registerEntity(TopologyDescriptionEntity.class);
-        this.registerEntity(TopologyExecutionEntity.class);
-        this.registerEntity(TopologyOperationEntity.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyDescriptionEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyDescriptionEntity.java b/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyDescriptionEntity.java
deleted file mode 100644
index 6442e6c..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyDescriptionEntity.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.application.entity;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.service.application.AppManagerConstants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("eagle_metadata")
-@ColumnFamily("f")
-@Prefix("topologyDescription")
-@Service(Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"topology"})
-public class TopologyDescriptionEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String exeClass;
-    @Column("b")
-    private String type;
-    @Column("c")
-    private String description;
-    @Column("d")
-    private String version;
-    private String context;
-    public String getContext() {
-        return context;
-    }
-
-    public void setContext(String context) {
-        this.context = context;
-    }
-
-    public String getExeClass() {
-        return exeClass;
-    }
-
-    public void setExeClass(String exeClass) {
-        this.exeClass = exeClass;
-        valueChanged("exeClass");
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-        valueChanged("type");
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-        valueChanged("description");
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-        valueChanged("version");
-    }
-
-    public String getTopology() {
-        return this.getTags().get(AppManagerConstants.TOPOLOGY_TAG);
-    }
-
-    public final static class TYPE {
-        public final static String DYNAMIC = "DYNAMIC";
-        public final static String CLASS = "CLASS";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionEntity.java b/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionEntity.java
deleted file mode 100644
index 9991d3b..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionEntity.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.application.entity;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-
-import org.apache.eagle.service.application.AppManagerConstants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("eagle_metadata")
-@ColumnFamily("f")
-@Prefix("topologyExecution")
-@Service(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site", "application", "topology"})
-public class TopologyExecutionEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String fullName;
-    @Column("b")
-    private String url;
-    @Column("c")
-    private String description;
-    @Column("d")
-    private String status;
-    @Column("e")
-    private long lastModifiedDate;
-    @Column("f")
-    private String mode;
-    @Column("g")
-    private String environment;
-
-    public String getEnvironment() {
-        return environment;
-    }
-
-    public void setEnvironment(String environment) {
-        this.environment = environment;
-        valueChanged("environment");
-    }
-
-    public String getMode() {
-        return mode;
-    }
-
-    public void setMode(String mode) {
-        this.mode = mode;
-        valueChanged("mode");
-    }
-
-    public String getFullName() {
-        return fullName;
-    }
-
-    public void setFullName(String fullName) {
-        this.fullName = fullName;
-        valueChanged("fullName");
-    }
-
-    public String getUrl() {
-        return url;
-    }
-
-    public void setUrl(String url) {
-        this.url = url;
-        valueChanged("url");
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-        valueChanged("description");
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-        valueChanged("status");
-    }
-
-    public long getLastModifiedDate() {
-        return lastModifiedDate;
-    }
-
-    public void setLastModifiedDate(long lastModifiedDate) {
-        this.lastModifiedDate = lastModifiedDate;
-        valueChanged("lastModifiedDate");
-    }
-
-    public String getSite() {
-        return this.getTags().get(AppManagerConstants.SITE_TAG);
-    }
-
-    public String getApplication() {
-        return this.getTags().get(AppManagerConstants.APPLICATION_TAG);
-    }
-
-    public String getTopology() {
-        return this.getTags().get(AppManagerConstants.TOPOLOGY_TAG);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionStatus.java b/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionStatus.java
deleted file mode 100644
index f62ad8a..0000000
--- a/eagle-core/eagle-application-management/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionStatus.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.application.entity;
-
-
-public class TopologyExecutionStatus {
-    public final static String STOPPED = "STOPPED";
-    public final static String STARTED = "STARTED";
-    public final static String STARTING = "STARTING";
-    public final static String STOPPING = "STOPPING";
-    public final static String NEW = "NEW";
-
-    public static boolean isReadyToStart(String status){
-        return status.equals(STOPPED) || status.equals(NEW);
-    }
-
-    public static boolean isReadyToStop(String status){
-        return status.equals(STARTED);
-    }
-
-}
-


Mime
View raw message