eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [06/13] incubator-eagle git commit: EAGLE-341 clean inner process alert engine code clean inner process alert engine code
Date Sun, 14 Aug 2016 06:23:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
index c511484..444559a 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
@@ -22,7 +22,7 @@ import com.typesafe.config.Config
 
 trait StreamContextAdapter{
   def submit(context:StreamContext):Unit = {
-    execute(context.build)
+    execute(null)
   }
   def execute(dag: StreamDAG)
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
deleted file mode 100644
index 9564a0d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.aggregate.AggregateExecutorFactory
-import org.apache.eagle.datastream.FlatMapper
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.JavaConversions.asScalaSet
-import scala.collection.mutable.ListBuffer
-
-/**
- * The expansion job for stream analyze
- * 
- * TODO : should re-use flow with stream alert expansion, make code cleaner
- */
-class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(config) {
-
-  override def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any, Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
-    dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], current: StreamProducer[Any],
-    child: StreamProducer[Any]): Unit = {
-    child match {
-      case AggregateProducer(upStreamNames, analyzerId, cepQl, strategy) => {
-        /**
-         * Rewrite the tree to add output field wrapper since policy executors accept only fixed tuple format 
-         */
-        val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
-
-        val analyzeExecutors = if (cepQl != null) {
-          AggregateExecutorFactory.Instance.createExecutors(upStreamNames, cepQl)
-        } else {
-          AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId)
-        }
-
-        analyzeExecutors.foreach(exec => {
-          val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).initWith(dag,config, hook = false).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).stream(child.stream)
-
-          // connect with previous
-          if (strategy == null) {
-            newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t))
-          } else {
-            newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t, strategy))
-          }
-
-          // connect with next
-          val outgoingEdges = dag.outgoingEdgesOf(child)
-          outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(t, e.to, e))
-        })
-        
-        // remove current child
-        toBeRemovedVertex += child
-      }
-      case _ => 
-    }
-  }
-}
-
-object StreamAggregateExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAggregateExpansion ={
-    val e = new StreamAggregateExpansion(config)
-    e.expand(dag)
-    e
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
deleted file mode 100644
index 618bba3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
-
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import java.util
-
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity
-import org.apache.eagle.alert.executor.AlertExecutorCreationUtils
-import org.apache.eagle.policy.common.Constants
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl
-
-import scala.collection.JavaConversions.asScalaSet
-import scala.collection.mutable.ListBuffer
-import org.apache.eagle.datastream.JavaStormExecutorForAlertWrapper
-import org.apache.eagle.datastream.JavaStormStreamExecutor
-import org.apache.eagle.datastream.StormStreamExecutor
-import org.apache.eagle.datastream.storm.StormExecutorForAlertWrapper
-import org.apache.eagle.datastream.utils.AlertExecutorConsumerUtils
-import org.apache.eagle.service.client.EagleServiceConnector
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import com.typesafe.config.Config
-
-/**
- * The constraints for alert are:
- * 1. only 3 kinds of StreamProducer can be put immediately before AlertStreamSink: MapProducer, FlatMapProducer, StreamUnionProducer
- * 2. For StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer
- * 3. the output for MapProducer and FlatMapProducer is a 2-field tuple, key and value; key is a string, value has to be a SortedMap
- * 4. the framework will wrap the original MapProducer and FlatMapProducer to emit a 3-field tuple, {key, streamName and value}
- * 5. the framework will automatically partition traffic by the first field
- *
- *
- * 2 steps
- * step 1: wrap the previous StreamProducer with one more field "streamName"
- * step 2: partition alert executor by policy partitioner class
- */
-
-case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
-  val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
-  import StreamAlertExpansion._
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): Unit ={
-    val iter = dag.iterator()
-    val toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]]
-    val toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child)
-      })
-    }
-    // add back edges
-    toBeAddedEdges.foreach(e => {
-      dag.addVertex(e.from)
-      dag.addVertex(e.to)
-      dag.addEdge(e.from, e.to, e)
-    })
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-
-  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], 
-               dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = {
-    child match {
-      case AlertStreamProducer(upStreamNames, alertExecutorId, withConsumer,strategy) => {
-        /**
-         * step 1: wrap the previous StreamProducer with one more field "streamName"
-         * for AlertStreamSink, we check previous StreamProducer and replace that
-         */
-        val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
-
-        /**
-         * step 2: partition alert executor by policy partitioner class
-         */
-        val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config,
-          new PolicyDefinitionEntityDAOImpl[AlertDefinitionAPIEntity](new EagleServiceConnector(config), Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME),
-          upStreamNames, alertExecutorId)
-        var alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]]
-        alertExecutors.foreach(exec => {
-          val t = FlatMapProducer(exec).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config, hook = false)
-          alertProducers += t
-          newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector[Any,Any](newsp, t,Seq(0)))
-          if (strategy == null) {
-             newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp,t,Seq(0)))
-          }
-          else {
-            newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp,t,strategy))
-          }
-        })
-
-        // remove AlertStreamSink
-        toBeRemovedVertex += child
-
-        // add alert consumer if necessary
-        if (withConsumer) {
-          AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList)
-        }
-      }
-      case _ =>
-    }
-  }
-
-  protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = {
-    if(upStreamNames == null) throw new NullPointerException("upStreamNames is null")
-
-    /**
-     * step 1: wrap the previous StreamProducer with one more field "streamName"
-     * for AlertStreamSink, we check previous StreamProducer and replace that
-     */
-    val newStreamProducers = new ListBuffer[StreamProducer[Any]]
-    current match {
-      case StreamUnionProducer(others) => {
-        val incomingEdges = dag.incomingEdgesOf(current)
-        incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
-        var i: Int = 1
-        others.foreach(o => {
-          newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
-          i += 1
-        })
-      }
-      case p: FlatMapProducer[AnyRef, AnyRef] => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames))
-      }
-      case p: MapperProducer[AnyRef,AnyRef] => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames))
-      }
-      case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(s,upStreamNames))
-      }
-      case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
-    }
-    newStreamProducers
-  }
-
-
-  protected def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
-                      dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= {
-    var newsp: StreamProducer[Any] = null
-    current match {
-      case _: FlatMapProducer[AnyRef, AnyRef] => {
-        val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper
-        mapper match {
-          case a: JavaStormStreamExecutor[AnyRef] => {
-            val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
-          }
-          case b: StormStreamExecutor[AnyRef] => {
-            val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
-          }
-          case _ => throw new IllegalArgumentException
-        }
-        // remove old StreamProducer and replace that with new StreamProducer
-        val incomingEdges = dag.incomingEdgesOf(current)
-        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
-        toBeRemovedVertex += current
-      }
-      case _: MapperProducer[Any,Any] => {
-        val mapper = current.asInstanceOf[MapperProducer[Any,Any]].fn
-        val newfun: (Any => Any) = { a =>
-          val result = mapper(a)
-          result match {
-            case scala.Tuple1(x1) => (null, upStreamName, x1)
-            case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
-            case scala.Tuple3(_, _, _) => result
-            case _ => throw new IllegalArgumentException(s"Illegal message :$result, Tuple1/Tuple2/Tuple3 are supported")
-          }
-        }
-        current match {
-          case MapperProducer(_, fn) => newsp = MapperProducer(3, newfun).initWith(dag,config,hook = false).stream(current.stream)
-          case _ => throw new IllegalArgumentException(s"Illegal producer $current")
-        }
-        val incomingEdges = dag.incomingEdgesOf(current)
-        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
-        toBeRemovedVertex += current
-      }
-      case s: StreamProducer[Any] if dag.inDegreeOf(s) == 0 => {
-        val fn:(AnyRef => AnyRef) = {
-          n => {
-            n match {
-              case scala.Tuple3 => n
-              case scala.Tuple2(x1,x2) => (x1,upStreamName,x2)
-              case scala.Tuple1(x1) => (if(x1 == null) null else x1.hashCode(),upStreamName,x1)
-              case _ => (if(n == null) null else n.hashCode(),upStreamName,n)
-            }
-          }
-        }
-        newsp = MapperProducer(3,fn).initWith(dag,config,hook = false).stream(s.stream)
-        toBeAddedEdges += StreamConnector(current,newsp)
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to))
-      }
-      case _ => throw new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink")
-    }
-    newsp
-  }
-}
-
-object StreamAlertExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAlertExpansion ={
-    val e = StreamAlertExpansion(config)
-    e.expand(dag)
-    e
-  }
-
-  /**
-    * Try upStreamNames first, otherwise try producer.streamId
-    *
-    * @param producer
-    * @param upStreamNames
-    * @return
-    */
-  private def recognizeSingleStreamName(producer: StreamProducer[AnyRef],upStreamNames:util.List[String]):String = {
-    if(upStreamNames == null){
-      producer.streamId
-    }else if(upStreamNames.size()>1){
-      if(producer.streamId == null) {
-        if (upStreamNames.size() > 1)
-          throw new IllegalStateException("Too many (more than 1) upStreamNames " + upStreamNames + " given for " + producer)
-        else
-          upStreamNames.get(0)
-      } else {
-        producer.streamId
-      }
-    } else if(upStreamNames.size() == 1){
-      upStreamNames.get(0)
-    }else {
-      if(producer.streamId == null){
-        throw new IllegalArgumentException("No stream name found for "+producer)
-      } else
-        producer.streamId
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
index 6e21bcc..fcd89c0 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
@@ -18,17 +18,17 @@ package org.apache.eagle.datastream.core
 
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.ExecutionEnvironments
-import org.apache.eagle.datastream.utils.GraphPrinter
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
 
+import scala.reflect
 import scala.reflect.runtime.universe._
 
 trait StreamContextBuilder extends StreamSourceBuilder {
   def config:Configuration
   /**
    * Business logic DAG
-   * @return
+    *
+    * @return
    */
   def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
   /**
@@ -48,31 +48,23 @@ class StreamContext(private val conf:Config) extends StreamContextBuilder{
   private val _config:Configuration = Configuration(conf)
   override def dag = _dag
   override def config = _config
-  override def build: StreamDAG = {
-    implicit val i_conf = _config.get
-    StreamNameExpansion()
-    GraphPrinter.print(dag,message="Before expanded DAG ")
-    StreamAggregateExpansion()
-    StreamAlertExpansion()
-    StreamUnionExpansion()
-    StreamGroupbyExpansion()
-    StreamParallelismConfigExpansion()
-    StreamNameExpansion()
-    GraphPrinter.print(dag,message="After expanded DAG ")
-    GraphPrinter.printDotDigraph(dag)
-    StreamDAGTransformer.transform(dag)
+
+  def build:StreamDAG = {
+    null
   }
 
+
   override def submit(env: ExecutionEnvironment): Unit = {
     env.submit(this)
   }
 
-  override def submit(clazz: Class[ExecutionEnvironment]): Unit = {
-    ExecutionEnvironments.get(clazz,conf).submit(this)
+  def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit = {
+
   }
 
-  override def submit[E <: ExecutionEnvironment](implicit typeTag: TypeTag[E]): Unit = {
-    ExecutionEnvironments.getWithConfig[E](conf).submit(this)
+
+  override def submit(clazz: Class[ExecutionEnvironment]): Unit = {
+
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
deleted file mode 100644
index ce9d82c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.core
-
-import org.apache.eagle.partition.PartitionStrategy
-
-abstract class StreamConnector[+T1 <: Any,+T2 <: Any](val from: StreamProducer[T1], val to: StreamProducer[T2]) extends Serializable
-
-case class ShuffleConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2])
-  extends StreamConnector[T1,T2](from,to){
-  override def toString: String = "shuffleGroup"
-}
-
-case class GroupbyFieldsConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],groupByFields : Seq[Int])
-  extends StreamConnector[T1,T2](from,to){
-  override def toString: String = s"groupByFields( $groupByFields )"
-}
-
-case class GroupbyKeyConnector[T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],keySelector: T1 => Any)
-  extends StreamConnector[T1,T2](from,to){
-  override def toString: String = s"groupByKey($keySelector)"
-}
-
-case class GroupbyStrategyConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],customGroupBy:PartitionStrategy)
-  extends StreamConnector[T1,T2](from,to){
-  override def toString: String = s"groupByStrategy( $customGroupBy )"
-}
-
-object StreamConnector{
-  /**
-   *
-   * @param from
-   * @param to
-   * @tparam T1
-   * @tparam T2
-   * @return
-   */
-  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2]):ShuffleConnector[T1,T2] = ShuffleConnector(from,to)
-
-  /**
-   * Clone connector from old connector to apply to new processing element, return ShuffleConnector by default
-   *
-   * @param from
-   * @param to
-   * @param connector
-   * @tparam T1
-   * @tparam T2
-   * @return
-   */
-  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],connector: StreamConnector[Any,Any]):StreamConnector[T1,T2] = connector match {
-      case GroupbyFieldsConnector(_,_,fields) => GroupbyFieldsConnector[T1,T2](from,to,fields)
-      case GroupbyKeyConnector(_,_,keySelector) => GroupbyKeyConnector[T1,T2](from,to,keySelector)
-      case GroupbyStrategyConnector(_,_,strategy) => GroupbyStrategyConnector[T1,T2](from,to,strategy)
-      case null | ShuffleConnector(_,_) => ShuffleConnector[T1,T2](from,to)
-      case c@_ => throw new IllegalArgumentException(s"Unknown type of stream connector $c")
-  }
-
-  /**
-   *
-   * @param from
-   * @param to
-   * @param groupByFields
-   * @tparam T1
-   * @tparam T2
-   * @return
-   */
-  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],groupByFields : Seq[Int]):GroupbyFieldsConnector[T1,T2] = GroupbyFieldsConnector(from,to,groupByFields)
-
-  /**
-   *
-   * @param from
-   * @param to
-   * @param customGroupBy
-   * @tparam T1
-   * @tparam T2
-   * @return
-   */
-  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],customGroupBy: PartitionStrategy):GroupbyStrategyConnector[T1,T2] = GroupbyStrategyConnector(from,to,customGroupBy)
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
deleted file mode 100644
index 7845740..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.core
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.JavaConverters._
-import scala.collection.{JavaConversions, mutable}
-
-/**
- * Wrapper of the DAG, used by the Storm topology compiler
- */
-class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) extends StreamProducerGraph {
-  var nodeMap: mutable.Map[String, StreamProducer[Any]] = mutable.Map[String,StreamProducer[Any]]()
-  graph.iterator().asScala.foreach(p=> nodeMap.put(p.name,p))
-
-  override def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]): Unit = {
-    graph.addEdge(from, to, streamConnector)
-  }
-
-  override def addVertex(producer: StreamProducer[Any]): Unit = {
-    graph.addVertex(producer)
-    nodeMap.put(producer.name,producer)
-  }
-
-  override def iterator(): Iterator[StreamProducer[Any]] = {
-    JavaConversions.asScalaIterator(graph.iterator())
-  }
-
-  override def isSource(v: StreamProducer[Any]): Boolean = {
-    graph.inDegreeOf(v) match {
-      case 0 => true
-      case _ => false
-    }
-  }
-
-  override def outgoingEdgesOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamConnector[Any,Any]] = {
-    JavaConversions.asScalaSet(graph.outgoingEdgesOf(v))
-  }
-
-  override def getNodeByName(name: String): Option[StreamProducer[Any]] = {
-    nodeMap.get(name)
-  }
-
-  def setNodeMap(nodeMap: mutable.Map[String, StreamProducer[Any]]): Unit = {
-    this.nodeMap = nodeMap
-  }
-
-  override def incomingVertexOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamProducer[Any]] = {
-    val set = mutable.Set[StreamProducer[Any]]()
-    graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e))
-    set
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
deleted file mode 100644
index fcff639..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-abstract class StreamDAGExpansion(config: Config) {
-  def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
deleted file mode 100644
index 6b20bf2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.mutable
-
-/**
- * convert generic DAG data structure to Storm specific DAG data structure for easy topology compiler
- */
-object StreamDAGTransformer {
-  /**
-   * Transform DirectedAcyclicGraph[StreamProducer, StreamConnector] into StormStreamDAG
-   *
-   * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
-   * @return StormStreamDAG
-   */
-  @deprecated("Use StreamDAG(dag) will transform directly")
-  def transform(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) : StreamDAG = {
-    val stormDAG = new StreamDAG(dag)
-    val nodeMap = mutable.HashMap[String, StreamProducer[Any]]()
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val sp = iter.next()
-      nodeMap.put(sp.name, sp)
-    }
-    stormDAG.setNodeMap(nodeMap)
-    stormDAG
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
deleted file mode 100644
index 1a07e3f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * Replace GroupByProducer(Vertex) with StreamConnector (Edge)
- *
- * For example as to Storm, it's mainly for grouping method
- *
- * @param config context configuration
- */
-case class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
-    val iter = dag.iterator()
-    var toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]]
-    var toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        child match {
-          case p : GroupByProducer[Any] => {
-            dag.outgoingEdgesOf(p).foreach(c2 => {
-              p match {
-                case GroupByFieldProducer(fields) =>
-                  toBeAddedEdges += GroupbyFieldsConnector(current, c2.to,fields)
-                case GroupByStrategyProducer(strategy) =>
-                  toBeAddedEdges += GroupbyStrategyConnector(current, c2.to,strategy)
-                case GroupByKeyProducer(keySelector) =>
-                  current.outKeyed = true
-                  current.keySelector = KeySelectorWrapper(keySelector)
-                  c2.to.inKeyed = true
-                  toBeAddedEdges += GroupbyKeyConnector(current, c2.to,keySelector)
-                case _ => toBeAddedEdges += ShuffleConnector(current, c2.to)
-              }
-            })
-            toBeRemovedVertex += p
-          }
-          case _ =>
-        }
-      })
-    }
-
-    // add back edges
-    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-}
-
-object StreamGroupbyExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamGroupbyExpansion ={
-    val e = StreamGroupbyExpansion(config)
-    e.expand(dag)
-    e
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
deleted file mode 100644
index 4bc1812..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.apache.eagle.datastream.utils.NodeNameSelector
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-/**
- * to set name for each StreamProducer
- * 1. if name is given programatically, then use this name
- * 2. otherwise use name generated by scala internally
- */
-case class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val sp = iter.next()
-      sp.name = NodeNameSelector(sp).getName
-    }
-  }
-}
-
-
-object StreamNameExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamNameExpansion ={
-    val e = StreamNameExpansion(config)
-    e.expand(dag)
-    e
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
deleted file mode 100644
index 8699da6..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import java.util.regex.Pattern
-
-import com.typesafe.config.{Config, ConfigObject, ConfigValue}
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-case class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
-    val map = getParallelismMap(config)
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val streamProducer = iter.next()
-      if(streamProducer.name != null) {
-        map.foreach(tuple => {
-          tuple._1.matcher(streamProducer.name).find() match {
-            case true => streamProducer.parallelism(tuple._2)
-            case false =>
-          }
-        })
-      }
-    }
-  }
-
-  private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
-    if(config.hasPath("envContextConfig.parallelismConfig")) {
-      val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig")
-      parallelismConfig.asScala.toMap map {
-        case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
-      }
-    }else{
-      Map[Pattern,Int]()
-    }
-  }
-}
-
-object StreamParallelismConfigExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamParallelismConfigExpansion ={
-    val e = StreamParallelismConfigExpansion(config)
-    e.expand(dag)
-    e
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
index 6445643..33cc2dd 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
@@ -173,16 +173,6 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     ret
   }
 
