eagle-commits mailing list archives

From h..@apache.org
Subject [4/5] incubator-eagle git commit: EAGLE-66 Typesafe Streaming DSL and KeyValue based Grouping
Date Wed, 16 Dec 2015 06:01:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
deleted file mode 100644
index df763b9..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import java.io.{FileInputStream, File}
-
-import backtype.storm.generated.StormTopology
-import backtype.storm.utils.Utils
-import backtype.storm.{Config, LocalCluster, StormSubmitter}
-import org.yaml.snakeyaml.Yaml
-import storm.trident.spout.RichSpoutBatchExecutor
-
-case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor {
-  @throws(classOf[Exception])
-  def execute {
-    val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local")
-    val conf: Config = new Config
-    conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
-    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
-    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32))
-    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384))
-    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384))
-
-    if(config.hasPath("envContextConfig.stormConfigFile")) {
-      //val inputFileStream = {
-      //  StormTopologyExecutorImpl.getClass.getClassLoader.getResourceAsStream(config.getString("envContextConfig.stormConfigFile"))
-      //}
-      val file = new File(config.getString("envContextConfig.stormConfigFile"))
-      if(file.exists()) {
-        val inputFileStream = new FileInputStream(file)
-        val yaml = new Yaml()
-        try {
-          val stormConf = yaml.load(inputFileStream).asInstanceOf[java.util.LinkedHashMap[String, Object]]
-          if(stormConf != null) conf.putAll(stormConf)
-        } catch {
-          case _ => ()
-        } finally {
-          if(inputFileStream != null) inputFileStream.close()
-        }
-      }
-    }
-
-    val topologyName = config.getString("envContextConfig.topologyName")
-    if (!localMode) {
-      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
-    }
-    else {
-      val cluster: LocalCluster = new LocalCluster
-      cluster.submitTopology(topologyName, conf, topology)
-      while(true) {
-        try {
-          Utils.sleep(Integer.MAX_VALUE)
-        }
-        catch {
-          case _ => () // Do nothing
-        }
-      }
-    }
-  }
-}
\ No newline at end of file
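
For reference, the executor above reads everything it needs from the envContextConfig section of the typesafe Config. A minimal sketch of that block, with illustrative values only:

import com.typesafe.config.ConfigFactory

// Sketch of the envContextConfig settings read by StormTopologyExecutorImpl; values are made up.
val conf = ConfigFactory.parseString(
  """
    |envContextConfig {
    |  mode = "local"                    # "local" runs a LocalCluster, anything else submits via StormSubmitter
    |  topologyName = "sandbox-topology" # illustrative name passed to submitTopology
    |  stormConfigFile = "storm.yaml"    # optional; merged into the Storm Config when the file exists
    |}
  """.stripMargin)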

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
deleted file mode 100644
index 913939a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import java.util
-
-import com.typesafe.config.Config
-import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl
-import org.apache.eagle.executor.AlertExecutorCreationUtils
-import org.apache.eagle.service.client.EagleServiceConnector
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * The constraints for alert are:
- * 1. only 3 kinds of StreamProducer can be placed immediately before an alert: MapProducer, FlatMapProducer and StreamUnionProducer
- * 2. for StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer
- * 3. the output of MapProducer and FlatMapProducer is a 2-field tuple (key, value); the key is a String and the value must be a SortedMap
- * 4. the framework wraps the original MapProducer and FlatMapProducer so that they emit a 3-field tuple (key, streamName, value)
- * 5. the framework automatically partitions traffic by the first field
- *
- *
- * The expansion happens in 2 steps:
- * step 1: wrap the previous StreamProducer so that it emits one more field, "streamName"
- * step 2: partition the alert executor by its policy partitioner class
- */
-
-class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
-  val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit ={
-    val iter = dag.iterator()
-    val toBeAddedEdges = new ListBuffer[StreamConnector]
-    val toBeRemovedVertex = new ListBuffer[StreamProducer]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child)
-      })
-    }
-    // add back edges
-    toBeAddedEdges.foreach(e => {
-      dag.addVertex(e.from)
-      dag.addVertex(e.to)
-      dag.addEdge(e.from, e.to, e)
-    })
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-
-  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
-               dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, child: StreamProducer): Unit = {
-    child match {
-      case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer, strategy) => {
-        /**
-         * step 1: wrap the previous StreamProducer so that it emits one more field, "streamName"
-         * for AlertStreamSink, we check the previous StreamProducer and replace it
-         */
-        val newStreamProducers = new ListBuffer[StreamProducer]
-        current match {
-          case StreamUnionProducer(id, others) => {
-            val incomingEdges = dag.incomingEdgesOf(current)
-            incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
-            var i: Int = 1
-            others.foreach(o => {
-              newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
-              i += 1
-            })
-          }
-          case _: FlatMapProducer[AnyRef, AnyRef] => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
-          }
-          case _: MapProducer => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
-          }
-          case s: StreamProducer if dag.inDegreeOf(s) == 0 => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
-          }
-          case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
-        }
-
-        /**
-         * step 2: partition alert executor by policy partitioner class
-         */
-        val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(new EagleServiceConnector(config)), upStreamNames, alertExecutorId)
-        var alertProducers = new scala.collection.mutable.MutableList[StreamProducer]
-        alertExecutors.foreach(exec => {
-          val t = FlatMapProducer(UniqueId.incrementAndGetId(), exec).withName(exec.getAlertExecutorId() + "_" + exec.getPartitionSeq())
-          t.setConfig(config)
-          t.setGraph(dag)
-          alertProducers += t
-          if (strategy == null) {
-             newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
-          }
-          else {
-            newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).customGroupBy(strategy))
-          }
-        })
-
-        // remove AlertStreamSink
-        toBeRemovedVertex += child
-
-        // add alert consumer if necessary
-        if (withConsumer) {
-          AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList)
-        }
-      }
-      case _ =>
-    }
-  }
-
-  private def replace(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
-                      dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, upStreamName: String) : StreamProducer= {
-    var newsp: StreamProducer = null
-    current match {
-      case _: FlatMapProducer[AnyRef, AnyRef] => {
-        val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper
-        mapper match {
-          case a: JavaStormStreamExecutor[EagleTuple] => {
-            val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper)
-            newsp.setGraph(dag)
-            newsp.setConfig(config)
-          }
-          case b: StormStreamExecutor[EagleTuple] => {
-            val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper)
-            newsp.setGraph(dag)
-            newsp.setConfig(config)
-          }
-          case _ => throw new IllegalArgumentException
-        }
-        // remove old StreamProducer and replace that with new StreamProducer
-        val incomingEdges = dag.incomingEdgesOf(current)
-        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
-        toBeRemovedVertex += current
-      }
-      case _: MapProducer => {
-        val mapper = current.asInstanceOf[MapProducer].fn
-        val newfun: (AnyRef => AnyRef) = {
-          a => mapper(a) match {
-            case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
-            case _ => throw new IllegalArgumentException
-          }
-        }
-        current match {
-          case MapProducer(id, 2, fn) => newsp = MapProducer(UniqueId.incrementAndGetId(), 3, newfun)
-          case _ => throw new IllegalArgumentException
-        }
-        val incomingEdges = dag.incomingEdgesOf(current)
-        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
-        toBeRemovedVertex += current
-      }
-      case s: StreamProducer if dag.inDegreeOf(s) == 0 => {
-        val fn:(AnyRef => AnyRef) = {
-          n => {
-            n match {
-              case scala.Tuple3 => n
-              case scala.Tuple2(x1,x2) => (x1,upStreamName,x2)
-              case scala.Tuple1(x1) => (if(x1 == null) null else x1.hashCode(),upStreamName,x1)
-              case _ => (if(n == null) null else n.hashCode(),upStreamName,n)
-            }
-          }
-        }
-        newsp = MapProducer(UniqueId.incrementAndGetId(),3,fn)
-        toBeAddedEdges += StreamConnector(current,newsp)
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to))
-      }
-      case _ => throw new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink")
-    }
-    newsp
-  }
-}
\ No newline at end of file
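
The heart of step 1 above is reshaping each upstream producer's 2-field (key, value) output into a 3-field (key, streamName, value) tuple before it reaches the alert executor. A standalone sketch of that reshaping, not the Eagle API itself:

// Sketch only: the (key, value) -> (key, streamName, value) wrapping that
// StreamAlertExpansion.replace applies around a MapProducer.
def wrapWithStreamName(streamName: String): AnyRef => AnyRef = {
  case (key, value) => (key, streamName, value)
  case unexpected   => throw new IllegalArgumentException(s"expected a 2-field tuple, got $unexpected")
}
// wrapWithStreamName("someStreamName")(("someKey", someSortedMapOfFields))
// yields ("someKey", "someStreamName", someSortedMapOfFields)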

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
deleted file mode 100644
index c6a3d3d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config._
-import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-
-import scala.reflect.runtime.universe._
-
-/**
- * @since  11/6/15
- */
-trait ConfigContext{
-  def set(config:Config)
-  def config:Config
-
-  def set[T<:AnyRef](key:String,value:T): Unit = {
-    set(config.withValue(key,ConfigValueFactory.fromAnyRef(value)))
-  }
-
-  /**
-   *
-   * @param key config key
-   * @param default default value
-   * @tparam T return type
-   * @return
-   */
-  def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = {
-    if(config.hasPath(key)) {
-      get(key)
-    } else default
-  }
-
-  def get[T](key:String)(implicit tag: TypeTag[T]):T = tag.tpe match {
-    case STRING_TYPE => config.getString(key).asInstanceOf[T]
-    case TypeTag.Double => config.getDouble(key).asInstanceOf[T]
-    case TypeTag.Long => config.getLong(key).asInstanceOf[T]
-    case TypeTag.Int => config.getInt(key).asInstanceOf[T]
-    case TypeTag.Byte => config.getBytes(key).asInstanceOf[T]
-    case TypeTag.Boolean => config.getBoolean(key).asInstanceOf[T]
-    case NUMBER_TYPE => config.getNumber(key).asInstanceOf[T]
-    case OBJECT_TYPE => config.getObject(key).asInstanceOf[T]
-    case VALUE_TYPE => config.getValue(key).asInstanceOf[T]
-    case ANY_REF_TYPE => config.getAnyRef(key).asInstanceOf[T]
-    case INT_LIST_TYPE => config.getIntList(key).asInstanceOf[T]
-    case DOUBLE_LIST_TYPE => config.getDoubleList(key).asInstanceOf[T]
-    case BOOL_LIST_TYPE => config.getBooleanList(key).asInstanceOf[T]
-    case LONG_LIST_TYPE => config.getLongList(key).asInstanceOf[T]
-    case _ => throw new UnsupportedOperationException(s"$tag is not supported yet")
-  }
-
-  val STRING_TYPE = typeOf[String]
-  val NUMBER_TYPE = typeOf[Number]
-  val INT_LIST_TYPE = typeOf[List[Int]]
-  val BOOL_LIST_TYPE = typeOf[List[Boolean]]
-  val DOUBLE_LIST_TYPE = typeOf[List[Double]]
-  val LONG_LIST_TYPE = typeOf[List[Double]]
-  val OBJECT_TYPE = typeOf[ConfigObject]
-  val VALUE_TYPE = typeOf[ConfigValue]
-  val ANY_REF_TYPE = typeOf[AnyRef]
-}
-
-/**
- * Stream APP DSL
- * @tparam E
- */
-trait StreamApp[+E<:ExecutionEnvironment] extends App with ConfigContext{
-  private var _executed = false
-  private var _config:Config = null
-
-  override def config:Config = _config
-
-  override def set(config:Config) = _config = config
-
-  def env:E
-
-  def execute() {
-    env.execute()
-    _executed = true
-  }
-
-  override def main(args: Array[String]): Unit = {
-    _config = new ConfigOptionParser().load(args)
-    super.main(args)
-    if(!_executed) execute()
-  }
-}
-
-trait StormStreamApp extends StreamApp[StormExecutionEnvironment]{
-  private var _env:StormExecutionEnvironment = null
-  def source(sourceProvider: AbstractStormSpoutProvider) = {
-    val spout = sourceProvider.getSpout(config)
-    env.newSource(spout)
-  }
-
-  def source(spout:BaseRichSpout) = env.newSource(spout)
-
-  override def env:StormExecutionEnvironment = {
-    if(_env == null){
-      _env = ExecutionEnvironmentFactory.getStorm(config)
-    }
-    _env
-  }
-}
\ No newline at end of file
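
Taken together, the (now removed) StreamApp and StormStreamApp traits let a topology be declared as a plain Scala object: configuration is parsed from the command-line arguments by ConfigOptionParser, and execute() runs automatically after the object body. A usage sketch, where the spout is a stand-in and the stream and executor ids are hypothetical:

import backtype.storm.testing.TestWordSpout
import org.apache.eagle.datastream.StormStreamApp

// Sketch only: TestWordSpout is just a convenient BaseRichSpout; a real application
// would also need alert metadata matching "sampleAlertExecutorId" in the Eagle service.
object SampleStormApp extends StormStreamApp {
  source(new TestWordSpout)                                        // producer created from the spout
    .groupBy(0)                                                    // key-value based grouping on field 0
    .alertWithoutConsumer("sampleStream", "sampleAlertExecutorId")
}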

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
deleted file mode 100644
index 083a5af..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream
-
-import org.apache.eagle.partition.PartitionStrategy
-
-case class StreamConnector(from: StreamProducer, to: StreamProducer) {
-  var groupByFields : Seq[Int] = Nil
-  var customGroupBy : PartitionStrategy = null
-
-  def groupBy(fields : Seq[Int]) : StreamConnector = {
-    groupByFields = fields
-    this
-  }
-
-  def customGroupBy(custom : PartitionStrategy) : StreamConnector = {
-    customGroupBy = custom
-    this
-  }
-}
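
A connector carries the grouping decision for the edge it represents, either concrete field indexes or a custom PartitionStrategy; the expansions in this commit fill these in instead of keeping a dedicated group-by vertex. A tiny sketch with placeholder producers:

// Sketch only: FilterProducer (defined in StreamProducer.scala below) is used purely
// as a placeholder vertex type.
val left  = FilterProducer(1, _ => true)
val right = FilterProducer(2, _ => true)
val edge  = StreamConnector(left, right).groupBy(Seq(0))   // edge.groupByFields == Seq(0)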

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
deleted file mode 100644
index 7e15233..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-abstract class StreamDAGExpansion(config: Config) {
-  def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
deleted file mode 100644
index caf71e3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * Replace GroupByProducer(Vertex) with StreamConnector (Edge)
- * @param config context configuration
- */
-class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamGroupbyExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = {
-    val iter = dag.iterator()
-    var toBeAddedEdges = new ListBuffer[StreamConnector]
-    var toBeRemovedVertex = new ListBuffer[StreamProducer]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        child match {
-          case p : GroupByProducer => {
-            dag.outgoingEdgesOf(p).foreach(c2 => {
-              if (p.fields != Nil) {
-                toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
-              }
-              else if (p.partitionStrategy != null) {
-                toBeAddedEdges += StreamConnector(current, c2.to).customGroupBy(p.partitionStrategy)
-              }
-              else toBeAddedEdges += StreamConnector(current, c2.to);
-            })
-            toBeRemovedVertex += p
-          }
-          case _ =>
-        }
-      })
-    }
-
-    // add back edges
-    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-}
\ No newline at end of file
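
To see the rewrite concretely, the sketch below builds a three-vertex DAG by hand and runs the expansion: afterwards the GroupByProducer vertex is gone and its field list lives on a direct source-to-sink edge. The producers are placeholders only.

import com.typesafe.config.ConfigFactory
import org.jgrapht.experimental.dag.DirectedAcyclicGraph

// Sketch only: demonstrates the vertex-to-edge rewrite done by StreamGroupbyExpansion.
val dag = new DirectedAcyclicGraph[StreamProducer, StreamConnector](classOf[StreamConnector])
val source  = FilterProducer(1, _ => true)   // placeholder upstream vertex
val grouper = GroupByProducer(2, Seq(0))     // group by the first field
val sink    = FilterProducer(3, _ => true)   // placeholder downstream vertex
Seq(source, grouper, sink).foreach(v => dag.addVertex(v))
dag.addEdge(source, grouper, StreamConnector(source, grouper))
dag.addEdge(grouper, sink, StreamConnector(grouper, sink))

new StreamGroupbyExpansion(ConfigFactory.empty()).expand(dag)
// dag now holds a single source -> sink edge whose groupByFields is Seq(0)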

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
deleted file mode 100644
index 5208a97..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-/**
- * Sets the name for each StreamProducer:
- * 1. if a name is given programmatically, use that name
- * 2. otherwise use the name generated internally by Scala
- */
-class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = {
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val sp = iter.next()
-      sp.name = NodeNameSelector(sp).getName
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
deleted file mode 100644
index 0264ca6..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import java.util.regex.Pattern
-
-import com.typesafe.config.{ConfigValue, ConfigObject, Config}
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConverters._
-
-class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = {
-    val map = getParallelismMap(config)
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val streamProducer = iter.next()
-      if(streamProducer.name != null) {
-        map.foreach(tuple => {
-          tuple._1.matcher(streamProducer.name).find() match {
-            case true => streamProducer.parallelism = tuple._2
-            case false =>
-          }
-        })
-      }
-    }
-  }
-
-  private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
-    val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig")
-    LOG.info("Found parallelismConfig ? " + (if (parallelismConfig == null) "no" else "yes"))
-    parallelismConfig.asScala.toMap map {
-      case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
-    }
-  }
-}
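
The parallelism map built above is keyed by regular expressions that are matched against producer names, so one entry can cover many bolts. An illustrative configuration fragment, with made-up names and numbers:

import com.typesafe.config.ConfigFactory

// Illustrative only: each key is a regex matched against StreamProducer.name,
// each value is the parallelism applied to the matching producers.
val conf = ConfigFactory.parseString(
  """
    |envContextConfig.parallelismConfig {
    |  "msgConsumer" = 4
    |  ".*AlertExecutor.*" = 2
    |}
  """.stripMargin)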

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
deleted file mode 100644
index 796e486..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream
-
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.apache.eagle.partition.PartitionStrategy
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
- * StreamProducer is the base class for all other concrete StreamProducer
- * It defines high level API for user to organize data stream flow
- *
- * StreamProducer is independent of execution environment
- */
-
-trait StreamProducer{
-  var name: String = null
-  var parallelism: Int = 1
-  var graph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = null
-  var config: Config = null
-
-  private def incrementAndGetId() = UniqueId.incrementAndGetId()
-
-  def setGraph(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = this.graph = graph
-  def getGraph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = graph
-  def setConfig(config: Config) : Unit = this.config = config
-  def getConfig: Config = config
-
-  def filter(fn : AnyRef => Boolean): StreamProducer ={
-    val ret = FilterProducer(incrementAndGetId(), fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def flatMap[T, R](mapper : FlatMapper[T, R]) : StreamProducer = {
-    val ret = FlatMapProducer(incrementAndGetId(), mapper)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def map1(fn : AnyRef => AnyRef) : StreamProducer = {
-    val ret = MapProducer(incrementAndGetId(), 1, fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def map2(fn : AnyRef => AnyRef) : StreamProducer = {
-    val ret = MapProducer(incrementAndGetId(), 2, fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def map3(fn : AnyRef => AnyRef) : StreamProducer = {
-    val ret = MapProducer(incrementAndGetId(), 3, fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def map4(fn : AnyRef => AnyRef) : StreamProducer = {
-    val ret = MapProducer(incrementAndGetId(), 4, fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  /**
-   * field indexes start from 0; the groupBy operator is placed on an edge of the graph
-   */
-  def groupBy(fields : Int*) : StreamProducer = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByProducer(incrementAndGetId(), fields)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  // groupBy, Java version; field indexes also start from 0
-  def groupBy(fields : java.util.List[Integer]) : StreamProducer = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByProducer(incrementAndGetId(), fields.asScala.toSeq.asInstanceOf[Seq[Int]])
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def customGroupBy(strategy : PartitionStrategy) : StreamProducer = {
-    val ret = GroupByProducer(incrementAndGetId(), strategy)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def streamUnion(others : util.List[StreamProducer]) : StreamProducer = {
-    streamUnion(others);
-  }
-
-  def streamUnion(others : Seq[StreamProducer]) : StreamProducer = {
-    val ret = StreamUnionProducer(incrementAndGetId(), others)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def streamUnion(other : StreamProducer) : StreamProducer = {
-    streamUnion(Seq(other))
-  }
-
-  /**
-   * alert is always the sink of the data flow
-   */
-  def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String) = {
-    alert(upStreamNames, alertExecutorId, true)
-  }
-
-  def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String) = {
-    alert(upStreamNames, alertExecutorId, false)
-  }
-
-  def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy : PartitionStrategy=null ) = {
-    val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer, strategy)
-    hookupDAG(graph, this, ret)
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, true, strategy)
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, true)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, false, strategy)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, false)
-  }
-
-  def hookupDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, next: StreamProducer) = {
-    current.getGraph.addVertex(next)
-    current.getGraph.addEdge(current, next, StreamConnector(current, next))
-    passOnContext(current, next)
-  }
-
-  private def passOnContext(current: StreamProducer, next: StreamProducer): Unit ={
-    next.graph = current.graph
-    next.config = current.config
-  }
-
-  /**
-   * can be set programmatically or by configuration
-   */
-  def withParallelism(parallelism : Int) : StreamProducer = {
-    this.parallelism = parallelism
-    this
-  }
-
-  def withName(name : String) : StreamProducer = {
-    this.name = name
-    this
-  }
-}
-
-case class FilterProducer(id: Int, fn : AnyRef => Boolean) extends StreamProducer
-
-case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) extends StreamProducer {
-  override def toString() = mapper.toString + "_" + id
-}
-
-case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => AnyRef) extends StreamProducer
-
-case class GroupByProducer(id: Int, fields : Seq[Int], partitionStrategy: PartitionStrategy) extends StreamProducer
-
-object GroupByProducer {
-  def apply(id: Int, fields: Seq[Int]) = new GroupByProducer(id, fields, null)
-  def apply(id: Int, partitionStrategy : PartitionStrategy) = new GroupByProducer(id, Nil, partitionStrategy)
-}
-
-case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends StreamProducer
-
-case class StormSourceProducer(id: Int, source : BaseRichSpout) extends StreamProducer{
-  var numFields : Int = 0
-  /**
-   * renames the output fields to f0, f1, f2, ...
-   * if the spout declares its own field names, those names will be overridden
-   * @param n number of output fields
-   */
-  def renameOutputFields(n : Int): StormSourceProducer ={
-    this.numFields = n
-    this
-  }
-}
-
-case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer
-
-object UniqueId{
-  val id : AtomicInteger = new AtomicInteger(0);
-  def incrementAndGetId() : Int = {
-    id.incrementAndGet()
-  }
-}
\ No newline at end of file
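
Outside of the StreamApp trait, the same fluent API can be driven directly from an execution environment. A brief sketch with a stand-in spout and hypothetical stream and executor ids; per the alert constraints, the value side of each (key, value) tuple is expected to be a SortedMap of fields:

import backtype.storm.testing.TestWordSpout
import com.typesafe.config.ConfigFactory
import org.apache.eagle.datastream.ExecutionEnvironmentFactory

// Sketch only: wiring the producers above without the StreamApp trait.
val config = ConfigFactory.load()
val env = ExecutionEnvironmentFactory.getStorm(config)
env.newSource(new TestWordSpout)
  .map2(word => (word, new java.util.TreeMap[AnyRef, AnyRef]()))   // 2-field (key, value) tuple
  .groupBy(0)                                                      // key-value based grouping
  .alertWithoutConsumer("sampleStream", "sampleAlertExecutorId")
env.execute()                                                      // builds and submits the topology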

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
deleted file mode 100644
index 83a83fe..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * The union operator is expanded into direct connections here
- */
-class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = {
-    val iter = dag.iterator()
-    var toBeAddedEdges = new ListBuffer[StreamConnector]
-    var toBeRemovedVertex = new ListBuffer[StreamProducer]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        val groupByFields = edge.groupByFields;
-        child match {
-          case StreamUnionProducer(id, others) => {
-            dag.outgoingEdgesOf(child).foreach(c2 => {
-              toBeAddedEdges += StreamConnector(current, c2.to).groupBy(groupByFields)
-              others.foreach(o => {
-                toBeAddedEdges += StreamConnector(o, c2.to).groupBy(groupByFields)
-              })
-            })
-            toBeRemovedVertex += child
-          }
-          case _ =>
-        }
-      })
-    }
-
-    // add back edges
-    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-}
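
The same build-and-expand pattern applies to the union case: after expansion, the StreamUnionProducer vertex disappears and every unioned producer is connected straight to the downstream consumer. Placeholder producers again; compare with the StreamGroupbyExpansion sketch above.

import com.typesafe.config.ConfigFactory
import org.jgrapht.experimental.dag.DirectedAcyclicGraph

// Sketch only: the union vertex is replaced by direct edges to the consumer.
val dag = new DirectedAcyclicGraph[StreamProducer, StreamConnector](classOf[StreamConnector])
val a     = FilterProducer(1, _ => true)     // placeholder producer
val b     = FilterProducer(2, _ => true)     // placeholder producer being unioned in
val union = StreamUnionProducer(3, Seq(b))
val c     = FilterProducer(4, _ => true)     // placeholder consumer
Seq(a, b, union, c).foreach(v => dag.addVertex(v))
dag.addEdge(a, union, StreamConnector(a, union))
dag.addEdge(union, c, StreamConnector(union, c))

new StreamUnionExpansion(ConfigFactory.empty()).expand(dag)
// the union vertex is removed; both a -> c and b -> c edges now exist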

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/UnionUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/UnionUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/UnionUtils.scala
deleted file mode 100644
index 02e211b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/UnionUtils.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-object UnionUtils {
-  def join(producers : StreamProducer*) : StreamProducer = {
-    producers.head.streamUnion(producers.drop(1))
-  }
-
-  def join(producers : java.util.List[StreamProducer]) : StreamProducer = {
-    val newList = new util.ArrayList(producers)
-    val head = newList.get(0)
-    newList.remove(0)
-    head.streamUnion(newList.asScala);
-  }
-
-  def join(producers : List[StreamProducer]) : StreamProducer = {
-    val head = producers.head
-    head.streamUnion(producers.tail);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
new file mode 100644
index 0000000..46f4738
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.core
+
+trait AbstractTopologyCompiler{
+  def buildTopology : AbstractTopologyExecutor
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
new file mode 100644
index 0000000..1e1664a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.core
+
+trait AbstractTopologyExecutor {
+  def execute
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
new file mode 100644
index 0000000..e3f3050
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.{Config, _}
+
+import scala.reflect.runtime.universe._
+
+/**
+ * @since  12/4/15
+ */
+case class Configuration(private var config:Config) extends Serializable{
+  def get:Config = config
+
+  def set[T<:AnyRef](key:String,value:T): Unit = {
+    config = config.withValue(key,ConfigValueFactory.fromAnyRef(value))
+  }
+
+  /**
+   *
+   * @param key config key
+   * @param default default value
+   * @tparam T return type
+   * @return
+   */
+  def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = {
+    if(get.hasPath(key)) {
+      get(key)
+    } else default
+  }
+
+  def get[T](key:String)(implicit tag: TypeTag[T]):T = tag.tpe match {
+    case STRING_TYPE => config.getString(key).asInstanceOf[T]
+    case TypeTag.Double => get.getDouble(key).asInstanceOf[T]
+    case TypeTag.Long => get.getLong(key).asInstanceOf[T]
+    case TypeTag.Int => get.getInt(key).asInstanceOf[T]
+    case TypeTag.Byte => get.getBytes(key).asInstanceOf[T]
+    case TypeTag.Boolean => get.getBoolean(key).asInstanceOf[T]
+    case NUMBER_TYPE => get.getNumber(key).asInstanceOf[T]
+    case OBJECT_TYPE => get.getObject(key).asInstanceOf[T]
+    case VALUE_TYPE => get.getValue(key).asInstanceOf[T]
+    case ANY_REF_TYPE => get.getAnyRef(key).asInstanceOf[T]
+    case INT_LIST_TYPE => get.getIntList(key).asInstanceOf[T]
+    case DOUBLE_LIST_TYPE => get.getDoubleList(key).asInstanceOf[T]
+    case BOOL_LIST_TYPE => get.getBooleanList(key).asInstanceOf[T]
+    case LONG_LIST_TYPE => get.getLongList(key).asInstanceOf[T]
+    case _ => throw new UnsupportedOperationException(s"$tag is not supported yet")
+  }
+
+  val STRING_TYPE = typeOf[String]
+  val NUMBER_TYPE = typeOf[Number]
+  val INT_LIST_TYPE = typeOf[List[Int]]
+  val BOOL_LIST_TYPE = typeOf[List[Boolean]]
+  val DOUBLE_LIST_TYPE = typeOf[List[Double]]
+  val LONG_LIST_TYPE = typeOf[List[Double]]
+  val OBJECT_TYPE = typeOf[ConfigObject]
+  val VALUE_TYPE = typeOf[ConfigValue]
+  val ANY_REF_TYPE = typeOf[AnyRef]
+}
\ No newline at end of file
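
The new core.Configuration wrapper keeps a typesafe Config behind a small mutable facade, mirroring the typed accessors of the ConfigContext trait from the deleted StreamAppDSL.scala. A short usage sketch with an illustrative key and value:

import com.typesafe.config.ConfigFactory

// Sketch only: setting a value and reading it back through the wrapper.
val configuration = Configuration(ConfigFactory.empty())
configuration.set("envContextConfig.topologyName", "sandbox-topology")
val topologyName = configuration.get.getString("envContextConfig.topologyName")   // "sandbox-topology"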

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
new file mode 100644
index 0000000..7394b75
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.utils.GraphPrinter
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+/**
+ * @since 0.3.0
+ */
+trait ExecutionEnvironment {
+  def config:Configuration
+
+  /**
+   * Business logic DAG
+   * @return
+   */
+  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
+
+  /**
+   * Start to execute
+   */
+  def execute():Unit
+
+  /**
+   * Support Java Style Config
+   *
+   * @return
+   */
+  def getConfig:Config = config.get
+}
+
+/**
+ * @todo Use Configuration instead of Config
+ *
+ * @param conf
+ */
+abstract class ExecutionEnvironmentBase(private val conf:Config)  extends ExecutionEnvironment with StreamSourceBuilder {
+  implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]])
+  private val _config:Configuration = Configuration(conf)
+
+  override def dag = _dag
+  override def config = _config
+
+  override def execute(): Unit = {
+    implicit val i_conf = _config.get
+    StreamNameExpansion()
+    GraphPrinter.print(dag,message="Before expanded DAG ")
+    StreamAlertExpansion()
+    StreamUnionExpansion()
+    StreamGroupbyExpansion()
+    StreamParallelismConfigExpansion()
+    StreamNameExpansion()
+    GraphPrinter.print(dag,message="After expanded DAG ")
+
+    GraphPrinter.printDotDigraph(dag)
+
+    val streamDAG = StreamDAGTransformer.transform(dag)
+    execute(streamDAG)
+  }
+
+  protected def execute(dag: StreamDAG)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
new file mode 100644
index 0000000..84532dc
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.core
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl
+import org.apache.eagle.datastream._
+import org.apache.eagle.datastream.storm.StormExecutorForAlertWrapper
+import org.apache.eagle.datastream.utils.AlertExecutorConsumerUtils
+import org.apache.eagle.executor.AlertExecutorCreationUtils
+import org.apache.eagle.service.client.EagleServiceConnector
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * The constraints for alert are:
+ * 1. only 3 kinds of StreamProducer can be put immediately before an AlertStreamSink: MapProducer, FlatMapProducer and StreamUnionProducer
+ * 2. for StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer
+ * 3. the output of MapProducer and FlatMapProducer is a 2-field tuple (key, value); key is a String and value has to be a SortedMap
+ * 4. the framework will wrap the original MapProducer and FlatMapProducer to emit a 3-field tuple {key, streamName, value}
+ * 5. the framework will automatically partition traffic by the first field
+ *
+ * The expansion works in 2 steps (an illustrative sketch follows this hunk):
+ * step 1: wrap the previous StreamProducer with one more field "streamName"
+ * step 2: partition the alert executor by its policy partitioner class
+ */
+
+case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
+  val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): Unit ={
+    val iter = dag.iterator()
+    val toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]]
+    val toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]
+    while(iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach(edge => {
+        val child = edge.to
+        onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child)
+      })
+    }
+    // add back edges
+    toBeAddedEdges.foreach(e => {
+      dag.addVertex(e.from)
+      dag.addVertex(e.to)
+      dag.addEdge(e.from, e.to, e)
+    })
+    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
+  }
+
+  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
+               dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = {
+    child match {
+      case AlertStreamSink(upStreamNames, alertExecutorId, withConsumer,strategy) => {
+        /**
+         * step 1: wrap the previous StreamProducer to emit one more field "streamName";
+         * for AlertStreamSink, we inspect the previous StreamProducer and replace it
+         */
+        val newStreamProducers = new ListBuffer[StreamProducer[Any]]
+        current match {
+          case StreamUnionProducer(others) => {
+            val incomingEdges = dag.incomingEdgesOf(current)
+            incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
+            var i: Int = 1
+            others.foreach(o => {
+              newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
+              i += 1
+            })
+          }
+          case _: FlatMapProducer[AnyRef, AnyRef] => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case _: MapperProducer[AnyRef,AnyRef] => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case p@_ => throw new IllegalStateException(s"$p cannot be put before AlertStreamSink; only StreamUnionProducer, FlatMapProducer and MapProducer are supported")
+        }
+
+        /**
+         * step 2: partition the alert executor by its policy partitioner class
+         */
+        val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(new EagleServiceConnector(config)), upStreamNames, alertExecutorId)
+        var alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]]
+        alertExecutors.foreach(exec => {
+          val t = FlatMapProducer(exec).nameAs(exec.getAlertExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config,hook = false)
+          alertProducers += t
+          // connect each wrapped producer to the alert executor, grouped by the first field (key)
+          // unless an explicit partition strategy is provided
+          if (strategy == null) {
+            newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t, Seq(0)))
+          }
+          else {
+            newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t, strategy))
+          }
+        })
+
+        // remove AlertStreamSink
+        toBeRemovedVertex += child
+
+        // add alert consumer if necessary
+        if (withConsumer) {
+          AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList)
+        }
+      }
+      case _ =>
+    }
+  }
+
+  private def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
+                      dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= {
+    var newsp: StreamProducer[Any] = null
+    current match {
+      case _: FlatMapProducer[AnyRef, AnyRef] => {
+        val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper
+        mapper match {
+          case a: JavaStormStreamExecutor[EagleTuple] => {
+            val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
+            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false)
+          }
+          case b: StormStreamExecutor[EagleTuple] => {
+            val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
+            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false)
+          }
+          case _ => throw new IllegalArgumentException
+        }
+        // remove old StreamProducer and replace that with new StreamProducer
+        val incomingEdges = dag.incomingEdgesOf(current)
+        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
+        toBeRemovedVertex += current
+      }
+      case _: MapperProducer[Any,Any] => {
+        val mapper = current.asInstanceOf[MapperProducer[Any,Any]].fn
+        val newfun: (Any => Any) = {
+          a => mapper(a) match {
+            case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
+            case _ => throw new IllegalArgumentException
+          }
+        }
+        current match {
+          case MapperProducer(2, fn) => newsp = MapperProducer(3, newfun)
+          case _ => throw new IllegalArgumentException
+        }
+        val incomingEdges = dag.incomingEdgesOf(current)
+        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
+        toBeRemovedVertex += current
+      }
+      case s: StreamProducer[Any] if dag.inDegreeOf(s) == 0 => {
+        val fn:(AnyRef => AnyRef) = {
+          n => {
+            n match {
+              case scala.Tuple3(_,_,_) => n // already (key, streamName, value)
+              case scala.Tuple2(x1,x2) => (x1,upStreamName,x2)
+              case scala.Tuple1(x1) => (if(x1 == null) null else x1.hashCode(),upStreamName,x1)
+              case _ => (if(n == null) null else n.hashCode(),upStreamName,n)
+            }
+          }
+        }
+        newsp = MapperProducer(3,fn)
+        toBeAddedEdges += StreamConnector(current,newsp)
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to))
+      }
+      case _ => throw new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink")
+    }
+    newsp
+  }
+}
+
+object StreamAlertExpansion{
+  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAlertExpansion ={
+    val e = StreamAlertExpansion(config)
+    e.expand(dag)
+    e
+  }
+}
+
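
A self-contained illustration (not from this commit) of the step-1 wrapping performed by replace(): a 2-field (key, value) mapper output becomes the 3-field (key, streamName, value) tuple the alert executors expect. The stream name and helper below are made-up examples.

object AlertWrapSketch {
  // Mirrors the wrapping idea only; the real code wraps MapperProducer/FlatMapProducer nodes.
  def wrap(upStreamName: String)(fn: Any => Any): Any => Any = {
    in => fn(in) match {
      case (k, v) => (k, upStreamName, v)
      case other  => throw new IllegalArgumentException(s"expected 2-field tuple, got $other")
    }
  }

  def main(args: Array[String]): Unit = {
    val original: Any => Any =
      s => (s.toString, new java.util.TreeMap[AnyRef, AnyRef]())
    val wrapped = wrap("exampleEventStream")(original)
    println(wrapped("user=alice"))   // (user=alice,exampleEventStream,{})
  }
}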

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
new file mode 100644
index 0000000..ce9d82c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
@@ -0,0 +1,97 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.core
+
+import org.apache.eagle.partition.PartitionStrategy
+
+abstract class StreamConnector[+T1 <: Any,+T2 <: Any](val from: StreamProducer[T1], val to: StreamProducer[T2]) extends Serializable
+
+case class ShuffleConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2])
+  extends StreamConnector[T1,T2](from,to){
+  override def toString: String = "shuffleGroup"
+}
+
+case class GroupbyFieldsConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],groupByFields : Seq[Int])
+  extends StreamConnector[T1,T2](from,to){
+  override def toString: String = s"groupByFields( $groupByFields )"
+}
+
+case class GroupbyKeyConnector[T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],keySelector: T1 => Any)
+  extends StreamConnector[T1,T2](from,to){
+  override def toString: String = s"groupByKey($keySelector)"
+}
+
+case class GroupbyStrategyConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],customGroupBy:PartitionStrategy)
+  extends StreamConnector[T1,T2](from,to){
+  override def toString: String = s"groupByStrategy( $customGroupBy )"
+}
+
+object StreamConnector{
+  /**
+   * Connect two producers with the default shuffle grouping
+   *
+   * @param from source producer
+   * @param to target producer
+   * @tparam T1 entity type of the source producer
+   * @tparam T2 entity type of the target producer
+   * @return a ShuffleConnector between from and to
+   */
+  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2]):ShuffleConnector[T1,T2] = ShuffleConnector(from,to)
+
+  /**
+   * Clone the grouping semantics of an existing connector onto a new pair of producers,
+   * falling back to ShuffleConnector by default
+   *
+   * @param from source producer
+   * @param to target producer
+   * @param connector template connector whose grouping is copied
+   * @tparam T1 entity type of the source producer
+   * @tparam T2 entity type of the target producer
+   * @return a connector of the same grouping type as the template
+   */
+  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],connector: StreamConnector[Any,Any]):StreamConnector[T1,T2] = connector match {
+      case GroupbyFieldsConnector(_,_,fields) => GroupbyFieldsConnector[T1,T2](from,to,fields)
+      case GroupbyKeyConnector(_,_,keySelector) => GroupbyKeyConnector[T1,T2](from,to,keySelector)
+      case GroupbyStrategyConnector(_,_,strategy) => GroupbyStrategyConnector[T1,T2](from,to,strategy)
+      case null | ShuffleConnector(_,_) => ShuffleConnector[T1,T2](from,to)
+      case c@_ => throw new IllegalArgumentException(s"Unknown type of stream connector $c")
+  }
+
+  /**
+   * Connect two producers, grouping traffic by the given tuple field indices
+   *
+   * @param from source producer
+   * @param to target producer
+   * @param groupByFields indices of the tuple fields to group by
+   * @tparam T1 entity type of the source producer
+   * @tparam T2 entity type of the target producer
+   * @return a GroupbyFieldsConnector between from and to
+   */
+  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],groupByFields : Seq[Int]):GroupbyFieldsConnector[T1,T2] = GroupbyFieldsConnector(from,to,groupByFields)
+
+  /**
+   * Connect two producers, grouping traffic with a custom PartitionStrategy
+   *
+   * @param from source producer
+   * @param to target producer
+   * @param customGroupBy partition strategy used to group traffic
+   * @tparam T1 entity type of the source producer
+   * @tparam T2 entity type of the target producer
+   * @return a GroupbyStrategyConnector between from and to
+   */
+  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],customGroupBy: PartitionStrategy):GroupbyStrategyConnector[T1,T2] = GroupbyStrategyConnector(from,to,customGroupBy)
+
+
+}
\ No newline at end of file
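
An illustrative sketch of the factory overloads above, including the clone-style overload that copies grouping semantics from a template connector. It assumes the MapperProducer(fields, fn) constructor used elsewhere in this commit (defined in StreamProducer.scala, which is not shown in this hunk).

import org.apache.eagle.datastream.core._

object ConnectorSketch {
  def main(args: Array[String]): Unit = {
    val a: StreamProducer[Any] = MapperProducer(1, (x: Any) => x)          // assumed constructor
    val b: StreamProducer[Any] = MapperProducer(1, (x: Any) => x.toString)

    val shuffle  = StreamConnector(a, b)              // default: shuffleGroup
    val byFields = StreamConnector(a, b, Seq(0))      // groupByFields( List(0) )
    val cloned   = StreamConnector(a, b, byFields)    // keeps the template's grouping
    println(Seq(shuffle, byFields, cloned).mkString(", "))
  }
}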

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
new file mode 100644
index 0000000..255f031
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
@@ -0,0 +1,68 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.core
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.collection.JavaConverters._
+import scala.collection.{JavaConversions, mutable}
+
+/**
+ * Wrapper of the underlying DAG, used by the Storm topology compiler
+ */
+class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) extends StreamProducerGraph {
+  var nodeMap: mutable.Map[String, StreamProducer[Any]] = null
+
+  override def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]): Unit = {
+    graph.addEdge(from, to, streamConnector)
+  }
+
+  override def addVertex(producer: StreamProducer[Any]): Unit = {
+    graph.addVertex(producer)
+  }
+
+  override def iterator(): Iterator[StreamProducer[Any]] = {
+    JavaConversions.asScalaIterator(graph.iterator())
+  }
+
+  override def isSource(v: StreamProducer[Any]): Boolean = graph.inDegreeOf(v) == 0
+
+  override def outgoingEdgesOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamConnector[Any,Any]] = {
+    JavaConversions.asScalaSet(graph.outgoingEdgesOf(v))
+  }
+
+  override def getNodeByName(name: String): Option[StreamProducer[Any]] = {
+    nodeMap.get(name)
+  }
+
+  def setNodeMap(nodeMap: mutable.Map[String, StreamProducer[Any]]): Unit = {
+    this.nodeMap = nodeMap
+  }
+
+  override def incomingVertexOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamProducer[Any]] = {
+    val set = mutable.Set[StreamProducer[Any]]()
+    graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e))
+    set
+  }
+}
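
Illustrative only: wiring two producers into a StreamDAG and asking which one is a source (in-degree 0). The MapperProducer constructor is assumed from StreamProducer.scala.

import org.apache.eagle.datastream.core._
import org.jgrapht.experimental.dag.DirectedAcyclicGraph

object StreamDAGSketch {
  def main(args: Array[String]): Unit = {
    val graph = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]](
      classOf[StreamConnector[Any, Any]])
    val dag = new StreamDAG(graph)

    val src: StreamProducer[Any]  = MapperProducer(1, (x: Any) => x)          // assumed constructor
    val sink: StreamProducer[Any] = MapperProducer(1, (x: Any) => x.toString)
    dag.addVertex(src)
    dag.addVertex(sink)
    dag.addEdge(src, sink, StreamConnector(src, sink))

    println(dag.isSource(src))    // true: no incoming edges
    println(dag.isSource(sink))   // false
  }
}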

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
new file mode 100644
index 0000000..fcff639
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
@@ -0,0 +1,27 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+abstract class StreamDAGExpansion(config: Config) {
+  def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]])
+}
\ No newline at end of file
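
A minimal custom expansion sketch (not in the commit) showing the contract subclasses implement; it only logs vertex names and leaves the DAG untouched.

import com.typesafe.config.Config
import org.apache.eagle.datastream.core.{StreamConnector, StreamDAGExpansion, StreamProducer}
import org.jgrapht.experimental.dag.DirectedAcyclicGraph

import scala.collection.JavaConversions._

case class LoggingExpansion(config: Config) extends StreamDAGExpansion(config) {
  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]]): Unit =
    dag.iterator().foreach(sp => println(s"visiting ${sp.name}"))   // read-only pass
}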

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
new file mode 100644
index 0000000..32947b9
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.core
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.collection.mutable
+
+/**
+ * Convert the generic DAG data structure into a Storm-specific DAG data structure to simplify topology compilation
+ */
+object StreamDAGTransformer {
+  /**
+   * Transform a DirectedAcyclicGraph[StreamProducer, StreamConnector] into a StreamDAG
+   *
+   * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
+   * @return StreamDAG wrapping the input graph together with a name-to-producer index
+   */
+  def transform(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) : StreamDAG = {
+    val stormDAG = new StreamDAG(dag)
+    val nodeMap = mutable.HashMap[String, StreamProducer[Any]]()
+    val iter = dag.iterator()
+    while(iter.hasNext){
+      val sp = iter.next()
+      nodeMap.put(sp.name, sp)
+    }
+    stormDAG.setNodeMap(nodeMap)
+    stormDAG
+  }
+}
\ No newline at end of file
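
Illustrative usage (not from the commit): compile a one-node DAG and look the node up by name. The name is normally assigned by StreamNameExpansion, and the MapperProducer constructor is assumed from StreamProducer.scala.

import org.apache.eagle.datastream.core._
import org.jgrapht.experimental.dag.DirectedAcyclicGraph

object TransformSketch {
  def main(args: Array[String]): Unit = {
    val dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]](
      classOf[StreamConnector[Any, Any]])
    val producer: StreamProducer[Any] = MapperProducer(1, (x: Any) => x)   // assumed constructor
    producer.name = "map_1"                                                // normally set by StreamNameExpansion
    dag.addVertex(producer)

    val streamDAG = StreamDAGTransformer.transform(dag)
    println(streamDAG.getNodeByName("map_1").isDefined)   // true
  }
}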

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
new file mode 100644
index 0000000..1a07e3f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Replace each GroupByProducer (vertex) with a grouping StreamConnector (edge);
+ * an illustrative sketch follows this hunk.
+ *
+ * For Storm, for example, this mainly determines the stream grouping method
+ *
+ * @param config context configuration
+ */
+case class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
+    val iter = dag.iterator()
+    var toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]]
+    var toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]
+    while(iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach(edge => {
+        val child = edge.to
+        child match {
+          case p : GroupByProducer[Any] => {
+            dag.outgoingEdgesOf(p).foreach(c2 => {
+              p match {
+                case GroupByFieldProducer(fields) =>
+                  toBeAddedEdges += GroupbyFieldsConnector(current, c2.to,fields)
+                case GroupByStrategyProducer(strategy) =>
+                  toBeAddedEdges += GroupbyStrategyConnector(current, c2.to,strategy)
+                case GroupByKeyProducer(keySelector) =>
+                  current.outKeyed = true
+                  current.keySelector = KeySelectorWrapper(keySelector)
+                  c2.to.inKeyed = true
+                  toBeAddedEdges += GroupbyKeyConnector(current, c2.to,keySelector)
+                case _ => toBeAddedEdges += ShuffleConnector(current, c2.to)
+              }
+            })
+            toBeRemovedVertex += p
+          }
+          case _ =>
+        }
+      })
+    }
+
+    // add back edges
+    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
+    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
+  }
+}
+
+object StreamGroupbyExpansion{
+  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamGroupbyExpansion ={
+    val e = StreamGroupbyExpansion(config)
+    e.expand(dag)
+    e
+  }
+}
\ No newline at end of file
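
A tiny before/after sketch (not from the commit) of what this expansion produces: grouping moves from a GroupByFieldProducer vertex into the edge type. The MapperProducer constructor is assumed from StreamProducer.scala.

import org.apache.eagle.datastream.core._

object GroupbyEdgeSketch {
  def main(args: Array[String]): Unit = {
    // Before: src -> GroupByFieldProducer(Seq(0)) -> sink   (grouping as a vertex)
    // After : src --- groupByFields( List(0) ) ---> sink    (grouping as the edge below)
    val src: StreamProducer[Any]  = MapperProducer(2, (x: Any) => x)          // assumed constructor
    val sink: StreamProducer[Any] = MapperProducer(2, (x: Any) => x.toString)
    val edge = GroupbyFieldsConnector(src, sink, Seq(0))                      // the edge the expansion installs
    println(edge)   // groupByFields( List(0) )
  }
}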

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
new file mode 100644
index 0000000..4bc1812
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.utils.NodeNameSelector
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+/**
+ * Set the name of each StreamProducer (see the sketch after this hunk):
+ * 1. if a name is given programmatically, use that name
+ * 2. otherwise use the name generated internally by Scala
+ */
+case class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
+    val iter = dag.iterator()
+    while(iter.hasNext){
+      val sp = iter.next()
+      sp.name = NodeNameSelector(sp).getName
+    }
+  }
+}
+
+
+object StreamNameExpansion{
+  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamNameExpansion ={
+    val e = StreamNameExpansion(config)
+    e.expand(dag)
+    e
+  }
+}
\ No newline at end of file
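
Illustrative only: rule 1 in action, using the nameAs(...) call seen in StreamAlertExpansion above. The MapperProducer constructor and the public name field are assumed from StreamProducer.scala.

import com.typesafe.config.ConfigFactory
import org.apache.eagle.datastream.core._
import org.jgrapht.experimental.dag.DirectedAcyclicGraph

object NamingSketch {
  def main(args: Array[String]): Unit = {
    val dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]](
      classOf[StreamConnector[Any, Any]])
    val named: StreamProducer[Any] = MapperProducer(1, (x: Any) => x).nameAs("auditLogMapper")  // rule 1: explicit name
    dag.addVertex(named)

    StreamNameExpansion(ConfigFactory.empty()).expand(dag)
    println(named.name)   // "auditLogMapper" is kept; unnamed producers get a generated name (rule 2)
  }
}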

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
new file mode 100644
index 0000000..e01ffa4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.core
+
+import java.util.regex.Pattern
+
+import com.typesafe.config.{Config, ConfigObject, ConfigValue}
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+case class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
+    val map = getParallelismMap(config)
+    val iter = dag.iterator()
+    while(iter.hasNext){
+      val streamProducer = iter.next()
+      if(streamProducer.name != null) {
+        map.foreach { case (pattern, parallelism) =>
+          if (pattern.matcher(streamProducer.name).find()) streamProducer.parallelism(parallelism)
+        }
+      }
+    }
+  }
+
+  private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
+    val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig")
+    parallelismConfig.asScala.toMap map {
+      case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
+    }
+  }
+}
+
+object StreamParallelismConfigExpansion{
+  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamParallelismConfigExpansion ={
+    val e = StreamParallelismConfigExpansion(config)
+    e.expand(dag)
+    e
+  }
+}
\ No newline at end of file
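
Illustrative only: the shape of configuration getParallelismMap expects. Keys under envContextConfig.parallelismConfig are regex patterns matched against producer names; the names and values below are made-up examples.

import com.typesafe.config.ConfigFactory

object ParallelismConfigSketch {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString(
      """
        |envContextConfig.parallelismConfig {
        |  "kafkaMsgConsumer" : 4
        |  "alertExecutor.*" : 2
        |}
      """.stripMargin)
    // Each producer whose name matches one of these patterns gets the configured parallelism.
    println(config.getObject("envContextConfig.parallelismConfig"))
  }
}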


