eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [3/5] incubator-eagle git commit: EAGLE-66 Typesafe Streaming DSL and KeyValue based Grouping
Date Wed, 16 Dec 2015 06:01:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
new file mode 100644
index 0000000..4d7dcd1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
@@ -0,0 +1,294 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.core
+
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.Config
+import org.apache.eagle.alert.entity.AlertAPIEntity
+import org.apache.eagle.datastream._
+import org.apache.eagle.partition.PartitionStrategy
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+/**
+ * StreamProducer = StreamInfo + StreamProtocol
+ *
+ * StreamProducer is the processing logic element, used as the base class for all other concrete StreamProducers
+ * A StreamProducer can be treated as a logical processing element, but not a physical one
+ * It defines a high-level type-safe API for users to organize data stream flow
+ *
+ * StreamProducer is independent of execution environment
+ *
+ * @tparam T processed elements type
+ */
+abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T] {
+  private var graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]] = null
+  private val LOG = LoggerFactory.getLogger(classOf[StreamProducer[T]])
+
+  /**
+   * Should not modify graph when setting it
+   */
+  def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T]={
+    this.config = config
+    this.graph = graph
+    if(hook && ! this.graph.containsVertex(this)) {
+      this.graph.addVertex(this)
+    }
+    LOG.debug(this.graph.toString)
+    this
+  }
+  
+  /**
+   * Get stream pure metadata info
+   * @return
+   */
+  override def getInfo:StreamInfo = this.asInstanceOf[StreamInfo]
+
+  override def stream(streamId:String):StreamProducer[T] = {
+    this.streamId = streamId
+    this
+  }
+
+  override def filter(fn : T => Boolean): StreamProducer[T] ={
+    val ret = FilterProducer[T](fn)
+    hookup(this, ret)
+    ret
+  }
+
+  override def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R] = {
+    val ret = FlatMapProducer[T,R](flatMapper)
+    hookup(this, ret)
+    ret
+  }
+
+  override def foreach(fn : T => Unit) : Unit = {
+    val ret = ForeachProducer[T](fn)
+    hookup(this, ret)
+  }
+
+  override def map[R](fn : T => R) : StreamProducer[R] = {
+    val ret = MapperProducer[T,R](0,fn)
+    hookup(this, ret)
+    ret
+  }
+
+  override def map1[R](fn : T => R): StreamProducer[R] = {
+    val ret = MapperProducer[T,R](1, fn)
+    hookup(this, ret)
+    ret
+  }
+
+  override def map2[R](fn : T => R): StreamProducer[R] = {
+    val ret = MapperProducer[T,R](2, fn)
+    hookup(this, ret)
+    ret
+  }
+
+  override def map3[R](fn : T => R): StreamProducer[R] = {
+    val ret = MapperProducer(3, fn)
+    hookup(this, ret)
+    ret
+  }
+
+  override def map4[R](fn : T => R): StreamProducer[R] = {
+    val ret = MapperProducer(4, fn)
+    hookup(this, ret)
+    ret
+  }
+
+  /**
+   * Field indexes start from 0; the groupBy operator is applied on the edge of the graph
+   */
+  override def groupBy(fields : Int*) : StreamProducer[T] = {
+    // validate each field index is greater or equal to 0
+    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
+    val ret = GroupByFieldProducer[T](fields)
+    hookup(this, ret)
+    ret
+  }
+
+  // groupBy Java version; field indexes start from 0 (each index is validated to be >= 0)
+  override def groupBy(fields : java.util.List[Integer]) : StreamProducer[T] = {
+    // validate each field index is greater or equal to 0
+    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
+    val ret = GroupByFieldProducer[T](fields.asScala.toSeq.asInstanceOf[Seq[Int]])
+    hookup(this, ret)
+    ret
+  }
+
+  override def groupByKey(keySelector: T=> Any):StreamProducer[T] = {
+    val ret = GroupByKeyProducer(keySelector)
+    hookup(this,ret)
+    ret
+  }
+
+  override def streamUnion[T2,T3](others : Seq[StreamProducer[T2]]) : StreamProducer[T3] = {
+    val ret = StreamUnionProducer[T, T2, T3](others)
+    hookup(this, ret)
+    ret
+  }
+
+  def streamUnion[T2,T3](other : StreamProducer[T2]) : StreamProducer[T3] = {
+    streamUnion(Seq(other))
+  }
+
+  def streamUnion[T2,T3](others : util.List[StreamProducer[T2]]) : StreamProducer[T3] = streamUnion(others.asScala)
+
+  override def groupBy(strategy : PartitionStrategy) : StreamProducer[T] = {
+    val ret = GroupByStrategyProducer(strategy)
+    hookup(this, ret)
+    ret
+  }
+
+  /**
+   * alert is always sink of data flow
+   */
+  def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String) = {
+    alert(upStreamNames.asScala, alertExecutorId,consume = true)
+  }
+
+  def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String) = {
+    alert(upStreamNames.asScala, alertExecutorId, consume = false)
+  }
+
+  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null) = {
+    val ret = AlertStreamSink(upStreamNames, alertExecutorId, consume, strategy)
+    hookup(this, ret)
+  }
+
+  def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true, strategy = strategy)
+  }
+
+  def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true)
+  }
+
+  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false, strategy)
+  }
+
+  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false)
+  }
+
+  protected def hookup[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
+    current.graph.addVertex(next)
+    current.graph.addEdge(current, next, StreamConnector(current, next))
+    passOnContext[T1,T2](current, next)
+  }
+
+  private def passOnContext[T1 ,T2](current: StreamProducer[T1], next: StreamProducer[T2]): Unit ={
+    next.initWith(current.graph,current.config)
+  }
+
+  /**
+   * can be set programmatically or by configuration
+   */
+  override def parallelism(parallelism : Int) : StreamProducer[T] = {
+    this.parallelismNum = parallelism
+    this
+  }
+
+  override def parallelism : Int = this.parallelismNum
+  override def stream:String = this.streamId
+
+  /**
+   * Component name
+   * 
+   * @param componentName component name
+   * @return
+   */
+  override def nameAs(componentName : String) : StreamProducer[T] = {
+    this.name = componentName
+    this
+  }
+}
+
+case class FilterProducer[T](fn : T => Boolean) extends StreamProducer[T]{
+  override def toString: String = s"FilterProducer"
+}
+
+case class FlatMapProducer[T, R](var mapper: FlatMapper[R]) extends StreamProducer[R]{
+  override def toString: String = mapper.toString
+}
+
+case class MapperProducer[T,R](numOutputFields : Int, var fn : T => R) extends StreamProducer[R]{
+  override def toString: String = s"MapperProducer"
+}
+
+case class ForeachProducer[T](var fn : T => Unit) extends StreamProducer[T]
+
+abstract class GroupByProducer[T] extends StreamProducer[T]
+case class GroupByFieldProducer[T](fields : Seq[Int]) extends GroupByProducer[T]
+case class GroupByStrategyProducer[T](partitionStrategy: PartitionStrategy) extends GroupByProducer[T]
+case class GroupByKeyProducer[T](keySelectorFunc:T => Any) extends GroupByProducer[T]{
+  override def toString: String = s"GroupByKey"
+}
+
+object GroupByProducer {
+  def apply[T](fields: Seq[Int]) = new GroupByFieldProducer[T](fields)
+  def apply[T](partitionStrategy : PartitionStrategy) = new GroupByStrategyProducer[T](partitionStrategy)
+  def apply[T](keySelector:T => Any) = new GroupByKeyProducer[T](keySelector)
+}
+
+case class StreamUnionProducer[T1,T2,T3](others: Seq[StreamProducer[T2]]) extends StreamProducer[T3]
+
+case class StormSourceProducer[T](source: BaseRichSpout) extends StreamProducer[T]{
+  var numFields : Int = 0
+
+  /**
+   * Rename output fields to f0, f1, f2, ...
+   * If a spout declares some field names, those field names will be modified
+   * @param n number of output fields
+   */
+  def withOutputFields(n : Int): StormSourceProducer[T] ={
+    this.numFields = n
+    this
+  }
+}
+
+case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]
+
+case class AlertStreamSink(upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] {
+  def consume(consume: Boolean): AlertStreamSink = {
+    this.consume = consume
+    this
+  }
+}
+
+object UniqueId{
+  val id : AtomicInteger = new AtomicInteger(0);
+  def incrementAndGetId() : Int = {
+    id.incrementAndGet()
+  }
+}
+
+trait KeySelector extends Serializable{
+  def key(t:Any):Any
+}
+
+case class KeySelectorWrapper[T](fn:T => Any) extends KeySelector{
+  override def key(t: Any): Any = fn(t.asInstanceOf[T])
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
new file mode 100644
index 0000000..f3fcc4d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
@@ -0,0 +1,29 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.core
+
+trait StreamProducerGraph {
+  def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any])
+  def addVertex(producer: StreamProducer[Any])
+  def iterator() : Iterator[StreamProducer[Any]]
+  def isSource(v : StreamProducer[Any]) : Boolean
+  def outgoingEdgesOf(v : StreamProducer[Any]) : scala.collection.mutable.Set[StreamConnector[Any,Any]]
+  def getNodeByName(name : String) : Option[StreamProducer[Any]]
+  def incomingVertexOf(v: StreamProducer[Any]) : scala.collection.mutable.Set[StreamProducer[Any]]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
new file mode 100644
index 0000000..346f728
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.Config
+import org.apache.commons.lang3.builder.HashCodeBuilder
+import org.apache.eagle.datastream.FlatMapper
+import org.apache.eagle.partition.PartitionStrategy
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.reflect.runtime.{universe => ru}
+
+/**
+ * StreamInfo should be fully serializable and having not runtime type information
+ */
+class StreamInfo  extends Serializable{
+  /**
+   * Processing Element Id
+   */
+  val id:Int = UniqueId.incrementAndGetId()
+
+  /**
+   * Processing Element Name
+   */
+  var name: String = null
+
+  var streamId:String=null
+  var parallelismNum: Int = 1
+
+  /**
+   * Keyed input stream
+   */
+  var inKeyed:Boolean = false
+  /**
+   * Keyed output stream
+   */
+  var outKeyed:Boolean = false
+  /**
+   * Output key selector
+   */
+  var keySelector:KeySelector = null
+
+// Type Information
+// ================
+//
+//  /**
+//   * Entity class type of T
+//   */
+//  var typeClass:Class[_] = null
+//
+//  /**
+//   * Type Class Simple Name
+//   * @return
+//   */
+//  def typeClassName = if(typeClass == null) null else typeClass.getSimpleName
+//
+//  @transient private var _typeTag[_] = null
+//
+//  def typeTag[_] = {
+//    if(_typeTag == null) _typeTag = Reflections.typeTag(this.typeClass)
+//    _typeTag
+//  }
+
+  var config: Config = null
+
+  def getInfo = this
+
+  override def hashCode(): Int = new HashCodeBuilder().append(this.id).append(this.getClass).toHashCode
+}
+
+/**
+ * Stream interaction protocol interface
+ *
+ * @tparam T processed elements type
+ */
+trait StreamProtocol[+T <: Any]{
+  /**
+   * Initialize the stream metadata info
+   */
+  def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T]
+
+  /**
+   * Support Java API
+   *
+   * @param flatMapper
+   * @tparam R
+   * @return
+   */
+  def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R]
+
+  /**
+   *
+   * @param fn
+   * @return
+   */
+  def filter(fn : T => Boolean): StreamProducer[T]
+
+  /**
+   *
+   * @param fn
+   */
+  def foreach(fn : T => Unit) : Unit
+
+  /**
+   * Type safe mapper
+   * @param fn
+   * @tparam R
+   * @return
+   */
+  def map[R](fn : T => R): StreamProducer[R]
+
+  /**
+   * Field base mapper
+   * @param fn
+   * @tparam R
+   * @return
+   */
+  def map1[R](fn : T => R) : StreamProducer[R]
+  def map2[R](fn : T => R) : StreamProducer[R]
+  def map3[R](fn : T => R) : StreamProducer[R]
+  def map4[R](fn : T => R) : StreamProducer[R]
+
+  def groupBy(fields : Int*) : StreamProducer[T]
+  def groupBy(fields : java.util.List[Integer]) : StreamProducer[T]
+  def groupBy(strategy : PartitionStrategy) : StreamProducer[T]
+
+  /**
+   * @param keyer key selector function
+   * @return
+   */
+  def groupByKey(keyer:T => Any):StreamProducer[T]
+  def streamUnion[T2,T3](otherStreams : Seq[StreamProducer[T2]]) : StreamProducer[T3]
+  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy)
+  /**
+   * Set processing element parallelism setting
+   * @param parallelismNum parallelism value
+   * @return
+   */
+  def parallelism(parallelismNum : Int) : StreamProducer[T]
+  def parallelism : Int
+  /**
+   * Set component name
+   *
+   * @param componentName
+   * @return
+   */
+  def nameAs(componentName : String) : StreamProducer[T]
+
+  /**
+   * Set stream name
+   * @param streamId stream ID
+   * @return
+   */
+  def stream(streamId: String): StreamProducer[T]
+  def stream: String
+
+  def ? (fn:T => Boolean):StreamProducer[T] = this.filter(fn)
+  def ~>[R](flatMapper : FlatMapper[R]) = this.flatMap[R](flatMapper)
+  def ! (upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean = true,strategy: PartitionStrategy = null) = alert(upStreamNames, alertExecutorId, consume,strategy)
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
new file mode 100644
index 0000000..9884e88
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.core
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.reflect.runtime.{universe => ru}
+
+/**
+ * @since  12/7/15
+ */
+trait StreamSourceBuilder {
+  def config:Configuration
+
+  /**
+   * Business logic DAG
+   * @return
+   */
+  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
+
+  /**
+   *
+   * @param iterable top level Iterable interface
+   * @param recycle
+   * @tparam T
+   * @return
+   */
+  def from[T:ru.TypeTag](iterable: Iterable[T],recycle:Boolean = false):IterableStreamProducer[T]={
+    val p = IterableStreamProducer[T](iterable,recycle)
+    p.initWith(dag,config.get)
+    p
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
new file mode 100644
index 0000000..351782b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
@@ -0,0 +1,71 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * union operator should be expanded
+ */
+case class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
+    val iter = dag.iterator()
+    var toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]]
+    var toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]
+    while(iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach(edge => {
+        val child = edge.to
+        child match {
+          case StreamUnionProducer(others) => {
+            dag.outgoingEdgesOf(child).foreach(c2 => {
+              toBeAddedEdges += StreamConnector(current, c2.to, edge)
+              others.foreach(o => {
+                toBeAddedEdges += StreamConnector(o, c2.to, edge)
+              })
+            })
+            toBeRemovedVertex += child
+          }
+          case _ =>
+        }
+      })
+    }
+
+    // add back edges
+    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
+    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
+  }
+}
+
+object StreamUnionExpansion{
+  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamUnionExpansion ={
+    val e = StreamUnionExpansion(config)
+    e.expand(dag)
+    e
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/JsonMessageDeserializer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/JsonMessageDeserializer.scala
deleted file mode 100644
index 754abe0..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/JsonMessageDeserializer.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.kafka
-
-import java.io.IOException
-import java.util
-import java.util.Properties
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * @since  11/6/15
- */
-case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDeserializer{
-  private val objectMapper: ObjectMapper = new ObjectMapper
-  private val LOG: Logger = LoggerFactory.getLogger(classOf[JsonMessageDeserializer])
-
-  override def deserialize(bytes: Array[Byte]): AnyRef = {
-    var map: util.Map[String, _] = null
-    try {
-      map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
-    } catch {
-      case e: IOException => {
-        LOG.error("Failed to deserialize json from: " + new String(bytes), e)
-      }
-    }
-    map
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/KafkaStreamMonitor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/KafkaStreamMonitor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/KafkaStreamMonitor.scala
deleted file mode 100644
index eff35fc..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/KafkaStreamMonitor.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.kafka
-
-import org.apache.eagle.datastream.StormStreamApp
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
-
-/**
- * @since  11/6/15
- */
-class KafkaStreamMonitorApp extends StormStreamApp{
-  val streamName = get[String]("eagle.stream.name","eventStream")
-  val streamExecutorId = get[String]("eagle.stream.executor",s"${streamName}Executor")
-
-  set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)
-
-  source(new KafkaSourcedSpoutProvider).renameOutputFields(1).withName(streamName)
-    .alertWithConsumer(streamName, streamExecutorId)
-}
-
-object KafkaStreamMonitor extends KafkaStreamMonitorApp
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
new file mode 100644
index 0000000..e109118
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.apache.eagle.datastream.core.StreamInfo
+import org.apache.eagle.datastream.utils.NameConstants
+import org.slf4j.LoggerFactory
+
/**
 * Base class for Storm bolts that can consume/produce either a keyed ([key, value])
 * stream or a plain positional-fields stream, as described by the implicit StreamInfo.
 *
 * @param fieldsNum number of positional output fields; zero still declares one default field
 * @param ack whether to ack each input tuple after successful processing
 * @param streamInfo stream metadata: inKeyed/outKeyed flags and the key selector
 * @tparam T value type carried on a keyed input stream
 */
abstract class AbstractStreamBolt[T](val fieldsNum:Int=0, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt{
  private var _collector: OutputCollector = null
  private val LOG = LoggerFactory.getLogger(classOf[AbstractStreamBolt[T]])

  /**
   * Declares output fields according to the stream shape:
   *
   * If outKeyed then
   *  Fields = ["key","value"]
   * elsif num > 0
   *  Fields = ["f0","f1",..,"fn"]
   * elsif num == 0
   *  Fields = ["f0"]
   * end
   *
   * @param declarer Storm output-fields declarer
   */
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    if(streamInfo.outKeyed) {
      declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
    }else{
      if(fieldsNum > 0) {
        val fields = new util.ArrayList[String]()
        var i: Int = 0
        while (i < fieldsNum) {
          fields.add(NameConstants.FIELD_PREFIX + i)
          i += 1
        }
        declarer.declare(new Fields(fields))
      }else if(fieldsNum == 0){
        declarer.declare(new Fields(NameConstants.FIELD_PREFIX + 0))
      }
      // NOTE(review): a negative fieldsNum declares no fields at all — confirm intended.
    }
  }

  /**
   * Emit a values list anchored to the current input tuple.
   * For a keyed output stream the key is computed over the whole list.
   */
  def emit(values:util.List[AnyRef])(implicit input:Tuple){
    if (streamInfo.outKeyed) {
      _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(values).asInstanceOf[AnyRef], values))
    } else {
      _collector.emit(input, values)
    }
  }

  /** Emit a single value anchored to the current input tuple. */
  def emit(value:Any)(implicit input:Tuple){
    if(streamInfo.outKeyed) {
      _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(value).asInstanceOf[AnyRef],value.asInstanceOf[AnyRef]))
    }else{
      _collector.emit(input,util.Arrays.asList(value.asInstanceOf[AnyRef]))
    }
  }

  /**
   * Dispatch to onKeyValue (keyed input) or onValues (positional input), acking on
   * success when `ack` is enabled and failing the tuple on any thrown error.
   */
  override def execute(input: Tuple): Unit = {
    try {
      implicit val _input = input
      if (streamInfo.inKeyed) {
        val key = input.getValueByField(NameConstants.FIELD_KEY)
        val value = input.getValueByField(NameConstants.FIELD_VALUE).asInstanceOf[T]
        onKeyValue(key, value)
      } else {
        onValues(input.getValues)
      }
      if(ack) _collector.ack(input)
    }catch {
      case t: Throwable => {
        LOG.error(s"Got exception when processing $input",t)
        _collector.fail(input)
      }
    }
  }

  /**
   * Handle keyed stream value
   */
  def onKeyValue(key:Any,value:T)(implicit input:Tuple)

  /**
   * Handle general stream values list
   *
   * @param values raw positional values of the input tuple
   */
  def onValues(values:util.List[AnyRef])(implicit input:Tuple)

  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
    _collector = collector
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
new file mode 100644
index 0000000..b175b41
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.tuple.Tuple
+import org.apache.eagle.datastream.core.StreamInfo
+
/**
 * Storm bolt applying a predicate; an element is re-emitted only when the predicate holds.
 *
 * @param fn predicate deciding whether an element passes through
 * @param info implicit stream metadata
 */