-  /**
-   * alert is always sink of data flow
-   */
-  def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String) = {
-    alert(upStreamNames.asScala, alertExecutorId,consume = true)
-  }
-
-  def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String) = {
-    alert(upStreamNames.asScala, alertExecutorId, consume = false)
-  }
 
   override def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null):AlertStreamProducer = {
     val ret = AlertStreamProducer(upStreamNames, alertExecutorId, consume, strategy)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
deleted file mode 100644
index f3fcc4d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.core
-
-trait StreamProducerGraph {
-  def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any])
-  def addVertex(producer: StreamProducer[Any])
-  def iterator() : Iterator[StreamProducer[Any]]
-  def isSource(v : StreamProducer[Any]) : Boolean
-  def outgoingEdgesOf(v : StreamProducer[Any]) : scala.collection.mutable.Set[StreamConnector[Any,Any]]
-  def getNodeByName(name : String) : Option[StreamProducer[Any]]
-  def incomingVertexOf(v: StreamProducer[Any]) : scala.collection.mutable.Set[StreamProducer[Any]]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
deleted file mode 100644
index b54b21f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.apache.commons.lang3.builder.HashCodeBuilder
-import org.apache.eagle.datastream.{Collector, FlatMapper}
-import org.apache.eagle.partition.PartitionStrategy
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-/**
- * StreamInfo should be fully serializable and having not runtime type information
- */
-class StreamInfo  extends Serializable{
-  /**
-   * Processing Element Id
-   */
-  val id:Int = UniqueId.incrementAndGetId()
-
-  /**
-   * Processing Element Name
-   */
-  var name: String = null
-
-  /**
-    * Output stream id, equals to name by default
-    */
-  var streamId:String=null
-
-  var parallelismNum: Int = 1
-
-  /**
-   * Keyed input stream
-   */
-  var inKeyed:Boolean = false
-  /**
-   * Keyed output stream
-   */
-  var outKeyed:Boolean = false
-  /**
-   * Output key selector
-   */
-  var keySelector:KeySelector = null
-
-// Type Information
-// ================
-//
-//  /**
-//   * Entity class type of T
-//   */
-//  var typeClass:Class[_] = null
-//
-//  /**
-//   * Type Class Simple Name
-//   * @return
-//   */
-//  def typeClassName = if(typeClass == null) null else typeClass.getSimpleName
-//
-//  @transient private var _typeTag[_] = null
-//
-//  def typeTag[_] = {
-//    if(_typeTag == null) _typeTag = Reflections.typeTag(this.typeClass)
-//    _typeTag
-//  }
-
-  var config: Config = null
-
-  def getInfo = this
-
-  override def hashCode(): Int = new HashCodeBuilder().append(this.id).append(this.getClass).toHashCode
-}
-
-
-object StorageType extends Enumeration {
-  type StorageType = Value
-  val KAFKA, DRUID, HBASE = Value
-}
-
-/**
- * Stream interaction protocol interface
- *
- * @tparam T processed elements type
- */
-trait StreamProtocol[+T <: Any]{
-  /**
-   * Initialize the stream metadata info
-   */
-  def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T]
-
-  /**
-   * Support Java API
-   *
-   * @param flatMapper
-   * @tparam R
-   * @return
-   */
-  def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R]
-  def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R]
-
-  /**
-   *
-   * @param fn
-   * @return
-   */
-  def filter(fn : T => Boolean): StreamProducer[T]
-
-  /**
-   *
-   * @param fn
-   */
-  def foreach(fn : T => Unit) : Unit
-
-  /**
-   * Type safe mapper
-   * @param fn
-   * @tparam R
-   * @return
-   */
-  def map[R](fn : T => R): StreamProducer[R]
-
-  /**
-   * Field base mapper
-   * @param fn
-   * @tparam R
-   * @return
-   */
-  def map1[R](fn : T => R) : StreamProducer[R]
-  def map2[R](fn : T => R) : StreamProducer[R]
-  def map3[R](fn : T => R) : StreamProducer[R]
-  def map4[R](fn : T => R) : StreamProducer[R]
-
-  def groupBy(fields : Int*) : StreamProducer[T]
-  def groupBy(fields : java.util.List[Integer]) : StreamProducer[T]
-  def groupBy(strategy : PartitionStrategy) : StreamProducer[T]
-
-  /**
-   * @param keyer key selector function
-   * @return
-   */
-  def groupByKey(keyer:T => Any):StreamProducer[T]
-
-  def streamUnion[T2,T3](otherStreams : Seq[StreamProducer[T2]]) : StreamProducer[T3]
-  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy):AlertStreamProducer
-
-  def aggregate(upStreamNames: java.util.List[String], executorId :String, strategy:PartitionStrategy): StreamProducer[T]
-
-  def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy:PartitionStrategy): StreamProducer[T]
-
-  def persist(executorId : String, storageType: StorageType.StorageType): StreamProducer[T]
-  
-  /**
-   * Set processing element parallelism setting
-   * @param parallelismNum parallelism value
-   * @return
-   */
-  def parallelism(parallelismNum : Int) : StreamProducer[T]
-  def parallelism : Int
-  /**
-   * Set component name
-   *
-   * @param componentName
-   * @return
-   */
-  def nameAs(componentName : String) : StreamProducer[T]
-
-  /**
-   * Set stream name
-   * @param streamId stream ID
-   * @return
-   */
-  def stream(streamId: String): StreamProducer[T]
-  def stream: String
-
-  def ? (fn:T => Boolean):StreamProducer[T] = this.filter(fn)
-  def ~>[R](flatMapper : FlatMapper[R]) = this.flatMap[R](flatMapper)
-  def ! (upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean = true,strategy: PartitionStrategy = null) = alert(upStreamNames, alertExecutorId, consume,strategy)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
deleted file mode 100644
index 5f3bd22..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.reflect.runtime.{universe => ru}
-
-/**
- * @since  12/7/15
- */
-trait StreamSourceBuilder {
-  def config:Configuration
-
-  /**
-   * Business logic DAG
-   * @return
-   */
-  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
-
-  /**
-   *
-   * @param iterable top level Iterable interface
-   * @param recycle
-   * @tparam T
-   * @return
-   */
-  def from[T:ru.TypeTag](iterable: Iterable[T],recycle:Boolean = false):IterableStreamProducer[T]={
-    val p = IterableStreamProducer[T](iterable,recycle)
-    p.initWith(dag,config.get)
-    p
-  }
-
-  def from[T:ru.TypeTag](iterator: Iterator[T],recycle:Boolean):IteratorStreamProducer[T]={
-    val p = IteratorStreamProducer[T](iterator)
-    p.initWith(dag,config.get)
-    p
-  }
-
-  def from(product: Product):IteratorStreamProducer[Any]={
-    val p = IteratorStreamProducer[Any](product.productIterator)
-    p.initWith(dag,config.get)
-    p
-  }
-
-  def register[T](producer:StreamProducer[T]):Unit = {
-    producer.initWith(dag,config.get)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
deleted file mode 100644
index 351782b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * union operator should be expanded
- */
-case class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
-    val iter = dag.iterator()
-    var toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]]
-    var toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        child match {
-          case StreamUnionProducer(others) => {
-            dag.outgoingEdgesOf(child).foreach(c2 => {
-              toBeAddedEdges += StreamConnector(current, c2.to, edge)
-              others.foreach(o => {
-                toBeAddedEdges += StreamConnector(o, c2.to, edge)
-              })
-            })
-            toBeRemovedVertex += child
-          }
-          case _ =>
-        }
-      })
-    }
-
-    // add back edges
-    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-}
-
-object StreamUnionExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamUnionExpansion ={
-    val e = StreamUnionExpansion(config)
-    e.expand(dag)
-    e
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
deleted file mode 100644
index 64b5f0f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.apache.eagle.datastream.core.StreamInfo
-import org.apache.eagle.datastream.utils.NameConstants
-import org.slf4j.LoggerFactory
-
-/**
- *
- * @param fieldsNum zero-fieldsNum may means something different
- * @param ack
- * @param streamInfo
- * @tparam T
- */
-abstract class AbstractStreamBolt[T](val fieldsNum:Int=1, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt{
-  private var _collector: OutputCollector = null
-  private val LOG = LoggerFactory.getLogger(classOf[AbstractStreamBolt[T]])
-
-  /**
-   * If outKeyed then
-   *  Fields = ("key","value"]
-   * elsif num > 0
-   *  Fields = ["f0","f1",..,"fn"]
-   * elsif num == 0
-   *  Fields = ["f0"]
-   * end
-   *
-   * @param declarer
-   */
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    if(streamInfo.outKeyed) {
-      declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
-    }else{
-      if(fieldsNum > 0) {
-        val fields = new util.ArrayList[String]()
-        var i: Int = 0
-        while (i < fieldsNum) {
-          fields.add(NameConstants.FIELD_PREFIX + i)
-          i += 1
-        }
-        declarer.declare(new Fields(fields))
-      }else if(fieldsNum == 0){
-        declarer.declare(new Fields(NameConstants.FIELD_PREFIX + 0))
-      }
-    }
-  }
-
-  def emit(values:util.List[AnyRef])(implicit input:Tuple){
-    if (streamInfo.outKeyed) {
-      _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(values).asInstanceOf[AnyRef], values))
-    } else {
-      _collector.emit(input, values)
-    }
-  }
-
-  def emit(value:Any)(implicit input:Tuple){
-    if(streamInfo.outKeyed) {
-      _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(value).asInstanceOf[AnyRef],value.asInstanceOf[AnyRef]))
-    }else{
-      _collector.emit(input,util.Arrays.asList(value.asInstanceOf[AnyRef]))
-    }
-  }
-
-  override def execute(input: Tuple): Unit = {
-    try {
-      implicit val _input = input
-      if (streamInfo.inKeyed) {
-        val key = input.getValueByField(NameConstants.FIELD_KEY)
-        val value = input.getValueByField(NameConstants.FIELD_VALUE).asInstanceOf[T]
-        onKeyValue(key, value)
-      } else {
-        onValues(input.getValues)
-      }
-      if(ack) _collector.ack(input)
-    }catch {
-      case t: Throwable => {
-        LOG.error(s"Got exception when processing $input",t)
-        _collector.fail(input)
-      }
-    }
-  }
-
-  /**
-   * Handle keyed stream value
-   */
-  def onKeyValue(key:Any,value:T)(implicit input:Tuple)
-
-  /**
-   * Handle general stream values list
-   *
-   * @param values
-   */
-  def onValues(values:util.List[AnyRef])(implicit input:Tuple)
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
deleted file mode 100644
index b175b41..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.core.StreamInfo
-
-/**
- * Bolt that forwards only the values accepted by the predicate `fn`.
- * @param fn   predicate applied to each incoming value
- * @param info implicit stream metadata
- */
-case class FilterBoltWrapper(fn:Any => Boolean)(implicit info:StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = 1){
-  /** Keyed input: the key is ignored; the value is re-emitted when `fn` accepts it. */
-  override def onKeyValue(key: Any, value: Any)(implicit input:Tuple): Unit =
-    forwardIfAccepted(value)
-
-  /** Non-keyed input: only the first element of `values` is tested and forwarded. */
-  override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit =
-    forwardIfAccepted(values.get(0))
-
-  /** Emits `candidate` when the predicate holds, otherwise does nothing. */
-  private def forwardIfAccepted(candidate: Any)(implicit input:Tuple): Unit =
-    if (fn(candidate)) emit(candidate)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
deleted file mode 100644
index 4c105b2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.core.StreamInfo
-
-/**
- * Bolt that runs the side-effecting callback `fn` on each input and emits nothing itself.
- * @param fn   callback invoked once per tuple
- * @param info implicit stream metadata
- * @since  12/6/15
- */
-case class ForeachBoltWrapper(fn:Any=>Unit)(implicit info:StreamInfo) extends AbstractStreamBolt[Any]  {
-  /**
-   * Keyed input: the key is dropped and only the value reaches `fn`.
-   *
-   * @param value payload of the keyed tuple
-   */
-  override def onKeyValue(key:Any,value: Any)(implicit input:Tuple): Unit = fn.apply(value)
-
-  /**
-   * Non-keyed input: `fn` receives the whole `values` list, not its first element.
-   *
-   * @param values all values of the incoming tuple
-   */
-  override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit = fn.apply(values)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
deleted file mode 100644
index c64ea83..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import backtype.storm.utils.Utils
-import org.apache.eagle.datastream.core.StreamInfo
-import org.apache.eagle.datastream.utils.NameConstants
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-/**
- * Spout that emits the elements of `iterable`, one per `nextTuple` call; with `recycle` set the
- * iterable is replayed from the start once it is exhausted.
- * @since  12/6/15
- */
-case class IterableStreamSpout(iterable: Iterable[Any],recycle:Boolean = true)(implicit info:StreamInfo) extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(classOf[IterableStreamSpout])
-  var _collector:SpoutOutputCollector=null
-  var _iterator:Iterator[Any] = null
-
-  /** Keeps the collector and takes a fresh iterator from `iterable`. */
-  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
-    _iterator = iterable.iterator
-    _collector = collector
-  }
-
-  /** Emits the next element; once exhausted either restarts the iterator or sleeps forever. */
-  override def nextTuple(): Unit = (_iterator.hasNext, recycle) match {
-    case (true, _) =>
-      val element = _iterator.next().asInstanceOf[AnyRef]
-      val outgoing: util.List[AnyRef] =
-        if (info.outKeyed) List(info.keySelector.key(element), element).asJava.asInstanceOf[util.List[AnyRef]]
-        else List(element).asJava
-      _collector.emit(outgoing)
-    case (false, true) =>
-      if(LOG.isDebugEnabled) LOG.debug("Recycling the iterator")
-      _iterator = iterable.iterator
-    case _ =>
-      if(LOG.isDebugEnabled) LOG.debug("No tuple left, sleep forever")
-      this.deactivate()
-      Utils.sleep(Long.MaxValue)
-  }
-
-  /** Declares (FIELD_KEY, FIELD_VALUE) for keyed output, otherwise one field named FIELD_PREFIX + "0". */
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    val names = if (info.outKeyed) Seq(NameConstants.FIELD_KEY, NameConstants.FIELD_VALUE) else Seq(s"${NameConstants.FIELD_PREFIX}0")
-    declarer.declare(new Fields(names: _*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
deleted file mode 100644
index ea6d658..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import backtype.storm.utils.Utils
-import org.apache.eagle.datastream.core.StreamInfo
-import org.apache.eagle.datastream.utils.NameConstants
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-/** Spout that drains the single `iterator` it was given, one element per `nextTuple`, then sleeps forever. */
-case class IteratorStreamSpout(iterator: Iterator[Any])(implicit info:StreamInfo) extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(classOf[IteratorStreamSpout]) // own class, so log lines carry this spout's name
-  var _collector:SpoutOutputCollector=null
-  var _iterator:Iterator[Any] = null
-
-  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
-    this._collector = collector
-    this._iterator = iterator
-  }
-
-  /** Emits the next element as (key, value) when `info.outKeyed`, else as a single value. */
-  override def nextTuple(): Unit =
-    if (_iterator.hasNext) {
-      val current = _iterator.next().asInstanceOf[AnyRef]
-      val outgoing: util.List[AnyRef] =
-        if (info.outKeyed) List(info.keySelector.key(current), current).asJava.asInstanceOf[util.List[AnyRef]]
-        else List(current).asJava
-      _collector.emit(outgoing)
-    } else {
-      LOG.info("No tuple left, sleep forever")
-      this.deactivate()
-      Utils.sleep(Long.MaxValue)
-    }
-
-  /** Declares (FIELD_KEY, FIELD_VALUE) for keyed output, otherwise one field named FIELD_PREFIX + "0". */
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    val names = if (info.outKeyed) Seq(NameConstants.FIELD_KEY, NameConstants.FIELD_VALUE) else Seq(s"${NameConstants.FIELD_PREFIX}0")
-    declarer.declare(new Fields(names: _*))
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
deleted file mode 100644
index 802c782..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.apache.eagle.datastream.{Collector, JavaStormStreamExecutor}
-import org.slf4j.LoggerFactory
-
-/** Adapts a JavaStormStreamExecutor to a Storm bolt: every output of `worker.flatMap` is emitted anchored to its input tuple. */
-case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[AnyRef]) extends BaseRichBolt{
-  val LOG = LoggerFactory.getLogger(classOf[JavaStormBoltWrapper]) // previously created from StormBoltWrapper's class
-  var _collector : OutputCollector = null
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-    worker.init
-  }
-
-  /** Runs the worker on the tuple's values, then acks; `collect` casts every output to a scala Product. */
-  override def execute(input : Tuple): Unit ={
-    worker.flatMap(input.getValues, new Collector[AnyRef](){
-      def collect(t: AnyRef): Unit = _collector.emit(input, StormWrapperUtils.productAsJavaList(t.asInstanceOf[Product]))
-    })
-    _collector.ack(input)
-  }
-
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    val fields = worker.fields
-    LOG.info("output fields for worker " + worker + " : " + fields.toList)
-    declarer.declare(new Fields(fields:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
deleted file mode 100644
index 19305fa..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.io.IOException
-import java.util
-import java.util.Properties
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * Kafka message deserializer that parses a JSON payload into a `util.TreeMap`.
- *
- * @since  11/6/15
- */
-case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDeserializer{
-  private val objectMapper: ObjectMapper = new ObjectMapper
-  private val LOG: Logger = LoggerFactory.getLogger(classOf[JsonMessageDeserializer])
-
-  /**
-   * @param bytes raw message; may be null or empty
-   * @return the parsed `util.TreeMap`, or null when the message is empty or reading it raises an IOException
-   */
-  override def deserialize(bytes: Array[Byte]): AnyRef =
-    if (bytes == null || bytes.length == 0) { // null must be tested first, otherwise `.length` throws
-      if (LOG.isDebugEnabled) LOG.warn("Skip empty message")
-      null
-    } else try objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]]) catch {
-      case e: IOException =>
-        LOG.error("Failed to deserialize json from: " + new String(bytes), e)
-        null
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
deleted file mode 100644
index 8c92590..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
-import org.apache.eagle.datastream.ExecutionEnvironments
-
-class KafkaStreamMonitorApp extends App { // the statements below run, in this order, as the App initializer
-  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args) // environment built from the program arguments
-  val streamName = env.config.get[String]("eagle.stream.name","eventStream") // second argument is presumably the default - TODO confirm
-  val streamExecutorId = env.config.get[String]("eagle.stream.executor",s"${streamName}Executor")
-  env.config.set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName) // set before the spout provider is constructed
-  env.fromSpout(new KafkaSourcedSpoutProvider()).parallelism(1).nameAs(streamName) ! (Seq(streamName),streamExecutorId) // NOTE(review): `!` presumably wires streamName to the executor id - confirm against the DSL
-  env.execute()
-}
-
-object KafkaStreamMonitor extends KafkaStreamMonitorApp // singleton form of the class above
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
deleted file mode 100644
index 1119e08..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.core.StreamInfo
-
-/**
- * Bolt that applies `fn` to each input and emits what it returns.
- *
- * @param num        number of output fields; zero selects the type-safe mode where `fn` receives the first value itself
- * @param fn         mapping function
- * @param streamInfo implicit stream metadata
- */
-case class MapBoltWrapper(num: Int, fn: Any => Any)(implicit streamInfo: StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = num){
-  /** Keyed input: only the value is mapped and emitted. */
-  override def onKeyValue(key: Any, value: Any)(implicit input:Tuple): Unit = emit(fn(value))
-
-  /**
-   * Non-keyed input: an empty list is ignored; with `num == 0` the first value is mapped directly,
-   * otherwise the values are packed into a scala TupleN (N <= 4), mapped, and the result unpacked.
-   *
-   * @param values values of the incoming tuple
-   */
-  override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit = {
-    val size = values.size()
-    // Nothing to map for an empty tuple.
-    if (size != 0) {
-      if (num == 0) emit(fn(values.get(0)))
-      else {
-        val packed: AnyRef = size match {
-          case 1 => scala.Tuple1[AnyRef](values.get(0))
-          case 2 => scala.Tuple2(values.get(0), values.get(1))
-          case 3 => scala.Tuple3(values.get(0), values.get(1), values.get(2))
-          case 4 => scala.Tuple4(values.get(0), values.get(1), values.get(2), values.get(3))
-          case _ => throw new IllegalArgumentException(s"Exceed max supported tuple size $size > 4")
-        }
-        // Box each component so the emitted list is a util.List[AnyRef].
-        def boxed(parts: Any*): util.List[AnyRef] = util.Arrays.asList(parts.map(_.asInstanceOf[AnyRef]): _*)
-        fn(packed) match {
-          case scala.Tuple1(a) => emit(boxed(a))
-          case scala.Tuple2(a, b) => emit(boxed(a, b))
-          case scala.Tuple3(a, b, c) => emit(boxed(a, b, c))
-          case scala.Tuple4(a, b, c, d) => emit(boxed(a, b, c, d))
-          case single => emit(boxed(single))
-        }
-      }
-    }
-  }
-}
\ No newline at end of file


Mime
View raw message