eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [38/51] [partial] incubator-eagle git commit: EAGLE-184 Migrate eagle website from https://github.com/eaglemonitoring/eaglemonitoring.github.io to document branch
Date Thu, 03 Mar 2016 18:10:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
deleted file mode 100644
index 2174560..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-package org.apache.eagle.stream.pipeline.extension
-
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.partition.PartitionStrategy
-import org.apache.eagle.stream.pipeline.parser.Processor
-import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
-//import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-
-object ModuleManager{
-  def getModuleMapperByType(moduleType:String):ModuleMapper = {
-    classOfProcessorMapping(moduleType)
-  }
-
-  def findModuleType(moduleType:String):Boolean = classOfProcessorMapping.contains(moduleType)
-
-  val classOfProcessorMapping = Map[String,ModuleMapper](
-    "KafkaSource" -> KafkaSourceStreamProducer,
-    "KafkaSink" -> KafkaSinkStreamProducer,
-    "Alert" -> AlertStreamProducer,
-    "Persistence" -> PersistProducer,
-    "Aggregator" -> AggregatorProducer,
-    "Console" -> ConsoleStreamProducer
-  )
-}
-
-trait ModuleMapper{
-  def getType:String
-  def map(module:Processor):StreamProducer[Any]
-}
-object KafkaSourceStreamProducer extends ModuleMapper{
-  def getType = "KafkaSource"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val config = module.getConfig
-    new StormSourceProducer[Any](new KafkaSourcedSpoutProvider(null).getSpout(ConfigFactory.parseMap(config.asJava)))
-  }
-}
-object KafkaSinkStreamProducer extends ModuleMapper{
-  def getType = "KafkaSink"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val config = module.getConfig
-    ForeachProducer[AnyRef](KafkaSinkExecutor(config))
-  }
-}
-object ConsoleStreamProducer extends ModuleMapper{
-  override def getType: String = "Stdout"
-  override def map(module:Processor): StreamProducer[Any] = ForeachProducer[Any](m=>print(s"$m\n"))
-}
-object AlertStreamProducer extends ModuleMapper{
-  def getType:String = "Alert"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val config = module.getConfig
-    val moduleId = module.getId
-    // Support create functional AlertStreamProducer constructor
-    new AlertStreamProducer (
-      upStreamNames = config.getOrElse("upStreamNames",if(module.inputIds!=null) module.inputIds.asJava else null).asInstanceOf[java.util.List[String]],
-      alertExecutorId = config.getOrElse("alertExecutorId",moduleId).asInstanceOf[String],
-      consume = config.getOrElse("consume",true).asInstanceOf[Boolean],
-      strategy = config.get("strategy") match {case Some(strategy)=> Class.forName(strategy.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy] case None => null}
-    )
-  }
-}
-
-object PersistProducer extends ModuleMapper{
-  override def getType = "Persistence"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val config = module.getConfig
-    new PersistProducer(config.getOrElse("executorId",module.getId).asInstanceOf[String],StorageType.withName(config.getOrElse("storageType",null).asInstanceOf[String]))
-  }
-}
-
-object AggregatorProducer extends ModuleMapper{
-  override def getType: String = "Aggregator"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val config = module.getConfig
-    new AggregateProducer(
-      upStreamNames = config.getOrElse("upStreamNames",if(module.inputIds!=null) module.inputIds.asJava else null).asInstanceOf[java.util.List[String]],
-      config.getOrElse("analyzer",module.getId).asInstanceOf[String],
-      config.get("sql") match {case Some(sql) => sql.asInstanceOf[String] case None => null },
-      config.get("strategy") match {case Some(strategy)=> Class.forName(strategy.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy] case None => null}
-    )
-  }
-}
-
-object KafkaSinkExecutor{
-//  val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor])
-}
-
-/**
-  * @todo currently support single topic now, should support topic selector
-  * @param config
-  */
-case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Unit) with Serializable{
-
-  val TOPIC_KEY = "topic"
-  def getDefaultProps = {
-    val props = new Properties()
-    props.putAll(Map[String,AnyRef](
-      "bootstrap.servers" -> "localhost:6667",
-      "acks" -> "all",
-      "retries" -> "3",
-      "batch.size" -> "16384",
-      "linger.ms" -> "1",
-      "buffer.memory" -> "33554432",
-      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
-      "value.serializer" -> classOf[org.apache.eagle.dataproc.impl.storm.kafka.JsonSerializer].getCanonicalName
-    ).asJava)
-    props
-  }
-
-  @transient var initialized:AtomicBoolean = new AtomicBoolean(false)
-  @transient var producer:KafkaProducer[String,AnyRef] = null
-  @transient var topic:String = null
-  @transient var timeoutMs:Long = 3000
-
-  private def init():Unit = {
-    if(this.initialized != null && this.initialized.get()){
-//      LOG.info("Already initialized, skip")
-      return
-    }
-    this.initialized = new AtomicBoolean(false)
-    if (producer != null) {
-//      LOG.info(s"Closing $producer")
-      producer.close()
-    }
-//    LOG.info("Initializing and creating Kafka Producer")
-    if (config.contains(TOPIC_KEY)) {
-      this.topic = config.get(TOPIC_KEY).get.asInstanceOf[String]
-    } else {
-      throw new IllegalStateException("topic is not defined")
-    }
-    val props = getDefaultProps
-    props.putAll((config - TOPIC_KEY).asJava)
-    producer = new KafkaProducer[String, AnyRef](props)
-//    LOG.info(s"Created new KafkaProducer: $producer")
-    initialized.set(true)
-  }
-
-  override def apply(value: AnyRef): Unit = {
-    if(initialized == null || !initialized.get()) init()
-    if(topic == null) throw new IllegalStateException("topic is not defined")
-    val isList = value.isInstanceOf[java.util.List[AnyRef]]
-    val record: ProducerRecord[String, AnyRef] = if(isList){
-      val list = value.asInstanceOf[java.util.List[AnyRef]]
-      if(list.size() == 1) {
-        new ProducerRecord[String, AnyRef](topic, value.asInstanceOf[java.util.List[AnyRef]].get(0))
-      }else{
-        new ProducerRecord[String, AnyRef](topic, value)
-      }
-    }else{
-      new ProducerRecord[String, AnyRef](topic,value)
-    }
-    producer.send(record,new Callback(){
-      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
-        if(exception!=null){
-//          LOG.error(s"Failed to send record $value to topic: $topic",exception)
-          throw new IllegalStateException(s"Failed to send record $value to topic: $topic",exception)
-        }
-      }
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
deleted file mode 100644
index 7e1f4cf..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.parser
-
-import com.typesafe.config.Config
-import org.apache.eagle.stream.pipeline.utils.ParseException
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-import scala.collection.mutable
-
-
-class DataFlow {
-  def getInputs(id: String):Seq[Processor] = {
-    this.getConnectors.filter(_.to.equals(id)).map(c => getProcessor(c.from).get)
-  }
-
-  /**
-    * Connect if not, do nothing if already connected
-    *
-    * @param from
-    * @param to
-    */
-  def connect(from: String, to: String): Unit = {
-    val connector = Connector(from,to,null)
-    var exists = false
-    connectors.foreach(c => exists = (c.from.equals(from) && c.to.equals(to)) || exists)
-    if(!exists) addConnector(connector)
-  }
-
-  private var processors = mutable.Map[String,Processor]()
-  private var connectors = mutable.Seq[Connector]()
-  def setProcessors(processors:Seq[Processor]):Unit = {
-    processors.foreach{module =>
-      this.processors.put(module.getId,module)
-    }
-  }
-  def setProcessors(processors:mutable.Map[String,Processor]):Unit = {
-    this.processors = processors
-  }
-  def setConnectors(connectors:Seq[Connector]):Unit = {
-    connectors.foreach(connector =>{
-      this.connectors :+= connector
-    })
-  }
-  def addProcessor(module:Processor):Unit = {
-    if(contains(module)) throw new IllegalArgumentException(s"Duplicated processor id error, ${module.getId} has already been defined as ${getProcessor(module.getId)}")
-    processors.put(module.getId,module)
-  }
-
-  def contains(module:Processor):Boolean = processors.contains(module.getId)
-  def addConnector(connector:Connector):Unit = {
-    connectors :+= connector
-  }
-  def getProcessors:Seq[Processor] = processors.values.toSeq
-  def getProcessor(processorId:String):Option[Processor] = processors.get(processorId)
-  def getConnectors:Seq[Connector] = connectors
-}
-
-/**
-  * Stream Processor
-  *
-  * @param processorId
-  * @param processorType
-  * @param schema
-  * @param processorConfig
-  */
-case class Processor(var processorId:String = null,var processorType:String = null,var schema:Schema = null, var processorConfig:Map[String,AnyRef] = null) extends Serializable {
-  private[pipeline] var inputs:Seq[Processor] = null
-  private[pipeline] var inputIds:Seq[String] = null
-
-  def getId:String = processorId
-  def getType:String = processorType
-  def getConfig:Map[String,AnyRef] = processorConfig
-  def getSchema:Option[Schema] = if(schema == null) None else Some(schema)
-
-  /**
-    * @todo assume processorId as streamId
-    * @return
-    */
-  def streamId = processorId
-}
-
-case class Connector (from:String,to:String, config:Map[String,AnyRef]) extends Serializable{
-  import Connector._
-
-  def group:Option[String] = config.get(GROUP_FIELD).asInstanceOf[Option[String]]
-  def groupByFields:Option[Seq[String]] = config.get(GROUP_BY_FIELD_FIELD) match {
-    case Some(obj) => Some(obj.asInstanceOf[java.util.List[String]].asScala.toSeq)
-    case None => None
-  }
-  def groupByIndexes:Option[Seq[Int]] = config.get(GROUP_BY_INDEX_FIELD) match {
-    case Some(obj) => Some(obj.asInstanceOf[java.util.List[java.lang.Integer]].asScala.toSeq.map(Int.unbox(_)))
-    case None => None
-  }
-}
-
-object Connector{
-  val GROUP_FIELD = "grouping"
-  val GROUP_BY_FIELD_FIELD = "groupByField"
-  val GROUP_BY_INDEX_FIELD = "groupByIndex"
-}
-
-private [pipeline]
-object Processor {
-  val SCHEMA_FIELD:String = "schema"
-  val INPUTS_FIELD = "inputs"
-  def parse(processorId:String,processorType:String,context:Map[String,AnyRef], schemaSet:SchemaSet):Processor = {
-    val schema = context.get(SCHEMA_FIELD) match {
-      case Some(schemaDef) => schemaDef match {
-        case schemaId:String => schemaSet.get(schemaId).getOrElse {
-          throw new ParseException(s"Schema [$schemaId] is not found but referred by [$processorType:$processorId] in $context")
-        }
-        case schemaMap:java.util.HashMap[String,AnyRef] => Schema.parse(schemaMap.toMap)
-        case _ => throw new ParseException(s"Illegal value for schema: $schemaDef")
-      }
-      case None => null
-    }
-    val instance = new Processor(processorId,processorType,schema,context-SCHEMA_FIELD)
-    if(context.contains(INPUTS_FIELD)) instance.inputIds = context.get(INPUTS_FIELD).get.asInstanceOf[java.util.List[String]].asScala.toSeq
-    instance
-  }
-}
-
-
-trait DataFlowParser {
-  def parse(config:Config,schemaSet:SchemaSet = SchemaSet.empty()):DataFlow = {
-    val dataw = new DataFlow()
-    val map = config.root().unwrapped().toMap
-
-    // Parse processors and connectors
-    map.foreach(entry => {
-      parseSingle(entry._1,entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap,dataw,schemaSet)
-    })
-    expand(dataw)
-    validate(dataw)
-    dataw
-  }
-
-  private def expand(datafw: DataFlow):Unit = {
-    datafw.getProcessors.foreach(proc =>{
-      if(proc.inputIds!=null) {
-        proc.inputIds.foreach(id => {
-          // connect if not
-          datafw.connect(id,proc.getId)
-        })
-      }
-      proc.inputs = datafw.getInputs(proc.getId)
-      proc.inputIds = proc.inputs.map(_.getId)
-    })
-  }
-
-  private def
-  validate(pipeline:DataFlow): Unit ={
-    def checkModuleExists(id:String): Unit ={
-      pipeline.getProcessor(id).orElse {
-        throw new ParseException(s"Stream [$id] is not defined before being referred")
-      }
-    }
-
-    pipeline.getConnectors.foreach {connector =>
-      checkModuleExists(connector.from)
-      checkModuleExists(connector.to)
-    }
-  }
-
-  private def
-  parseSingle(identifier:String,config:Map[String,AnyRef],dataflow:DataFlow, schemaSet: SchemaSet):Unit = {
-    Identifier.parse(identifier) match {
-      case DefinitionIdentifier(processorType) => {
-        config foreach {entry =>
-          dataflow.addProcessor(Processor.parse(entry._1, processorType,entry._2.asInstanceOf[java.util.HashMap[String, AnyRef]].toMap,schemaSet))
-        }
-      }
-      case ConnectionIdentifier(fromIds,toId) => fromIds.foreach { fromId =>
-        if(fromId.eq(toId)) throw new ParseException(s"Can't connect $fromId to $toId")
-        dataflow.addConnector(Connector(fromId,toId,config))
-      }
-      case _ => ???
-    }
-  }
-}
-
-
-private[pipeline] trait Identifier
-
-private[pipeline] case class DefinitionIdentifier(moduleType: String) extends Identifier
-private[pipeline] case class ConnectionIdentifier(fromIds: Seq[String], toId: String) extends Identifier
-
-private[pipeline] object Identifier {
-  val ConnectorFlag = "->"
-  val UnitFlagSplitPattern = "\\|"
-  val UnitFlagChar = "|"
-  val ConnectorPattern = s"([\\w-|\\s]+)\\s+$ConnectorFlag\\s+([\\w-_]+)".r
-  def parse(identifier: String): Identifier = {
-    // ${id} -> ${id}
-    ConnectorPattern.findFirstMatchIn(identifier) match {
-      case Some(matcher) => {
-        if(matcher.groupCount != 2){
-          throw new ParseException(s"Illegal connector definition: $identifier")
-        }else{
-          val source = matcher.group(1)
-          val destination = matcher.group(2)
-          if(source.contains(UnitFlagChar)) {
-            val sources = source.split(UnitFlagSplitPattern).toSeq
-            ConnectionIdentifier(sources.map{_.trim()},destination)
-          }else{
-            ConnectionIdentifier(Seq(source),destination)
-          }
-        }
-      }
-      case None => {
-        if(identifier.contains(ConnectorFlag)) throw new ParseException(s"Failed to parse $identifier")
-        DefinitionIdentifier(identifier)
-      }
-    }
-  }
-}
-
-object DataFlow extends DataFlowParser
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
deleted file mode 100644
index cc1e009..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.apache.eagle.stream.pipeline.parser
-
-import java.io.File
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-case class Pipeline(config:Config,dataflow:DataFlow)
-
-/**
- * Pipeline configuration parser
- *
- * For example:
- *
- * {{{
- * <code>
- * {
- *    config {
- *      execution.environment.config = someValue
- *    }
- *    schema {
- *      metricStreamSchema {
- *        metric: string
- *        value: double
- *        timestamp: long
- *      }
- *    }
- *    dataflow {
- *      kafkaSource.source1 {
- *        schema = "metricStreamSchema"
- *      }
- *      kafkaSource.source2 {
- *        schema = {
- *          metric: string
- *          value: double
- *          timestamp: long
- *        }
- *      }
- *    }
- * }
- * </code>
- * }}}
- */
-trait PipelineParser{
-  val CONFIG_FIELD = "config"
-  val SCHEMA_FIELD = "schema"
-  val DATAFLOW_FIELD = "dataflow"
-
-  def parse(config:Config):Pipeline = {
-    if(config.isEmpty) throw new IllegalArgumentException("Pipeline configuration is empty")
-    var pConfig:Config = ConfigFactory.empty()
-    var pSchemaSet:SchemaSet = SchemaSet.empty()
-    var pDataflow:DataFlow = null
-    if(config.hasPath(CONFIG_FIELD)) pConfig = config.getConfig(CONFIG_FIELD)
-    if(config.hasPath(SCHEMA_FIELD)) pSchemaSet = SchemaSet.parse(config.getConfig(SCHEMA_FIELD))
-    if(config.hasPath(DATAFLOW_FIELD)) pDataflow = DataFlow.parse(config.getConfig(DATAFLOW_FIELD),pSchemaSet)
-
-    // Merge pipeline config over base config
-    val baseConfig =ConfigFactory.load()
-    pConfig = if(pConfig!=null) pConfig.withFallback(baseConfig) else baseConfig
-    new Pipeline(pConfig,pDataflow)
-  }
-
-  def parseString(config:String):Pipeline = parse(ConfigFactory.parseString(config))
-  def parseResource(resource:String):Pipeline = {
-    // TODO: Load environment, currently hard-code with storm
-    if(resource.startsWith("/") || resource.startsWith("./")){
-      parse(ConfigFactory.parseFile(new File(resource)))
-    } else{
-      parse(ConfigFactory.parseResourcesAnySyntax(getClass.getClassLoader,resource))
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
deleted file mode 100644
index 7653f9e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.parser
-
-import com.typesafe.config.Config
-
-import scala.collection.JavaConversions.mapAsScalaMap
-import scala.collection.mutable
-
-class Field(name:String) extends Serializable{
-  def getName:String = name
-}
-
-case class StringField(name:String) extends Field(name)
-case class LongField(name:String) extends Field(name)
-case class IntegerField(name:String) extends Field(name)
-case class BooleanField(name:String) extends Field(name)
-case class FloatField(name:String) extends Field(name)
-case class DoubleField(name:String) extends Field(name)
-case class DatetimeField(name:String,format:String) extends Field(name)
-
-object Field{
-  def string(name:String) = StringField(name)
-  def long(name:String) = LongField(name)
-  def integer(name:String) = IntegerField(name)
-  def boolean(name:String) = BooleanField(name)
-  def float(name:String) = FloatField(name)
-  def double(name:String) = DoubleField(name)
-  def datetime(name:String)(format:String) = DatetimeField(name,format)
-
-  def apply(name:String,typeName:String):Field = typeName match {
-    case "string" => string(name)
-    case "long" => long(name)
-    case "integer" => integer(name)
-    case "boolean" => boolean(name)
-    case "float" => float(name)
-    case "double" => double(name)
-    case _ => throw new UnsupportedOperationException(s"""Unknown attribute type $typeName for attribute "$name"""")
-  }
-}
-
-case class Schema(attributes:Seq[Field]) extends Serializable{
-  def getAttribute(attributeName:String):Option[Field]={
-    if(attributes != null){
-      attributes.find(_.getName.eq(attributeName))
-    }else None
-  }
-
-  def indexOfAttribute(attributeName:String):Int = {
-    if(attributes != null){
-      attributes.indexWhere(_.getName.eq(attributeName))
-    } else -1
-  }
-
-  @throws[IllegalArgumentException]
-  def indexOfAttributeOrException(attributeName:String):Int = {
-    if(attributes != null){
-      attributes.indexWhere(_.getName.eq(attributeName))
-    } else throw new IllegalArgumentException(s"Attribute [$attributeName] is not found in stream $this")
-  }
-}
-
-object Schema{
-  def parse(map:Map[String,AnyRef]):Schema = {
-    new Schema(map.keys.map {attributeName =>
-      map(attributeName) match{
-        case simpleType:String => Field(attributeName,simpleType)
-        case complexType:java.util.Map[String,AnyRef] => throw new IllegalStateException(s"ComplexType attribute definition is not supported yet [$attributeName : $complexType] ")
-        case otherType@_ => throw new IllegalStateException(s"Illegal attribute definition $attributeName : $otherType")
-      }
-    }.toSeq)
-  }
-
-  /**
-   * @param attributes support string, symbol, Attribute and so on.
-   * @return
-   */
-  def build(attributes:Seq[AnyRef]):Schema = {
-    new Schema(attributes.map{ a:AnyRef =>
-      a match {
-        case t:(String, AnyRef) => {
-          t._2 match {
-            case v:String => Field(t._1,v)
-            case v:Symbol => Field(t._1,v.name)
-            case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a")
-          }
-        }
-        case t:Field => t
-        case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a")
-      }
-    })
-  }
-}
-
-private[pipeline] class StreamUndefinedException(message:String = "stream is not defined",throwable: Throwable = null) extends Exception(message,throwable)
-
-private[pipeline] class SchemaSet {
-  private val processorSchemaCache = mutable.Map[String,Schema]()
-  def set(schemaId:String,schema:Schema):Unit = {
-    if(processorSchemaCache.contains(schemaId)) throw new IllegalArgumentException(
-      s"""
-         |Failed to define schema for $schemaId as $schema,
-         |because it has been defined as ${processorSchemaCache(schemaId)},
-         |please call updateSchema(processorId,schema) instead
-       """)
-    processorSchemaCache.put(schemaId,schema)
-  }
-  def get(schemaId:String):Option[Schema] = processorSchemaCache.get(schemaId)
-}
-
-private[pipeline] object SchemaSet{
-  def empty() = new SchemaSet()
-  /**
-   * For example:
-   *
-   * <code>
-   *    {
-   *      metricStream {
-   *        metric: string
-   *        value: double
-   *        timestamp: long
-   *      }
-   *    }
-   * </code>
-   * @param schemaConfig
-   * @return
-   */
-  def parse(schemaConfig:Map[String,AnyRef]):SchemaSet = {
-    val schemas = new SchemaSet()
-    schemaConfig.foreach(entry =>{
-      schemas.set(entry._1,Schema.parse(entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap))
-    })
-    schemas
-  }
-
-  def parse(config:Config):SchemaSet = parse(config.root().unwrapped().asInstanceOf[java.util.HashMap[String,AnyRef]].toMap)
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
deleted file mode 100644
index 1c964e1..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package org.apache.eagle.stream.pipeline.runner
-
-import java.util
-
-import com.typesafe.config.Config
-import org.apache.commons.cli.{CommandLine, Options}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.ExecutionEnvironments.storm
-import org.apache.eagle.datastream.core.ExecutionEnvironment
-import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
-import org.apache.eagle.stream.pipeline.parser.PipelineParser
-import org.slf4j.LoggerFactory
-
-import scala.reflect.runtime.{universe => ru}
-
-trait PipelineRunner extends PipelineParser with PipelineCompiler{
-  import PipelineCLIOptionParser._
-  private val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
-  def submit[T <: ExecutionEnvironment](resource:String)(implicit typeTag:ru.TypeTag[T]) =
-    compile(parseResource(resource)).submit[T]
-  def submit(resource:String,clazz:Class[ExecutionEnvironment]) =
-    compile(parseResource(resource)).submit(clazz)
-  def submit(pipelineConfig:Config,clazz:Class[ExecutionEnvironment]) =
-    compile(parse(pipelineConfig)).submit(clazz)
-  def submit[T <: ExecutionEnvironment](pipelineConfig:Config)(implicit typeTag: ru.TypeTag[T]) =
-    compile(parse(pipelineConfig)).submit[T]
-
-  def apply(args:Array[String]):PipelineRunner = {
-    new ConfigOptionParser().load(args)
-    this
-  }
-
-  def main(args: Array[String]): Unit = {
-    val config = PipelineCLIOptionParser.load(args)
-    if(config.hasPath(PIPELINE_CONFIG_KEY)) {
-      submit[storm](config.getString(PIPELINE_CONFIG_KEY))
-    } else {
-      sys.error(
-        s"""
-           |Error: --$PIPELINE_OPT_KEY is required
-           |$USAGE
-         """.stripMargin)
-    }
-  }
-}
-
-private[runner] object PipelineCLIOptionParser extends ConfigOptionParser{
-  val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
-  val PIPELINE_OPT_KEY="pipeline"
-
-  val PIPELINE_CONFIG_KEY="pipeline.config"
-
-  val CONFIG_OPT_KEY="conf"
-  val CONFIG_RESOURCE_KEY="config.resource"
-  val CONFIG_FILE_KEY="config.file"
-  val USAGE =
-    """
-      |Usage: java org.apache.eagle.stream.pipeline.Pipeline [options]
-      |
-      |Options:
-      |   --pipeline   pipeline configuration
-      |   --conf       common configuration
-      |   --env        storm (support spark, etc later)
-      |   --mode       local/remote/cluster
-    """.stripMargin
-  
-  override protected def options(): Options = {
-    val options = super.options()
-    options.addOption(PIPELINE_OPT_KEY, true, "Pipeline configuration file")
-    options.addOption(CONFIG_OPT_KEY, true, "Config properties file")
-    options
-  }
-
-  override protected def parseCommand(cmd: CommandLine): util.Map[String, String] = {
-    val map = super.parseCommand(cmd)
-
-    if (cmd.hasOption(PIPELINE_OPT_KEY)) {
-      val pipelineConf = cmd.getOptionValue(PIPELINE_OPT_KEY)
-      if(pipelineConf == null){
-        throw new IllegalArgumentException(s"--$PIPELINE_OPT_KEY should not be null")
-      } else {
-        LOG.info(s"Set $PIPELINE_CONFIG_KEY as $pipelineConf")
-        map.put(PIPELINE_CONFIG_KEY, pipelineConf)
-      }
-    }
-
-    if(cmd.hasOption(CONFIG_OPT_KEY)){
-      val commonConf = cmd.getOptionValue(CONFIG_OPT_KEY)
-      if(commonConf.contains("/")){
-        LOG.info(s"Set $CONFIG_FILE_KEY as $commonConf")
-        map.put(CONFIG_FILE_KEY, commonConf)
-      }else {
-        LOG.info(s"Set $CONFIG_RESOURCE_KEY $commonConf")
-        map.put(CONFIG_RESOURCE_KEY, commonConf)
-      }
-    }
-    map
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
deleted file mode 100644
index 1102a33..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.utils
-
-class ParseException(message:String) extends Exception(message)
-class CompileException(message:String) extends Exception(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
deleted file mode 100644
index 3e8f69c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	"eagleProps" : {
-		"dataJoinPollIntervalSec" : 30
-		"mailHost" : "smtp.server.host"
-		"mailSmtpPort":"25"
-		"mailDebug" : "true"
-		"eagleService": {
-			"host": "localhost"
-			"port": 9099
-			"username": "admin"
-			"password": "secret"
-		}
-	}
-	"dynamicConfigSource" : {
-		"enabled" : true
-		"initDelayMillis" : 0
-		"delayMillis" : 30000
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
deleted file mode 100644
index 4250681..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# ./eagle-pipeline.sh --pipeline [pipeline-definition-config] --config [base-configuration]
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
deleted file mode 100644
index c8a4f46..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
deleted file mode 100644
index 6dddf7a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
+++ /dev/null
@@ -1,131 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		alertExecutorConfigs {
-			defaultAlertExecutor  {
-				"parallelism" : 1
-				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-				"needValidation" : "true"
-			}
-		}
-		eagleProps  {
-			"site" : "sandbox"
-			"application": "eventSource"
-			"dataJoinPollIntervalSec" : 30
-			"mailHost" : "mail.host.com"
-			"mailSmtpPort":"25"
-			"mailDebug" : "true"
-			"eagleService": {
-				"host": "localhost"
-				"port": 38080
-				"username": "admin"
-				"password": "secret"
-			}
-		}
-		dynamicConfigSource  {
-			"enabled" : true
-			"initDelayMillis" : 0
-			"delayMillis" : 30000
-		}
-	}
-
-	schema {
-		metricStreamSchema {
-			metric: string
-			value: double
-			timestamp: long
-		}
-	}
-
-	dataflow {
-		KafkaSource.metricStream_1 {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-			schema = "metricStreamSchema"
-		}
-
-		KafkaSource.metricStream_2 {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.metricStream_3{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-			schema = "metricStreamSchema"
-		}
-
-		KafkaSink.metricStore {
-			schema = "metricStreamSchema"
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Alert.alert {
-//			upStreamNames = [metricStream_1,metricStream_2]
-			alertExecutorId = defaultAlertExecutor
-		}
-
-//		aggregator.aggreator {
-//			executor = "aggreationExecutor"
-//		}
-
-		metricStream_1|metricStream_2 -> alert {
-			group = shuffle
-		}
-
-		metricStream_1|metricStream_2 -> metricStore {
-			group = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
deleted file mode 100644
index 5e3561a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		envContextConfig {
-			"env" : "storm"
-			"mode" : "local"
-			"topologyName" : "dsl-based-topology"
-		}
-		eagleProps  {
-			"site" : "sandbox"
-			"application": "eventSource"
-			"dataJoinPollIntervalSec" : 30
-			"mailHost" : "mail.host.com"
-			"mailSmtpPort":"25"
-			"mailDebug" : "true"
-			"eagleService": {
-				"host": "localhost"
-				"port": 38080
-				"username": "admin"
-				"password": "secret"
-			}
-		}
-		dynamicConfigSource  {
-			"enabled" : true
-			"initDelayMillis" : 0
-			"delayMillis" : 30000
-		}
-	}
-
-	dataflow {
-		KafkaSource.metricStream_1 {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.metricStream_2 {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.metricStream_3{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Console.printer {}
-
-		metricStream_1|metricStream_2|metricStream_3 -> printer {
-			grouping = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
deleted file mode 100644
index b1c1955..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
+++ /dev/null
@@ -1,152 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		envContextConfig {
-			"env" : "storm"
-			"mode" : "local"
-			"topologyName" : "dsl-based-topology"
-			"parallelismConfig" : {
-				"kafkaMsgConsumer" : 1
-			}
-		}
-		alertExecutorConfigs {
-			defaultAlertExecutor  {
-				"parallelism" : 1
-				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-				"needValidation" : "true"
-			}
-		}
-		eagleProps  {
-			"site" : "sandbox"
-			"application": "HADOOP"
-			"dataJoinPollIntervalSec" : 30
-			"mailHost" : "atom.corp.ebay.com"
-			"mailSmtpPort":"25"
-			"mailDebug" : "true"
-			"eagleService": {
-				"host": "localhost"
-				"port": 38080
-				"username": "admin"
-				"password": "secret"
-			}
-		}
-		dynamicConfigSource  {
-			"enabled" : true
-			"initDelayMillis" : 0
-			"delayMillis" : 30000
-		}
-	}
-
-	schema {
-//		JmxStreamOne {
-//			attributes {
-//				metric: string
-//				value: double
-//				timestamp: long
-//			}
-//			alertExecutorId = [defaultAlertExecutor,anotherAlertExecutor]
-//		}
-		JmxStreamOne {
-			metric: string
-			value: double
-			timestamp: long
-		}
-	}
-
-	dataflow {
-		KafkaSource.JmxStreamOne {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamTwo {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamThree{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Console.printer {
-			format = "%s"
-		}
-
-		KafkaSink.metricStore {
-			topic = "metric_event_persist"
-		}
-
-//		KafkaSink.alertStore {
-//			"topic" = "alert_persist"
-//			"bootstrap.servers" = "localhost:6667"
-//		}
-
-		Alert.alert {
-			inputs = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-
-			upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-			alertExecutorId = defaultAlertExecutor
-		}
-
-//		Aggregator.aggreator {
-//			upStreamNames = []
-//			analyzerId = ""
-//			cepQl = ""
-//			strategy = ""
-//		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alert {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
-			grouping = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
deleted file mode 100644
index 9c35456..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
+++ /dev/null
@@ -1,125 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		envContextConfig {
-			"env" : "storm"
-			"mode" : "local"
-			"topologyName" : "dsl-topology"
-			"parallelismConfig" : {
-				"kafkaMsgConsumer" : 1
-			},
-		}
-		alertExecutorConfigs {
-			defaultAlertExecutor  {
-				"parallelism" : 1
-				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-				"needValidation" : "true"
-			}
-		}
-		eagleProps {
-			"site" : "sandbox"
-			"application": "HADOOP"
-		}
-	}
-	
-	dataflow {
-		KafkaSource.JmxStreamOne {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamTwo {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamThree{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Console.printer {
-			format = "%s"
-		}
-
-		KafkaSink.metricStore {
-			topic = "metric_event_persist"
-		}
-
-//		KafkaSink.aggSink {
-//			topic = "metric_agg_persist"
-//		}
-
-		Alert.defaultAlertExecutor {
-			// upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-			// alertExecutorId = defaultAlertExecutor
-		}
-
-		Aggregator.Aggregator{ sql = """
-				define stream JmxStreamOne(eagleAlertContext object, timestamp long, metric string, value double);
-				@info(name = "query")
-				from JmxStreamOne[value > 100.0] select * insert into outputStream;
-		"""}
-
-
-		JmxStreamOne -> Aggregator {}
-
-		Aggregator -> printer {}
-
-//		Aggregator -> aggregatedSink{
-//			grouping = shuffle
-//		}
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
-			grouping = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
deleted file mode 100644
index 85b5334..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
+++ /dev/null
@@ -1,110 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		envContextConfig {
-			"env" : "storm"
-			"mode" : "cluster"
-			"topologyName" : "dynamical-topology-5"
-			"parallelismConfig" : {
-				"kafkaMsgConsumer" : 1
-			},
-			"nimbusHost":"sandbox.hortonworks.com",
-			"nimbusThriftPort":6627
-		}
-		alertExecutorConfigs {
-			defaultAlertExecutor  {
-				"parallelism" : 1
-				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-				"needValidation" : "true"
-			}
-		}
-		eagleProps {
-			"site" : "sandbox"
-			"application": "HADOOP"
-		}
-	}
-	
-	dataflow {
-		KafkaSource.JmxStreamOne {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "sandbox.hortonworks.com:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "sandbox.hortonworks.com"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamTwo {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "sandbox.hortonworks.com:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "sandbox.hortonworks.com"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamThree{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "sandbox.hortonworks.com:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "sandbox.hortonworks.com"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Console.printer {
-			format = "%s"
-		}
-
-		KafkaSink.metricStore {
-			"topic" = "metric_event_persist"
-			"bootstrap.servers" = "sandbox.hortonworks.com:6667"
-		}
-
-		Alert.defaultAlertExecutor {
-			// upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-			// alertExecutorId = defaultAlertExecutor
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
-			grouping = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
deleted file mode 100644
index 7b552da..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.eagle.stream.pipeline
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{FlatSpec, Matchers}
-
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-class ConfigSpec extends FlatSpec with Matchers{
-  "Config" should "be overrode correctly" in {
-    val conf1 = ConfigFactory.parseString(
-      """
-        |value=1
-      """.stripMargin)
-    val conf2 = ConfigFactory.parseString(
-      """
-        |value=2
-      """.stripMargin)
-    val conf3 = conf1.withFallback(conf2)
-    val conf4 = conf2.withFallback(conf1)
-    conf3.getInt("value") should be(1)
-    conf4.getInt("value") should be(2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
deleted file mode 100644
index e63280a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.stream.pipeline.parser.{ConnectionIdentifier, DataFlow, DefinitionIdentifier, Identifier}
-import org.scalatest.{FlatSpec, Matchers}
-
-class DataFlowSpec extends FlatSpec with Matchers {
-  val dataFlowConfig =
-    """
-       |{
-       |	kafkaSource.metric_event_1 {
-       |    schema {
-       |      metric: string
-       |      timestamp: long
-       |      value: double
-       |    }
-       |		parallism = 1000
-       |		topic = "metric_event_1"
-       |		zkConnection = "localhost:2181"
-       |		zkConnectionTimeoutMS = 15000
-       |		consumerGroupId = "Consumer"
-       |		fetchSize = 1048586
-       |		transactionZKServers = "localhost"
-       |		transactionZKPort = 2181
-       |		transactionZKRoot = "/consumers"
-       |		transactionStateUpdateMS = 2000
-       |		deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-       |	}
-       |
-       |	kafkaSource.metric_event_2 {
-       |		schema = {
-       |      metric: string
-       |      timestamp: long
-       |      value: double
-       |    }
-       |		parallism = 1000
-       |		topic = "metric_event_2"
-       |		zkConnection = "localhost:2181"
-       |		zkConnectionTimeoutMS = 15000
-       |		consumerGroupId = "Consumer"
-       |		fetchSize = 1048586
-       |		transactionZKServers = "localhost"
-       |		transactionZKPort = 2181
-       |		transactionZKRoot = "/consumers"
-       |		transactionStateUpdateMS = 2000
-       |		deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-       |	}
-       |
-       |	kafkaSink.metricStore {}
-       |
-       |	alert.alert {
-       |		executor = "alertExecutor"
-       |	}
-       |
-       |	aggregator.aggreator {
-       |		executor = "aggreationExecutor"
-       |	}
-       |
-       |	metric_event_1|metric_event_2 -> alert {}
-       |	metric_event_1|metric_event_2 -> metricStore {}
-       |}
-     """.stripMargin
-
-  DataFlow.getClass.toString should "parse dataflow end-to-end correctly" in {
-    val config = ConfigFactory.parseString(dataFlowConfig)
-    config should not be null
-    val dataflow = DataFlow.parse(config)
-    dataflow should not be null
-    dataflow.getConnectors.size should be(4)
-    dataflow.getProcessors.size should be(5)
-  }
-
-  Identifier.getClass.toString should "parse as definition" in {
-    val defId = Identifier.parse("kafka").asInstanceOf[DefinitionIdentifier]
-    defId.moduleType should be("kafka")
-  }
-
-  Identifier.getClass.toString should "parse node1 -> node2 as connection" in {
-    val id = Identifier.parse("node1 -> node2").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(1)
-  }
-
-  Identifier.getClass.toString should "parse node1|node2 -> node3" in {
-    val id = Identifier.parse("node1|node2 -> node3").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(2)
-  }
-
-  Identifier.getClass.toString should "parse node1|node2|node3 -> node4 as connection" in {
-    val id = Identifier.parse("node1|node2|node3 -> node4").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(3)
-  }
-
-  Identifier.getClass.toString should "parse node1 | node2 | node3 -> node4 as connection" in {
-    val id = Identifier.parse("node1 | node2 | node3 -> node4").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(3)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
deleted file mode 100644
index 5e2007d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline
-
-import org.apache.eagle.datastream.ExecutionEnvironments.storm
-import org.scalatest.{FlatSpec, Matchers}
-
-class PipelineSpec extends FlatSpec with Matchers{
-  "Pipeline" should "parse successfully from pipeline_1.conf" in {
-    val pipeline = Pipeline.parseResource("pipeline_1.conf")
-    pipeline should not be null
-  }
-
-  "Pipeline" should "compile successfully from pipeline_2.conf" in {
-    val pipeline = Pipeline.parseResource("pipeline_2.conf")
-    pipeline should not be null
-    val stream = Pipeline.compile(pipeline)
-    stream should not be null
-    // Throw ClassNotFoundException when submit in unit test
-    // stream.submit[storm]
-  }
-}
-
-/**
- * Storm LocalCluster throws ClassNotFoundException when submit in unit test, so here submit in App
- */
-object PipelineSpec_2 extends App{
-  val pipeline = Pipeline(args).parseResource("pipeline_2.conf")
-  val stream = Pipeline.compile(pipeline)
-  stream.submit[storm]
-}
-
-object PipelineSpec_3 extends App {
-  Pipeline(args).submit[storm]("pipeline_3.conf")
-}
-
-object PipelineSpec_4 extends App {
-  Pipeline(args).submit[storm]("pipeline_4.conf")
-}
-
-object PipelineSpec_5 extends App {
-  Pipeline(args).submit[storm]("pipeline_5.conf")
-}
-
-object PipelineCLISpec extends App{
-  Pipeline.main(args)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
deleted file mode 100644
index 38f55b8..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
+++ /dev/null
@@ -1,145 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>eagle</groupId>
-    <artifactId>eagle-data-process-parent</artifactId>
-    <version>0.3.0</version>
-      <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>eagle-stream-process-api</artifactId>
-  <name>eagle-stream-process-api</name>
-
-  <dependencies>
-      <dependency>
-        <groupId>org.apache.storm</groupId>
-        <artifactId>storm-core</artifactId>
-        <exclusions>
-            <exclusion>
-                <groupId>ch.qos.logback</groupId>
-                <artifactId>logback-classic</artifactId>
-            </exclusion>
-            <exclusion>
-                <groupId>log4j</groupId>
-                <artifactId>log4j</artifactId>
-            </exclusion>
-            <exclusion>
-                <groupId>org.slf4j</groupId>
-                <artifactId>log4j-over-slf4j</artifactId>
-            </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.storm</groupId>
-          <artifactId>storm-kafka</artifactId>
-          <exclusions>
-              <exclusion>
-                  <groupId>ch.qos.logback</groupId>
-                  <artifactId>logback-classic</artifactId>
-              </exclusion>
-              <exclusion>
-                  <groupId>log4j</groupId>
-                  <artifactId>log4j</artifactId>
-              </exclusion>
-              <exclusion>
-                  <groupId>org.slf4j</groupId>
-                  <artifactId>log4j-over-slf4j</artifactId>
-              </exclusion>
-          </exclusions>
-      </dependency>
-      <dependency>
-          <groupId>eagle</groupId>
-          <artifactId>eagle-alert-process</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-      <dependency>
-          <groupId>eagle</groupId>
-          <artifactId>eagle-stream-process-base</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-      <dependency>
-          <groupId>org.scala-lang</groupId>
-          <artifactId>scala-reflect</artifactId>
-      </dependency>
-      <dependency>
-          <groupId>org.scalatest</groupId>
-          <artifactId>scalatest_${scala.version}</artifactId>
-          <scope>test</scope>
-      </dependency>
-
-	<dependency>
-		<groupId>org.apache.kafka</groupId>
-		<artifactId>kafka-clients</artifactId>
-		<version>${kafka-clients.version}</version>
-	</dependency>
-  </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.scala-tools</groupId>
-                <artifactId>maven-scala-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>scala-compile-first</id>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-test-compile</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <skipTests>true</skipTests>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.scalatest</groupId>
-                <artifactId>scalatest-maven-plugin</artifactId>
-                <configuration>
-                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                    <junitxml>.</junitxml>
-                    <filereports>TestSuite.txt</filereports>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test</id>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
deleted file mode 100644
index b17c192..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import org.apache.eagle.policy.ResultRender;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.policy.executor.PolicyProcessExecutor;
-
-/**
- * @since Dec 16, 2015
- *
- */
-public class AggregateExecutor extends PolicyProcessExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
-
-	private static final long serialVersionUID = 1L;
-
-	private ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> render = new AggregateResultRender();
-
-	public AggregateExecutor(String executorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
-			PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) {
-		super(executorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams,
-				AggregateDefinitionAPIEntity.class);
-	}
-
-	@Override
-	public ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> getResultRender() {
-		return render;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
deleted file mode 100644
index 5093685..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.policy.executor.IPolicyExecutor;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Dec 16, 2015
- *
- */
-public class AggregateExecutorFactory {
-
-	private static final Logger LOG = LoggerFactory.getLogger(AggregateExecutorFactory.class);
-	
-	private AggregateExecutorFactory() {}
-	public static final AggregateExecutorFactory Instance = new AggregateExecutorFactory();
-
-
-	public IPolicyExecutor[] createExecutors(List<String> streamNames, String cql) throws Exception {
-		int numPartitions = 1; //loadExecutorConfig(config, executorId, partitionerCls);
-
-		IPolicyExecutor[] executors = new IPolicyExecutor[numPartitions];
-		String[] upStreams = streamNames.toArray(new String[0]);
-		for (int i = 0; i < numPartitions ; i++ ) {
-			executors[i] = new SimpleAggregateExecutor(upStreams, cql, "siddhiCEPEngine", i, numPartitions);
-		}
-
-		return executors;
-	}
-
-	public IPolicyExecutor[] createExecutors(Config config, List<String> streamNames, String executorId) throws Exception {
-		StringBuilder partitionerCls = new StringBuilder(DefaultPolicyPartitioner.class.getCanonicalName());
-        int numPartitions = loadExecutorConfig(config, executorId, partitionerCls);
-		PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<>(
-				new EagleServiceConnector(config), Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME);
-		return newAggregateExecutors(policyDefDao, streamNames, executorId, numPartitions, partitionerCls.toString());
-	}
-	
-	@SuppressWarnings("unchecked")
-	private int loadExecutorConfig(Config config, String executorId, StringBuilder partitionerCls) {
-		int numPartitions = 0;
-		String aggregateExecutorConfigsKey = "aggregateExecutorConfigs";
-        if(config.hasPath(aggregateExecutorConfigsKey)) {
-            Map<String, ConfigValue> analyzeExecutorConfigs = config.getObject(aggregateExecutorConfigsKey);
-            if(analyzeExecutorConfigs !=null && analyzeExecutorConfigs.containsKey(executorId)) {
-                Map<String, Object> alertExecutorConfig = (Map<String, Object>) analyzeExecutorConfigs.get(executorId).unwrapped();
-                int parts = 0;
-                if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));
-                numPartitions = parts == 0 ? 1 : parts;
-                if(alertExecutorConfig.containsKey("partitioner")) {
-                	partitionerCls.setLength(0);
-                	partitionerCls.append((String) alertExecutorConfig.get("partitioner"));
-                }
-            }
-        }
-        return numPartitions;
-	}
-
-//	private List<String> findStreamNames(Config config, String executorId, String dataSource) throws Exception {
-//		// Get map from alertExecutorId to alert stream
-//		// (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
-//		List<String> streamNames = new ArrayList<String>();
-//		// FIXME : here we reuse the executor definition. But the name alert is not ambiguous now.
-//		AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(new EagleServiceConnector(config));
-//		List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource,
-//				executorId);
-//		for (AlertExecutorEntity entity : alertExecutorEntities) {
-//			streamNames.add(entity.getTags().get(Constants.STREAM_NAME));
-//		}
-//		return streamNames;
-//	}
-	
-	private AggregateExecutor[] newAggregateExecutors(PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefDAO,
-			List<String> sourceStreams, String executorID, int numPartitions, String partitionerCls)
-					throws Exception {
-		LOG.info("Creating aggregator executors with executorID: " + executorID + ", numPartitions: "
-				+ numPartitions + ", Partition class is: " + partitionerCls);
-
-		PolicyPartitioner partitioner = (PolicyPartitioner) Class.forName(partitionerCls).newInstance();
-		AggregateExecutor[] alertExecutors = new AggregateExecutor[numPartitions];
-		String[] _sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]);
-
-		for (int i = 0; i < numPartitions; i++) {
-			alertExecutors[i] = new AggregateExecutor(executorID, partitioner, numPartitions, i, alertDefDAO,
-					_sourceStreams);
-		}
-		return alertExecutors;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
deleted file mode 100644
index 986885a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.ResultRender;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Created on 12/29/15.
- */
-public class AggregateResultRender implements ResultRender<AggregateDefinitionAPIEntity, AggregateEntity>, Serializable {
-
-
-    @Override
-    public AggregateEntity render(Config config,
-                                  List<Object> rets,
-                                  PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> siddhiAlertContext,
-                                  long timestamp) {
-        AggregateEntity result = new AggregateEntity();
-        for (Object o : rets) {
-            result.add(o);
-        }
-        return result;
-    }
-}



Mime
View raw message