case class FilterBoltWrapper(fn:Any => Boolean)(implicit info:StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = 1){

  /** Keyed stream: test the value part and forward it when accepted. */
  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit =
    if (fn(value)) emit(value)

  /** General stream: only the first field is significant for filtering. */
  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit = {
    val head = values.get(0)
    if (fn(head)) emit(head)
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
new file mode 100644
index 0000000..4c105b2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.tuple.Tuple
+import org.apache.eagle.datastream.core.StreamInfo
+
/**
 * Storm bolt executing a side-effecting function on every incoming element;
 * nothing is re-emitted.
 *
 * @param fn side effect applied per element
 * @param info implicit stream metadata
 * @since 12/6/15
 */
case class ForeachBoltWrapper(fn:Any=>Unit)(implicit info:StreamInfo) extends AbstractStreamBolt[Any]  {

  /** Keyed stream: apply the side effect to the value part only. */
  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = fn(value)

  /** Non-keyed stream: the whole values list is handed to the function. */
  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit = fn(values)
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
new file mode 100644
index 0000000..e3b6e25
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.spout.SpoutOutputCollector
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichSpout
+import backtype.storm.tuple.Fields
+import backtype.storm.utils.Utils
+import org.apache.eagle.datastream.core.StreamInfo
+import org.apache.eagle.datastream.utils.NameConstants
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
/**
 * Spout replaying a fixed in-memory iterable into the topology.
 *
 * @param iterable elements to emit
 * @param recycle when true, restart from the beginning once the iterable is exhausted
 * @param info implicit stream metadata (keyed output flag and key selector)
 * @since 12/6/15
 */
case class IterableStreamSpout(iterable: Iterable[Any],recycle:Boolean = true)(implicit info:StreamInfo) extends BaseRichSpout {
  val LOG = LoggerFactory.getLogger(classOf[IterableStreamSpout])
  var _collector:SpoutOutputCollector=null
  var _iterator:Iterator[Any] = null

  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    this._collector = collector
    this._iterator = iterable.iterator
  }

  override def nextTuple(): Unit = {
    if(_iterator.hasNext){
      val current = _iterator.next().asInstanceOf[AnyRef]
      if(info.outKeyed) {
        _collector.emit(List(info.keySelector.key(current),current).asJava.asInstanceOf[util.List[AnyRef]])
      }else{
        _collector.emit(List(current).asJava)
      }
    }else if(recycle){
      // Debug level: this fires on every full pass over the data and would flood INFO logs.
      if(LOG.isDebugEnabled) LOG.debug("Recycling the iterator")
      _iterator = iterable.iterator
    }else{
      // Exhausted and not recycling: back off briefly instead of Utils.sleep(Long.MaxValue),
      // which would block this spout's executor thread forever and prevent clean shutdown.
      // (The previous self-call to deactivate() was a lifecycle no-op and has been removed.)
      Utils.sleep(100L)
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    if(info.outKeyed) {
      declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
    }else{
      // NOTE(review): declares a single field named FIELD_PREFIX, while AbstractStreamBolt
      // declares FIELD_PREFIX + 0 for its single-field case — confirm consumers access
      // fields positionally, otherwise these names should be aligned.
      declarer.declare(new Fields(NameConstants.FIELD_PREFIX))
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
new file mode 100644
index 0000000..1c0d42e
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.apache.eagle.datastream.{Collector, EagleTuple, JavaStormStreamExecutor}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
/**
 * Storm bolt adapting a Java-facing stream executor: each input tuple is flat-mapped
 * through the worker and every produced EagleTuple is emitted anchored to the input.
 *
 * @param worker the Java stream executor performing the flat-map
 */
case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[EagleTuple]) extends BaseRichBolt{
  // Fixed: previously named after StormBoltWrapper, which misattributed log output.
  val LOG = LoggerFactory.getLogger(classOf[JavaStormBoltWrapper])
  var _collector : OutputCollector = null

  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
    _collector = collector
    worker.init
  }

  override def execute(input : Tuple): Unit ={
    // Consistent with StormBoltWrapper: fail the tuple on error so Storm can replay it,
    // instead of letting the exception kill the executor with the tuple un-acked.
    try {
      worker.flatMap(input.getValues, new Collector[EagleTuple](){
        def collect(t: EagleTuple): Unit ={
          _collector.emit(input, t.getList.asJava)
        }
      })
      _collector.ack(input)
    } catch {
      case ex: Exception =>
        LOG.error("fail executing", ex)
        _collector.fail(input)
    }
  }

  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
    val fields = worker.fields
    LOG.info("output fields for worker " + worker + " : " + fields.toList)
    declarer.declare(new Fields(fields:_*))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
new file mode 100644
index 0000000..0001d2f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.io.IOException
+import java.util
+import java.util.Properties
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer
+import org.slf4j.{Logger, LoggerFactory}
+
/**
 * Kafka message deserializer decoding a JSON payload into a java.util.TreeMap.
 * Returns null when the bytes are not valid JSON (best-effort: the error is logged
 * and the message is effectively skipped).
 *
 * @param props deserializer configuration properties (currently unused here)
 * @since 11/6/15
 */
case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDeserializer{
  private val LOG: Logger = LoggerFactory.getLogger(classOf[JsonMessageDeserializer])
  private val objectMapper: ObjectMapper = new ObjectMapper

  override def deserialize(bytes: Array[Byte]): AnyRef =
    try {
      objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
    } catch {
      case e: IOException =>
        LOG.error("Failed to deserialize json from: " + new String(bytes), e)
        null
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
new file mode 100644
index 0000000..8c92590
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
+import org.apache.eagle.datastream.ExecutionEnvironments
+
/**
 * Storm topology that consumes a Kafka stream (deserialized as JSON) and routes it,
 * under the configured stream name, to the configured alert executor.
 *
 * NOTE(review): extends App (DelayedInit); combined with `object KafkaStreamMonitor
 * extends KafkaStreamMonitorApp` below, initialization order and `args` population
 * should be verified — a plain main() would avoid the App-trait pitfalls.
 */
class KafkaStreamMonitorApp extends App {
  // Execution environment resolved from command-line args.
  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
  // Stream name and executor id come from configuration, with defaults.
  val streamName = env.config.get[String]("eagle.stream.name","eventStream")
  val streamExecutorId = env.config.get[String]("eagle.stream.executor",s"${streamName}Executor")
  // Kafka payloads are JSON; install the JSON deserializer for the spout.
  env.config.set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)
  // `!` presumably wires the stream to the alert executor — confirm against the DSL.
  env.fromSpout(new KafkaSourcedSpoutProvider()).parallelism(1).nameAs(streamName) ! (Seq(streamName),streamExecutorId)
  env.execute()
}

object KafkaStreamMonitor extends KafkaStreamMonitorApp
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
new file mode 100644
index 0000000..1119e08
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.tuple.Tuple
+import org.apache.eagle.datastream.core.StreamInfo
+
/**
 * Storm bolt applying a transformation function to each element.
 *
 * @param num declared output field count; zero indicates the type-safe mode, where the
 *            function receives/returns single values rather than positional tuples
 * @param fn transformation applied to each incoming element
 * @param streamInfo implicit stream metadata
 */
case class MapBoltWrapper(num: Int, fn: Any => Any)(implicit streamInfo: StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = num){

  /** Keyed stream: transform the value part and emit the result. */
  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit =
    emit(fn(value))

  /**
   * General stream: pack up to four positional fields into a scala TupleN, apply the
   * function, then unpack the result back into an emitted values list. Empty tuples
   * are ignored; more than four fields is unsupported.
   *
   * @param values raw positional values of the input tuple
   */
  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit = {
    val size = values.size()
    if (size > 0) {
      if (num == 0) {
        // Type-safe mode: single value in, single value out.
        emit(fn(values.get(0)))
      } else {
        val packed: AnyRef = size match {
          case 1 => scala.Tuple1[AnyRef](values.get(0))
          case 2 => scala.Tuple2(values.get(0), values.get(1))
          case 3 => scala.Tuple3(values.get(0), values.get(1), values.get(2))
          case 4 => scala.Tuple4(values.get(0), values.get(1), values.get(2), values.get(3))
          case _ => throw new IllegalArgumentException(s"Exceed max supported tuple size $size > 4")
        }
        fn(packed) match {
          case scala.Tuple1(a) => emit(util.Arrays.asList(a.asInstanceOf[AnyRef]))
          case scala.Tuple2(a, b) => emit(util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef]))
          case scala.Tuple3(a, b, c) => emit(util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef]))
          case scala.Tuple4(a, b, c, d) => emit(util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef], d.asInstanceOf[AnyRef]))
          case other => emit(util.Arrays.asList(other.asInstanceOf[AnyRef]))
        }
      }
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
new file mode 100644
index 0000000..37622e4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.spout.{ISpoutOutputCollector, SpoutOutputCollector}
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichSpout
+import backtype.storm.tuple.Fields
+import org.apache.eagle.datastream.core.{KeySelector, StreamInfo}
+import org.apache.eagle.datastream.utils.NameConstants
+import java.util.{List => JList}
+
/**
 * BaseRichSpout decorator that re-declares the delegate's output with the given
 * field names; every other callback is forwarded unchanged.
 *
 * @param delegate delegated BaseRichSpout receiving all lifecycle callbacks
 * @param outputFields field names declared instead of the delegate's own
 */
case class SpoutProxy(delegate: BaseRichSpout, outputFields: java.util.List[String]) extends BaseRichSpout{
  def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit =
    delegate.open(conf, context, collector)

  def nextTuple(): Unit = delegate.nextTuple()

  override def ack(msgId: AnyRef): Unit = delegate.ack(msgId)

  override def fail(msgId: AnyRef): Unit = delegate.fail(msgId)

  override def deactivate(): Unit = delegate.deactivate()

  /** The field declaration is the only place this proxy deviates from plain delegation. */
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields(outputFields))

  override def close(): Unit = delegate.close()
}
+
/**
 * SpoutOutputCollector decorator converting every emitted tuple into a
 * [key, value] pair using the provided key selector.
 *
 * NOTE(review): toKeyValue reads only tuple.get(0) — assumes the delegate spout emits
 * single-field tuples; any additional fields are silently dropped. Confirm upstream spouts.
 */
case class KeyedSpoutOutputCollector(delegate: ISpoutOutputCollector,keyer:KeySelector) extends SpoutOutputCollector(delegate) {
  override def emitDirect (taskId: Int, streamId: String, tuple: JList[AnyRef], messageId: AnyRef):Unit = {
    val kv = toKeyValue(tuple)
    delegate.emitDirect(taskId, streamId,kv, messageId)
  }

  override def emit(streamId: String, tuple: JList[AnyRef], messageId: AnyRef):JList[Integer] = {
    val kv = toKeyValue(tuple)
    delegate.emit(streamId,kv, messageId)
  }

  // Wrap the first (and only expected) field as [key(value), value].
  def toKeyValue(tuple: JList[AnyRef]) = util.Arrays.asList(keyer.key(tuple.get(0)),tuple.get(0)).asInstanceOf[JList[AnyRef]]
}
+
/**
 * Spout decorator forcing keyed ([key, value]) output: the delegate's collector is
 * wrapped with KeyedSpoutOutputCollector and output fields are declared as key/value.
 *
 * @param delegate delegated BaseRichSpout receiving all lifecycle callbacks
 * @param streamInfo implicit stream metadata; must be keyed with a non-null key selector
 */
case class KeyedSpoutProxy(delegate: BaseRichSpout)(implicit streamInfo:StreamInfo) extends BaseRichSpout{
  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector) {
    // Fail fast on misconfiguration before delegating.
    if(!streamInfo.outKeyed) throw new IllegalArgumentException(s"$streamInfo is not key-based")
    // Fixed message: it is the key selector of the stream that is null, not streamInfo.
    if(streamInfo.keySelector == null) throw new NullPointerException(s"KeySelector of $streamInfo is null")

    this.delegate.open(conf, context, KeyedSpoutOutputCollector(collector,streamInfo.keySelector))
  }

  override def nextTuple {
    this.delegate.nextTuple
  }

  override def ack(msgId: AnyRef) {
    this.delegate.ack(msgId)
  }

  override def fail(msgId: AnyRef) {
    this.delegate.fail(msgId)
  }

  override def deactivate {
    this.delegate.deactivate
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer) {
    declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
  }

  override def close {
    this.delegate.close
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
new file mode 100644
index 0000000..b048b90
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
@@ -0,0 +1,53 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.storm
+
+import backtype.storm.topology.base.BaseRichBolt
+import com.typesafe.config.Config
+import org.apache.eagle.datastream._
+import org.apache.eagle.datastream.core._
+
object StormBoltFactory {
  /**
   * Resolve the Storm bolt implementation for a given stream producer node.
   *
   * @param graph the producer graph (currently unused here; kept for interface stability)
   * @param producer the logical stream operation to translate into a bolt
   * @param config topology configuration passed to executor-style workers
   * @return the BaseRichBolt wrapping the producer's logic
   * @throws UnsupportedOperationException for producer or worker types with no bolt mapping
   */
  def getBoltWrapper(graph: StreamProducerGraph, producer : StreamProducer[Any], config : Config) : BaseRichBolt = {
    implicit val streamInfo = producer.getInfo
    // Idiomatic pattern match replaces the previous isInstanceOf/asInstanceOf chain;
    // type arguments are erased at runtime, so one explicit cast per branch remains.
    producer match{
      case FlatMapProducer(worker) => worker match {
        case java: JavaStormStreamExecutor[_] =>
          val executor = java.asInstanceOf[JavaStormStreamExecutor[EagleTuple]]
          executor.prepareConfig(config)
          JavaStormBoltWrapper(executor)
        case storm: StormStreamExecutor[_] =>
          val executor = storm.asInstanceOf[StormStreamExecutor[EagleTuple]]
          executor.prepareConfig(config)
          StormBoltWrapper(executor)
        case _ => throw new UnsupportedOperationException(s"Unsupported flat-map worker: $worker")
      }
      case filter: FilterProducer[Any] => FilterBoltWrapper(filter.fn)
      case mapper: MapperProducer[Any,Any] => MapBoltWrapper(mapper.numOutputFields, mapper.fn)
      case foreach: ForeachProducer[Any] => ForeachBoltWrapper(foreach.fn)
      case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
new file mode 100644
index 0000000..3b91e77
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.apache.eagle.datastream.{Collector, EagleTuple, StormStreamExecutor}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Storm bolt adapter delegating tuple processing to a [[StormStreamExecutor]]:
+ * each incoming tuple is flat-mapped through the worker and every produced
+ * [[EagleTuple]] is emitted anchored to the input for replay tracking.
+ */
+case class StormBoltWrapper(worker : StormStreamExecutor[EagleTuple]) extends BaseRichBolt{
+  val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass)
+  // Output collector supplied by Storm in prepare(), before any execute() call.
+  var _collector : OutputCollector = null
+
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+    worker.init
+  }
+
+  override def execute(input : Tuple): Unit = {
+    try {
+      worker.flatMap(input.getValues.asScala, new Collector[EagleTuple] {
+        override def collect(t: EagleTuple): Unit = {
+          // Emit anchored to the input tuple so Storm can track acks/replays.
+          _collector.emit(input, t.getList.asJava)
+        }
+      })
+    }catch{
+      case ex: Exception => {
+        LOG.error("fail executing", ex)
+        // Fail the tuple for upstream replay, then rethrow so Storm restarts the
+        // executor; the rethrow intentionally skips the ack(input) below.
+        _collector.fail(input)
+        throw new RuntimeException(ex)
+      }
+    }
+    _collector.ack(input)
+  }
+
+  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
+    val fields = worker.fields
+    LOG.info("Output fields for worker " + worker + " : " + fields.toList)
+    declarer.declare(new Fields(fields:_*))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
new file mode 100644
index 0000000..4e7d743
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider
+import org.apache.eagle.datastream.core.{ExecutionEnvironmentBase, StormSourceProducer, StreamDAG}
+
+
+/**
+ * @since  12/7/15
+ */
+/**
+ * Storm-backed execution environment: compiles the stream DAG into a Storm
+ * topology and exposes spout-based source constructors.
+ */
+case class StormExecutionEnvironment(private val conf:Config) extends ExecutionEnvironmentBase(conf){
+
+  /** Compile the given DAG into a Storm topology and run it. */
+  override def execute(dag: StreamDAG) : Unit = {
+    StormTopologyCompiler(config.get, dag).buildTopology.execute
+  }
+
+  /** Create a typed source producer backed by the given Storm spout. */
+  def fromSpout[T](source: BaseRichSpout): StormSourceProducer[T] = {
+    val producer = StormSourceProducer[T](source)
+    producer.initWith(dag, config.get)
+    producer
+  }
+
+  /** Create a typed source producer from a spout provider. */
+  def fromSpout[T](sourceProvider: StormSpoutProvider): StormSourceProducer[T] =
+    fromSpout(sourceProvider.getSpout(config.get))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
new file mode 100644
index 0000000..1b0d133
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream._
+
+/**
+ * Adapts a 2-tuple stream executor into a 3-tuple one by injecting the stream
+ * name as the middle element of every emitted tuple: (key, value) becomes
+ * (key, streamName, value). Config/init calls are forwarded to the delegate.
+ */
+case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String)
+  extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{
+  override def prepareConfig(config: Config): Unit = delegate.prepareConfig(config)
+
+  override def init: Unit = delegate.init
+
+  override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, String, util.SortedMap[Object, Object]]]): Unit = {
+    // Widening collector: re-emit every pair from the delegate with the stream
+    // name spliced in as the second element.
+    val widening = new Collector[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]] {
+      override def collect(pair: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): Unit =
+        collector.collect(Tuple3(pair.f0, streamName, pair.f1))
+    }
+    delegate.flatMap(input, widening)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
new file mode 100644
index 0000000..74ed11d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
@@ -0,0 +1,67 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.core.{IterableStreamProducer, StormSourceProducer, StreamProducer}
+import org.apache.eagle.datastream.utils.NameConstants
+
+object StormSpoutFactory {
+  /**
+   * Compile a source stream producer into a concrete Storm spout.
+   *
+   * @param config application configuration
+   * @param from   source node of the stream DAG
+   * @return spout implementing the producer's semantics
+   * @throws IllegalArgumentException if the producer type is not a supported source
+   */
+  def createSpout(config: Config, from: StreamProducer[Any]): BaseRichSpout = {
+    implicit val streamInfo = from.getInfo
+    from match {
+      case p@StormSourceProducer(source) =>
+        // Keyed output needs a key-aware proxy so downstream fields-grouping works.
+        if(p.outKeyed) {
+          createKeyedProxySpout(p)
+        }else {
+          createProxySpout(p)
+        }
+      case p@IterableStreamProducer(iterable,recycle) =>
+        IterableStreamSpout(iterable,recycle)
+      case _ =>
+        throw new IllegalArgumentException(s"Cannot compile unknown $from to a Storm Spout")
+    }
+  }
+
+  /**
+   * Wrap the user spout so it declares synthetic output fields f0..f(n-1);
+   * when no field count is configured the raw spout is returned unchanged.
+   *
+   * @param sourceProducer source producer carrying the user spout
+   * @return the raw spout, or a field-declaring proxy around it
+   */
+  def createProxySpout(sourceProducer: StormSourceProducer[Any]): BaseRichSpout = {
+    val numFields = sourceProducer.numFields
+    if (numFields <= 0) {
+      sourceProducer.source
+    } else {
+      // Idiomatic range iteration instead of a manual while-loop counter.
+      val fieldNames = new util.ArrayList[String](numFields)
+      (0 until numFields).foreach(i => fieldNames.add(NameConstants.FIELD_PREFIX + i))
+      SpoutProxy(sourceProducer.source, fieldNames)
+    }
+  }
+
+  /** Wrap the user spout in a key-emitting proxy for key/value based grouping. */
+  def createKeyedProxySpout(sourceProducer: StormSourceProducer[Any]):BaseRichSpout = {
+    KeyedSpoutProxy(sourceProducer.source)(sourceProducer.getInfo)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
new file mode 100644
index 0000000..18f52cb
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
@@ -0,0 +1,118 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
+import backtype.storm.tuple.Fields
+import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.storm.partition.CustomPartitionGrouping
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.datastream.utils.NameConstants
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ListBuffer
+
+case class StormTopologyCompiler(config: Config, graph: StreamProducerGraph) extends AbstractTopologyCompiler{
+  val LOG = LoggerFactory.getLogger(StormTopologyCompiler.getClass)
+  // Cache of compiled executor bolts so a node reached by several edges is built once.
+  val boltCache = scala.collection.mutable.Map[StreamProducer[Any], StormBoltWrapper]()
+
+  /**
+   * Walk the stream DAG and translate it into a Storm topology: source nodes
+   * become spouts, all other nodes become bolts, and each connector determines
+   * the grouping between the two components.
+   *
+   * @return executor that can submit the built topology
+   */
+  override def buildTopology: AbstractTopologyExecutor ={
+    val builder = new TopologyBuilder
+    val iter = graph.iterator()
+    val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]()
+    val stormTopologyGraph = ListBuffer[String]()
+    while(iter.hasNext){
+      val from = iter.next()
+      val fromName = from.name
+      if(graph.isSource(from)){
+        val spout = StormSpoutFactory.createSpout(config, from)
+        builder.setSpout(fromName, spout, from.parallelism)
+        LOG.info("Spout: " + fromName + " with parallelism " + from.parallelism)
+      } else {
+        LOG.debug("Bolt: " + fromName)
+      }
+      val edges = graph.outgoingEdgesOf(from)
+      edges.foreach(sc => {
+        val toName = sc.to.name
+        val toBolt = createBoltIfAbsent(toName)
+        // Declare each target bolt once; reuse its declarer for further incoming edges.
+        val boltDeclarer: BoltDeclarer = boltDeclarerCache.get(toName) match {
+          case None =>
+            val finalParallelism = graph.getNodeByName(toName) match {
+              case Some(p) => p.parallelism
+              case None => 1
+            }
+            val declarer = builder.setBolt(toName, toBolt, finalParallelism)
+            LOG.info("Bolt: " + toName + " with parallelism " + finalParallelism)
+            boltDeclarerCache.put(toName, declarer)
+            declarer
+          case Some(bt) => bt
+        }
+
+        sc match {
+          case GroupbyFieldsConnector(_, _, groupByFields) =>
+            boltDeclarer.fieldsGrouping(fromName, new Fields(fields(groupByFields)))
+          case GroupbyStrategyConnector(_, _, strategy) =>
+            boltDeclarer.customGrouping(fromName, new CustomPartitionGrouping(strategy))
+          case GroupbyKeyConnector(_, _, _) =>
+            boltDeclarer.fieldsGrouping(fromName, new Fields(NameConstants.FIELD_KEY))
+          case ShuffleConnector(_, _) =>
+            boltDeclarer.shuffleGrouping(fromName)
+          // BUGFIX: message previously read "Supported stream connector".
+          case _ => throw new UnsupportedOperationException(s"Unsupported stream connector $sc")
+        }
+
+        // BUGFIX: braces were unbalanced ("{...)") and the target vertex reused
+        // the source's parallelism instead of its own.
+        val fromKind = if (graph.isSource(from)) "Spout" else "Bolt"
+        stormTopologyGraph += s"$fromKind{$fromName}{${from.parallelism}} ~> Bolt{$toName}{${sc.to.parallelism}} in $sc"
+      })
+    }
+    LOG.info(s"Storm topology DAG\n{\n \t${stormTopologyGraph.mkString("\n\t")} \n}")
+    new StormTopologyExecutorImpl(builder.createTopology, config)
+  }
+
+  /** Map 0-based field indexes to the framework's synthetic field names. */
+  def fields(fields : Seq[Int]): java.util.List[String] ={
+    val ret = new util.ArrayList[String]
+    // foreach (not map): only the side effect of filling the Java list is wanted.
+    fields.foreach(n => ret.add(NameConstants.FIELD_PREFIX + n))
+    ret
+  }
+
+  /** Resolve a node by name and compile (or fetch the cached) bolt for it. */
+  def createBoltIfAbsent(name : String) : BaseRichBolt = {
+    val producer = graph.getNodeByName(name)
+    producer match{
+      case Some(p) => createBoltIfAbsent(graph, p)
+      case None => throw new IllegalArgumentException("please check bolt name " + name)
+    }
+  }
+
+  def createBoltIfAbsent(graph: StreamProducerGraph, producer : StreamProducer[Any]): BaseRichBolt ={
+    boltCache.get(producer) match{
+      case Some(bolt) => bolt
+      case None =>
+        val bolt = StormBoltFactory.getBoltWrapper(graph, producer, config)
+        bolt match {
+          // BUGFIX: the cache was never populated, so every lookup rebuilt the bolt.
+          case wrapper: StormBoltWrapper => boltCache.put(producer, wrapper)
+          case _ => // other wrapper kinds are cheap to rebuild and are not cached
+        }
+        bolt
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
new file mode 100644
index 0000000..11ec46d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.io.{File, FileInputStream}
+
+import _root_.storm.trident.spout.RichSpoutBatchExecutor
+import backtype.storm.generated.StormTopology
+import backtype.storm.utils.Utils
+import backtype.storm.{Config, LocalCluster, StormSubmitter}
+import org.apache.eagle.datastream.core.AbstractTopologyExecutor
+import org.yaml.snakeyaml.Yaml
+
+case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor {
+  /**
+   * Submit the topology: to the real cluster in non-local mode, otherwise to an
+   * in-process LocalCluster that is kept alive until the JVM is killed.
+   */
+  @throws(classOf[Exception])
+  def execute {
+    import scala.util.control.NonFatal
+
+    val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local")
+    val conf: Config = new Config
+    // Values are boxed because the Storm config map holds java.lang.Object.
+    conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
+    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
+    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32))
+    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384))
+    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384))
+
+    // Optionally overlay settings from an external storm YAML config file.
+    if(config.hasPath("envContextConfig.stormConfigFile")) {
+      val file = new File(config.getString("envContextConfig.stormConfigFile"))
+      if(file.exists()) {
+        val inputFileStream = new FileInputStream(file)
+        val yaml = new Yaml()
+        try {
+          val stormConf = yaml.load(inputFileStream).asInstanceOf[java.util.LinkedHashMap[String, Object]]
+          if(stormConf != null) conf.putAll(stormConf)
+        } catch {
+          // BUGFIX: previously caught ALL Throwables (including OOM/interrupts);
+          // best-effort config loading should only swallow non-fatal failures.
+          case NonFatal(_) => ()
+        } finally {
+          inputFileStream.close()
+        }
+      }
+    }
+
+    val topologyName = config.getString("envContextConfig.topologyName")
+    if (!localMode) {
+      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
+    } else {
+      val cluster: LocalCluster = new LocalCluster
+      cluster.submitTopology(topologyName, conf, topology)
+      // Keep-alive loop: deliberately tolerate any interruption so the local
+      // cluster keeps running until the process is killed.
+      while(true) {
+        try {
+          Utils.sleep(Integer.MAX_VALUE)
+        }
+        catch {
+          case _: Throwable => () // Do nothing
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
new file mode 100644
index 0000000..67818e1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
@@ -0,0 +1,76 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream.utils
+
+import java.util
+
+import org.apache.eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
+import org.apache.eagle.alert.notification.AlertNotificationExecutor
+import org.apache.eagle.alert.persist.AlertPersistExecutor
+import org.apache.eagle.datastream.core.{StreamConnector, FlatMapProducer, StreamProducer}
+import org.apache.eagle.executor.AlertExecutor
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
+ *
+ * <br/><br/>
+ * Explanations for programId, alertExecutorId and policy<br/><br/>
+ * - programId - distributed or single-process program for example one storm topology<br/>
+ * - alertExecutorId - one process/thread which executes multiple policies<br/>
+ * - policy - some rules to be evaluated<br/>
+ *
+ * <br/>
+ *
+ * Normally the mapping is like following:
+ * <pre>
+ * programId (1:N) alertExecutorId
+ * alertExecutorId (1:N) policy
+ * </pre>
+ */
+
+object AlertExecutorConsumerUtils {
+  private val LOG: Logger = LoggerFactory.getLogger(AlertExecutorConsumerUtils.getClass)
+
+  /**
+   * Wire the standard alert consumer pipeline behind the given alert executors:
+   * entity dedup -> persist, and email dedup -> notification, with every alert
+   * producer feeding both dedup stages.
+   *
+   * @param toBeAddedEdges       output buffer receiving the newly created DAG edges
+   * @param alertStreamProducers alert executor nodes; each is expected to be a
+   *                             FlatMapProducer wrapping an AlertExecutor
+   */
+  def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], alertStreamProducers: List[StreamProducer[Any]]): Unit = {
+    val alertExecutorIdList: java.util.List[String] = new util.ArrayList[String]()
+    // foreach (not map): only the side effect of filling the Java list is wanted.
+    alertStreamProducers.foreach(x =>
+      alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertExecutorId))
+    // NOTE(review): .head assumes a non-empty list and that all producers share
+    // one alert definition DAO — confirm with callers.
+    val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertDefinitionDao
+    val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
+    val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
+    val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
+    val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
+
+    val entityDedupStreamProducer = FlatMapProducer(entityDedupExecutor)
+    val persistStreamProducer = FlatMapProducer(persistExecutor)
+    val emailDedupStreamProducer = FlatMapProducer(emailDedupExecutor)
+    val notificationStreamProducer = FlatMapProducer(notificationExecutor)
+    // Consumer-side edges: each dedup stage feeds its downstream action.
+    toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
+    toBeAddedEdges += StreamConnector(emailDedupStreamProducer, notificationStreamProducer)
+
+    // Producer-side edges: every alert producer fans out to both dedup stages.
+    alertStreamProducers.foreach(sp => {
+      toBeAddedEdges += StreamConnector(sp, entityDedupStreamProducer)
+      toBeAddedEdges += StreamConnector(sp, emailDedupStreamProducer)
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
new file mode 100644
index 0000000..33a805c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
@@ -0,0 +1,59 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream.utils
+
+import org.apache.eagle.datastream.core.{StreamConnector, StreamProducer}
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * @see scala.tools.nsc.DotDiagramGenerator
+ */
+object GraphPrinter {
+  private val LOG = LoggerFactory.getLogger(GraphPrinter.getClass)
+
+  /** Log a human-readable adjacency listing of the stream DAG. */
+  def print(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], message:String = "Print stream DAG graph"): Unit = {
+    val graphStr = ListBuffer[String]()
+    val iter = dag.iterator()
+    while (iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach(edge => {
+        graphStr += s"${edge.from.name}{${edge.from.parallelism}} ~> ${edge.to.name}{${edge.to.parallelism}} in ${edge.toString}"
+      })
+    }
+    LOG.info(message+"\n{ \n\t" + graphStr.mkString("\n\t") + "\n}")
+  }
+
+  /**
+   * Render the stream DAG as a Graphviz DOT digraph, log it, and return it.
+   *
+   * @param dag     stream DAG to render
+   * @param title   digraph title
+   * @param message log message printed above the digraph
+   * @return the DOT digraph source text
+   */
+  def printDotDigraph(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], title:String = "dag", message:String = "Print DOT digraph (copy and visualize with http://www.webgraphviz.com/)"): String = {
+    val graphStr = ListBuffer[String]()
+    val iter = dag.iterator()
+    while (iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach(edge => {
+        // BUGFIX: the target vertex previously printed the SOURCE's parallelism
+        // (edge.from.parallelismNum) due to a copy-paste error.
+        graphStr += s""""${edge.from.name} x ${edge.from.parallelismNum}" -> "${edge.to.name} x ${edge.to.parallelismNum}" [label = "$edge"];"""
+      })
+    }
+    val dotDigraph = s"""digraph $title { \n\t${graphStr.mkString("\n\t")} \n}"""
+    LOG.info(s"""$message\n\n$dotDigraph\n""")
+    dotDigraph
+  }
+}
\ No newline at end of file



Mime
View raw message