eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [36/51] [partial] incubator-eagle git commit: EAGLE-184 Migrate eagle website from https://github.com/eaglemonitoring/eaglemonitoring.github.io to document branch
Date Thu, 03 Mar 2016 18:10:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
deleted file mode 100644
index 1e1664a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-/**
- * Abstraction exposing a single side-effecting `execute` operation
- * (presumably launching a topology, judging by the name; implementations are not visible here).
- */
-trait AbstractTopologyExecutor {
-  /** Runs the executor. The result type is spelled out because procedure syntax is deprecated. */
-  def execute: Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
deleted file mode 100644
index e3f3050..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.{Config, _}
-
-import scala.reflect.runtime.universe._
-
-/**
- * Serializable holder of a Typesafe `Config` that can be updated in place and
- * read through type-directed accessors.
- *
- * @param config the wrapped configuration; replaced by a new instance on every `set`
- * @since  12/4/15
- */
-case class Configuration(private var config:Config) extends Serializable{
-  /** @return the wrapped configuration in its current state */
-  def get:Config = config
-
-  /**
-   * Binds `key` to `value` by swapping the wrapped config for an updated copy.
-   *
-   * @param key config key
-   * @param value value converted through `ConfigValueFactory.fromAnyRef`
-   * @tparam T value type
-   */
-  def set[T<:AnyRef](key:String,value:T): Unit = {
-    config = config.withValue(key,ConfigValueFactory.fromAnyRef(value))
-  }
-
-  /**
-   * Reads `key` as a `T`, falling back to `default` when the path is absent.
-   *
-   * @param key config key
-   * @param default default value
-   * @tparam T return type
-   * @return the configured value, or `default` if `key` has no value
-   * @throws UnsupportedOperationException if `T` is not one of the supported types
-   */
-  def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = {
-    if(config.hasPath(key)) get[T](key) else default
-  }
-
-  /**
-   * Reads `key` as a `T`, choosing the `Config` getter from the runtime type of `T`.
-   *
-   * Types are compared with `=:=` against `typeOf[...]`. The previous patterns compared the
-   * `Type` with `TypeTag.Double` and friends (which are `TypeTag`s, not `Type`s), so the
-   * primitive branches could never match. List results are converted from the Java lists
-   * returned by `Config` into Scala `List`s so that they really have the requested type.
-   *
-   * @param key config key
-   * @tparam T return type
-   * @return the configured value
-   * @throws UnsupportedOperationException if `T` is not one of the supported types
-   */
-  def get[T](key:String)(implicit tag: TypeTag[T]):T = {
-    import scala.collection.JavaConverters._
-
-    val requested = tag.tpe
-    val value: Any =
-      if (requested =:= STRING_TYPE) config.getString(key)
-      else if (requested =:= typeOf[Double]) config.getDouble(key)
-      else if (requested =:= typeOf[Long]) config.getLong(key)
-      else if (requested =:= typeOf[Int]) config.getInt(key)
-      // NOTE(review): getBytes appears to return a size-in-bytes as a boxed Long, so a caller
-      // that really expects a Byte will probably fail when unboxing - confirm the intended mapping.
-      else if (requested =:= typeOf[Byte]) config.getBytes(key)
-      else if (requested =:= typeOf[Boolean]) config.getBoolean(key)
-      else if (requested =:= NUMBER_TYPE) config.getNumber(key)
-      else if (requested =:= OBJECT_TYPE) config.getObject(key)
-      else if (requested =:= VALUE_TYPE) config.getValue(key)
-      else if (requested =:= ANY_REF_TYPE) config.getAnyRef(key)
-      else if (requested =:= INT_LIST_TYPE) config.getIntList(key).asScala.map(_.intValue).toList
-      else if (requested =:= DOUBLE_LIST_TYPE) config.getDoubleList(key).asScala.map(_.doubleValue).toList
-      else if (requested =:= BOOL_LIST_TYPE) config.getBooleanList(key).asScala.map(_.booleanValue).toList
-      else if (requested =:= LONG_LIST_TYPE) config.getLongList(key).asScala.map(_.longValue).toList
-      else throw new UnsupportedOperationException(s"$tag is not supported yet")
-    value.asInstanceOf[T]
-  }
-
-  // Reference types used by the typed accessor above.
-  val STRING_TYPE = typeOf[String]
-  val NUMBER_TYPE = typeOf[Number]
-  val INT_LIST_TYPE = typeOf[List[Int]]
-  val BOOL_LIST_TYPE = typeOf[List[Boolean]]
-  val DOUBLE_LIST_TYPE = typeOf[List[Double]]
-  // was typeOf[List[Double]], which made the Long-list branch unreachable
-  val LONG_LIST_TYPE = typeOf[List[Long]]
-  val OBJECT_TYPE = typeOf[ConfigObject]
-  val VALUE_TYPE = typeOf[ConfigValue]
-  val ANY_REF_TYPE = typeOf[AnyRef]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
deleted file mode 100644
index c511484..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-
-/**
- * Bridges a `StreamContext` to something that can execute a built `StreamDAG`.
- */
-trait StreamContextAdapter{
-  /**
-   * Builds the DAG of the given context and hands it to `execute`.
-   *
-   * @param context stream context whose `build` result is executed
-   */
-  def submit(context:StreamContext):Unit = {
-    execute(context.build)
-  }
-
-  /**
-   * Executes an already built DAG. The `Unit` result is explicit because procedure syntax is deprecated.
-   *
-   * @param dag the DAG to run
-   */
-  def execute(dag: StreamDAG): Unit
-}
-
-/**
- * A stream context that is also able to execute itself.
- *
- * TODO: Decouple execution environment with stream context
- *
- * @param conf configuration handed to the underlying `StreamContext`
- * @since 0.3.0
- */
-abstract class ExecutionEnvironment(private val conf:Config)
-  extends StreamContext(conf)
-  with StreamContextAdapter     // Continue to support old API
-  with StreamSourceBuilder
-{
-  /**
-   * Submits this environment to itself, which starts the execution.
-   */
-  def execute():Unit = submit(this)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
deleted file mode 100644
index 9564a0d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.aggregate.AggregateExecutorFactory
-import org.apache.eagle.datastream.FlatMapper
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.JavaConversions.asScalaSet
-import scala.collection.mutable.ListBuffer
-
-/**
- * DAG expansion that replaces each `AggregateProducer` with the aggregate executors created for it.
- *
- * TODO : should re-use flow with stream alert expansion, make code cleaner
- */
-class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(config) {
-
-  /**
-   * Handles one (current, child) pair of the DAG. Only `AggregateProducer` children are expanded;
-   * every other child is left untouched.
-   *
-   * @param toBeAddedEdges    collector for edges to add after the traversal
-   * @param toBeRemovedVertex collector for vertices to remove after the traversal
-   * @param dag               the graph being expanded
-   * @param current           the upstream vertex
-   * @param child             the downstream vertex of `current`
-   */
-  override def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any, Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
-    dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], current: StreamProducer[Any],
-    child: StreamProducer[Any]): Unit = child match {
-    case AggregateProducer(upStreamNames, analyzerId, cepQl, strategy) =>
-      // Rewrite the tree to add the output field wrapper, since policy executors accept only a fixed tuple format
-      val wrappedUpstreams = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
-
-      // An inline CEP query takes precedence over looking the analyzer up by id
-      val executors =
-        if (cepQl == null) AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId)
-        else AggregateExecutorFactory.Instance.createExecutors(upStreamNames, cepQl)
-
-      for (exec <- executors) {
-        val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]])
-          .initWith(dag,config, hook = false)
-          .nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq)
-          .stream(child.stream)
-
-        // connect with previous
-        if (strategy != null) {
-          for (upstream <- wrappedUpstreams) toBeAddedEdges += StreamConnector(upstream, t, strategy)
-        } else {
-          for (upstream <- wrappedUpstreams) toBeAddedEdges += StreamConnector(upstream, t)
-        }
-
-        // connect with next
-        for (edge <- dag.outgoingEdgesOf(child)) toBeAddedEdges += StreamConnector(t, edge.to, edge)
-      }
-
-      // remove current child
-      toBeRemovedVertex += child
-    case _ => ()
-  }
-}
-
-object StreamAggregateExpansion{
-  /**
-   * Creates an expansion from the implicit config, applies it to the implicit DAG and returns it.
-   *
-   * @param config configuration passed to the expansion
-   * @param dag    graph expanded in place
-   * @return the expansion instance that was applied
-   */
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAggregateExpansion ={
-    val expansion = new StreamAggregateExpansion(config)
-    expansion.expand(dag)
-    expansion
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
deleted file mode 100644
index 618bba3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
-
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import java.util
-
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity
-import org.apache.eagle.alert.executor.AlertExecutorCreationUtils
-import org.apache.eagle.policy.common.Constants
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl
-
-import scala.collection.JavaConversions.asScalaSet
-import scala.collection.mutable.ListBuffer
-import org.apache.eagle.datastream.JavaStormExecutorForAlertWrapper
-import org.apache.eagle.datastream.JavaStormStreamExecutor
-import org.apache.eagle.datastream.StormStreamExecutor
-import org.apache.eagle.datastream.storm.StormExecutorForAlertWrapper
-import org.apache.eagle.datastream.utils.AlertExecutorConsumerUtils
-import org.apache.eagle.service.client.EagleServiceConnector
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import com.typesafe.config.Config
-
-/**
- * Expands every `AlertStreamProducer` vertex of the DAG into concrete alert executors.
- *
- * The constraints for alerting are:
- * 1. only MapProducer, FlatMapProducer and StreamUnionProducer (or a producer without incoming edges)
- *    can be put immediately before an alert producer
- * 2. For StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer
- * 3. the output of MapProducer and FlatMapProducer is a 2-field tuple, key and value; key is a string, value has to be a SortedMap
- * 4. the framework wraps the original MapProducer and FlatMapProducer to emit a 3-field tuple, {key, streamName and value}
- * 5. the framework automatically partitions traffic by the first field
- *
- *
- * 2 steps
- * step 1: wrap the previous StreamProducer with one more field "streamName"
- * step 2: partition alert executors by policy partitioner class
- */
-
-case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
-  val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
-  import StreamAlertExpansion._
-
-  /**
-   * Visits every (vertex, child) pair of the DAG through `onIteration`, collecting the edges to add
-   * and the vertices to remove, and applies those changes only after the traversal has finished.
-   *
-   * @param dag graph expanded in place
-   */
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): Unit ={
-    val iter = dag.iterator()
-    val toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]]
-    val toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, edge.to)
-      })
-    }
-    // add back edges
-    toBeAddedEdges.foreach(e => {
-      dag.addVertex(e.from)
-      dag.addVertex(e.to)
-      dag.addEdge(e.from, e.to, e)
-    })
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-
-  /**
-   * Handles one (current, child) pair. Only `AlertStreamProducer` children are expanded.
-   *
-   * @param toBeAddedEdges    collector for edges to add after the traversal
-   * @param toBeRemovedVertex collector for vertices to remove after the traversal
-   * @param dag               the graph being expanded
-   * @param current           the upstream vertex
-   * @param child             the downstream vertex of `current`
-   */
-  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], 
-               dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = {
-    child match {
-      case AlertStreamProducer(upStreamNames, alertExecutorId, withConsumer,strategy) =>
-        /**
-         * step 1: wrap previous StreamProducer with one more field "streamName"
-         * for AlertStreamSink, we check previous StreamProducer and replace that
-         */
-        val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
-
-        /**
-         * step 2: partition alert executor by policy partitioner class
-         */
-        val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config,
-          new PolicyDefinitionEntityDAOImpl[AlertDefinitionAPIEntity](new EagleServiceConnector(config), Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME),
-          upStreamNames, alertExecutorId)
-        val alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]]
-        alertExecutors.foreach(exec => {
-          val t = FlatMapProducer(exec).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config, hook = false)
-          alertProducers += t
-          // Exactly one connector per upstream producer. An earlier unconditional Seq(0) connector
-          // duplicated the default edge and was queued ahead of the custom strategy edge.
-          // NOTE(review): the graph presumably rejects a second edge between the same vertices,
-          // which would have made the strategy ineffective - confirm against the graph implementation.
-          if (strategy == null) {
-            newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp,t,Seq(0)))
-          }
-          else {
-            newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp,t,strategy))
-          }
-        })
-
-        // remove AlertStreamSink
-        toBeRemovedVertex += child
-
-        // add alert consumer if necessary
-        if (withConsumer) {
-          AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList)
-        }
-      case _ =>
-    }
-  }
-
-  /**
-   * Step 1 of the expansion: replaces the producer(s) feeding an alert with wrappers that add the
-   * stream name to every emitted tuple.
-   *
-   * @param current           producer located immediately before the alert producer
-   * @param dag               the graph being expanded
-   * @param toBeAddedEdges    collector for edges to add after the traversal
-   * @param toBeRemovedVertex collector for vertices to remove after the traversal
-   * @param upStreamNames     stream names of the upstream producers; must not be null
-   * @return the wrapping producers that replace the upstream producers
-   * @throws NullPointerException  if `upStreamNames` is null
-   * @throws IllegalStateException if `current` is not a supported kind of producer
-   */
-  protected def rewriteWithStreamOutputWrapper(current: StreamProducer[Any], dag: DirectedAcyclicGraph[StreamProducer[Any],StreamConnector[Any,Any]], toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], upStreamNames: util.List[String]) = {
-    if(upStreamNames == null) throw new NullPointerException("upStreamNames is null")
-
-    val newStreamProducers = new ListBuffer[StreamProducer[Any]]
-    current match {
-      case StreamUnionProducer(others) =>
-        // producers feeding the union take the first stream name, the unioned ones the following names
-        val incomingEdges = dag.incomingEdgesOf(current)
-        incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
-        others.zipWithIndex.foreach { case (o, idx) =>
-          newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(idx + 1))
-        }
-      case p: FlatMapProducer[AnyRef, AnyRef] =>
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames))
-      case p: MapperProducer[AnyRef,AnyRef] =>
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames))
-      case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 =>
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(s,upStreamNames))
-      case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
-    }
-    newStreamProducers
-  }
-
-  /**
-   * Queues the edges that put `replacement` where `original` sits (same incoming and outgoing
-   * neighbours) and queues `original` for removal.
-   */
-  private def substitute(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
-                      dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], original: StreamProducer[Any], replacement: StreamProducer[Any]): Unit = {
-    dag.incomingEdgesOf(original).foreach(e => toBeAddedEdges += StreamConnector(e.from, replacement))
-    dag.outgoingEdgesOf(original).foreach(e => toBeAddedEdges += StreamConnector(replacement, e.to))
-    toBeRemovedVertex += original
-  }
-
-  /**
-   * Builds the producer that emits the 3-field tuple (key, streamName, value) in place of, or right
-   * after, `current`, and queues the graph changes needed to wire it in.
-   *
-   * @param toBeAddedEdges    collector for edges to add after the traversal
-   * @param toBeRemovedVertex collector for vertices to remove after the traversal
-   * @param dag               the graph being expanded
-   * @param current           producer to wrap
-   * @param upStreamName      stream name inserted as the second tuple field
-   * @return the new producer
-   * @throws IllegalArgumentException if `current` (or its mapper) is of an unsupported kind
-   */
-  protected def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
-                      dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= {
-    current match {
-      case _: FlatMapProducer[AnyRef, AnyRef] =>
-        val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper
-        val newsp: StreamProducer[Any] = mapper match {
-          case a: JavaStormStreamExecutor[AnyRef] =>
-            val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
-          case b: StormStreamExecutor[AnyRef] =>
-            val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
-          case other => throw new IllegalArgumentException(s"Unsupported mapper $other, only JavaStormStreamExecutor and StormStreamExecutor can be wrapped")
-        }
-        // remove old StreamProducer and replace that with new StreamProducer
-        substitute(toBeAddedEdges, toBeRemovedVertex, dag, current, newsp)
-        newsp
-      case _: MapperProducer[Any,Any] =>
-        val mapper = current.asInstanceOf[MapperProducer[Any,Any]].fn
-        val newfun: (Any => Any) = { a =>
-          val result = mapper(a)
-          result match {
-            case scala.Tuple1(x1) => (null, upStreamName, x1)
-            case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
-            case scala.Tuple3(_, _, _) => result
-            case _ => throw new IllegalArgumentException(s"Illegal message :$result, Tuple1/Tuple2/Tuple3 are supported")
-          }
-        }
-        val newsp: StreamProducer[Any] = MapperProducer(3, newfun).initWith(dag,config,hook = false).stream(current.stream)
-        substitute(toBeAddedEdges, toBeRemovedVertex, dag, current, newsp)
-        newsp
-      case s: StreamProducer[Any] if dag.inDegreeOf(s) == 0 =>
-        val fn:(AnyRef => AnyRef) = {
-          n => {
-            n match {
-              // match a Tuple3 instance; `case scala.Tuple3` only matched the companion object,
-              // so 3-field tuples were wrapped a second time
-              case _: scala.Tuple3[_, _, _] => n
-              case scala.Tuple2(x1,x2) => (x1,upStreamName,x2)
-              case scala.Tuple1(x1) => (if(x1 == null) null else x1.hashCode(),upStreamName,x1)
-              case _ => (if(n == null) null else n.hashCode(),upStreamName,n)
-            }
-          }
-        }
-        // the source producer itself stays in the graph; the wrapper is inserted right after it
-        val newsp: StreamProducer[Any] = MapperProducer(3,fn).initWith(dag,config,hook = false).stream(s.stream)
-        toBeAddedEdges += StreamConnector(current,newsp)
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to))
-        newsp
-      case _ => throw new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink")
-    }
-  }
-}
-
-object StreamAlertExpansion{
-  /**
-   * Creates an expansion from the implicit config, applies it to the implicit DAG and returns it.
-   *
-   * @param config configuration passed to the expansion
-   * @param dag    graph expanded in place
-   * @return the expansion instance that was applied
-   */
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAlertExpansion ={
-    val e = StreamAlertExpansion(config)
-    e.expand(dag)
-    e
-  }
-
-  /**
-    * Picks the single stream name to use for `producer`:
-    *  - `upStreamNames` is null: the producer's own `streamId` (which may itself be null)
-    *  - exactly one name: that name
-    *  - no name: the producer's `streamId`, which must then be set
-    *  - more than one name: the producer's `streamId`, which must then be set
-    *
-    * @param producer producer whose stream name is wanted
-    * @param upStreamNames candidate upstream names, may be null
-    * @return the stream name
-    * @throws IllegalStateException    if several names are given and the producer has no `streamId`
-    * @throws IllegalArgumentException if no name is given and the producer has no `streamId`
-    */
-  private def recognizeSingleStreamName(producer: StreamProducer[AnyRef],upStreamNames:util.List[String]):String = {
-    if(upStreamNames == null){
-      producer.streamId
-    } else upStreamNames.size() match {
-      case 1 => upStreamNames.get(0)
-      case 0 =>
-        if(producer.streamId == null)
-          throw new IllegalArgumentException("No stream name found for "+producer)
-        else
-          producer.streamId
-      case _ =>
-        // several candidates: only the producer's own id can disambiguate
-        if(producer.streamId == null)
-          throw new IllegalStateException("Too many (more than 1) upStreamNames " + upStreamNames + " given for " + producer)
-        else
-          producer.streamId
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
deleted file mode 100644
index 6e21bcc..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.ExecutionEnvironments
-import org.apache.eagle.datastream.utils.GraphPrinter
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.reflect.runtime.universe._
-
-/**
- * Builder-side view of a stream context: exposes its configuration and logical DAG,
- * and the ways to build and submit it.
- */
-trait StreamContextBuilder extends StreamSourceBuilder {
-  /** @return the configuration wrapper of this context */
-  def config:Configuration
-
-  /**
-   * Business logic DAG
-   *
-   * @return the graph of producers and connectors
-   */
-  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
-
-  /**
-   * Support Java Style Config
-   *
-   * @return the raw config held by `config`
-   */
-  def getConfig:Config = config.get
-
-  /** @return the DAG built from this context */
-  def build:StreamDAG
-
-  /** Submits this context to the execution environment of type `E`. */
-  def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit
-
-  /** Submits this context to the given execution environment. */
-  def submit(env:ExecutionEnvironment):Unit
-
-  /** Submits this context to the execution environment identified by `clazz`. */
-  def submit(clazz:Class[ExecutionEnvironment]):Unit
-}
-
-/**
- * Default `StreamContextBuilder`: owns the DAG and the configuration, and turns the DAG into a
- * `StreamDAG` by applying the expansions in a fixed order.
- *
- * @param conf raw configuration backing this context
- */
-class StreamContext(private val conf:Config) extends StreamContextBuilder{
-  implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]])
-  private val _config:Configuration = Configuration(conf)
-
-  override def dag = _dag
-
-  override def config = _config
-
-  /**
-   * Runs the expansions over the DAG (printing it before and after) and transforms the result.
-   * The order of the expansion calls below is significant and must be kept.
-   */
-  override def build: StreamDAG = {
-    // picked up implicitly by each expansion's `apply()`
-    implicit val expansionConfig = _config.get
-
-    StreamNameExpansion()
-    GraphPrinter.print(dag,message="Before expanded DAG ")
-
-    StreamAggregateExpansion()
-    StreamAlertExpansion()
-    StreamUnionExpansion()
-    StreamGroupbyExpansion()
-    StreamParallelismConfigExpansion()
-    StreamNameExpansion()
-
-    GraphPrinter.print(dag,message="After expanded DAG ")
-    GraphPrinter.printDotDigraph(dag)
-    StreamDAGTransformer.transform(dag)
-  }
-
-  /** Hands this context to the given environment. */
-  override def submit(env: ExecutionEnvironment): Unit = env.submit(this)
-
-  /** Looks the environment up by class through `ExecutionEnvironments` and submits to it. */
-  override def submit(clazz: Class[ExecutionEnvironment]): Unit = {
-    val environment = ExecutionEnvironments.get(clazz,conf)
-    environment.submit(this)
-  }
-
-  /** Looks the environment up by type through `ExecutionEnvironments` and submits to it. */
-  override def submit[E <: ExecutionEnvironment](implicit typeTag: TypeTag[E]): Unit = {
-    val environment = ExecutionEnvironments.getWithConfig[E](conf)
-    environment.submit(this)
-  }
-}
-
-object StreamContext {
-  /**
-   * @return a context backed by `ConfigFactory.load()`
-   */
-  def apply():StreamContext = new StreamContext(ConfigFactory.load())
-
-  /**
-   * @param args arguments parsed by `ConfigOptionParser`
-   * @return a context backed by the configuration loaded from `args`
-   */
-  def apply(args:Array[String]):StreamContext = {
-    val parsed = new ConfigOptionParser().load(args)
-    new StreamContext(parsed)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
deleted file mode 100644
index ce9d82c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.core
-
-import org.apache.eagle.partition.PartitionStrategy
-
-abstract class StreamConnector[+T1 <: Any,+T2 <: Any](val from: StreamProducer[T1], val to: StreamProducer[T2]) extends Serializable
-
-case class ShuffleConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2])
-  extends StreamConnector[T1,T2](from,to){
-  override def toString: String = "shuffleGroup"
-}
-
-case class GroupbyFieldsConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],groupByFields : Seq[Int])
-  extends StreamConnector[T1,T2](from,to){
-  override def toString: String = s"groupByFields( $groupByFields )"
-}
-
-case class GroupbyKeyConnector[T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],keySelector: T1 => Any)
-  extends StreamConnector[T1,T2](from,to){
-  override def toString: String = s"groupByKey($keySelector)"
-}
-
-case class GroupbyStrategyConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],customGroupBy:PartitionStrategy)
-  extends StreamConnector[T1,T2](from,to){
-  override def toString: String = s"groupByStrategy( $customGroupBy )"
-}
-
-object StreamConnector{
-  /**
-   *
-   * @param from
-   * @param to
-   * @tparam T1
-   * @tparam T2
-   * @return
-   */
-  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2]):ShuffleConnector[T1,T2] = ShuffleConnector(from,to)
-
-  /**
-   * Clone connector from old connector to apply to new processing element, return ShuffleConnector by default
-   *
-   * @param from
-   * @param to
-   * @param connector
-   * @tparam T1
-   * @tparam T2
-   * @return
-   */
-  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],connector: StreamConnector[Any,Any]):StreamConnector[T1,T2] = connector match {
-      case GroupbyFieldsConnector(_,_,fields) => GroupbyFieldsConnector[T1,T2](from,to,fields)
-      case GroupbyKeyConnector(_,_,keySelector) => GroupbyKeyConnector[T1,T2](from,to,keySelector)
-      case GroupbyStrategyConnector(_,_,strategy) => GroupbyStrategyConnector[T1,T2](from,to,strategy)
-      case null | ShuffleConnector(_,_) => ShuffleConnector[T1,T2](from,to)
-      case c@_ => throw new IllegalArgumentException(s"Unknown type of stream connector $c")
-  }
-
-  /**
-   *
-   * @param from
-   * @param to
-   * @param groupByFields
-   * @tparam T1
-   * @tparam T2
-   * @return
-   */
-  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],groupByFields : Seq[Int]):GroupbyFieldsConnector[T1,T2] = GroupbyFieldsConnector(from,to,groupByFields)
-
-  /**
-   *
-   * @param from
-   * @param to
-   * @param customGroupBy
-   * @tparam T1
-   * @tparam T2
-   * @return
-   */
-  def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],customGroupBy: PartitionStrategy):GroupbyStrategyConnector[T1,T2] = GroupbyStrategyConnector(from,to,customGroupBy)
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
deleted file mode 100644
index 7845740..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.core
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.JavaConverters._
-import scala.collection.{JavaConversions, mutable}
-
-/**
- * wrapper of DAG, used for storm topology compiler
- */
-class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) extends StreamProducerGraph {
-  var nodeMap: mutable.Map[String, StreamProducer[Any]] = mutable.Map[String,StreamProducer[Any]]()
-  graph.iterator().asScala.foreach(p=> nodeMap.put(p.name,p))
-
-  override def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]): Unit = {
-    graph.addEdge(from, to, streamConnector)
-  }
-
-  override def addVertex(producer: StreamProducer[Any]): Unit = {
-    graph.addVertex(producer)
-    nodeMap.put(producer.name,producer)
-  }
-
-  override def iterator(): Iterator[StreamProducer[Any]] = {
-    JavaConversions.asScalaIterator(graph.iterator())
-  }
-
-  override def isSource(v: StreamProducer[Any]): Boolean = {
-    graph.inDegreeOf(v) match {
-      case 0 => true
-      case _ => false
-    }
-  }
-
-  override def outgoingEdgesOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamConnector[Any,Any]] = {
-    JavaConversions.asScalaSet(graph.outgoingEdgesOf(v))
-  }
-
-  override def getNodeByName(name: String): Option[StreamProducer[Any]] = {
-    nodeMap.get(name)
-  }
-
-  def setNodeMap(nodeMap: mutable.Map[String, StreamProducer[Any]]): Unit = {
-    this.nodeMap = nodeMap
-  }
-
-  override def incomingVertexOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamProducer[Any]] = {
-    val set = mutable.Set[StreamProducer[Any]]()
-    graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e))
-    set
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
deleted file mode 100644
index fcff639..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-abstract class StreamDAGExpansion(config: Config) {
-  def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
deleted file mode 100644
index 6b20bf2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.mutable
-
-/**
- * convert generic DAG data structure to Storm specific DAG data structure for easy topology compiler
- */
-object StreamDAGTransformer {
-  /**
-   * Transform DirectedAcyclicGraph[StreamProducer, StreamConnector] into StormStreamDAG
-   *
-   * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
-   * @return StormStreamDAG
-   */
-  @deprecated("Use StreamDAG(dag) will transform directly")
-  def transform(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) : StreamDAG = {
-    val stormDAG = new StreamDAG(dag)
-    val nodeMap = mutable.HashMap[String, StreamProducer[Any]]()
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val sp = iter.next()
-      nodeMap.put(sp.name, sp)
-    }
-    stormDAG.setNodeMap(nodeMap)
-    stormDAG
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
deleted file mode 100644
index 1a07e3f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * Replace GroupByProducer(Vertex) with StreamConnector (Edge)
- *
- * For example as to Storm, it's mainly for grouping method
- *
- * @param config context configuration
- */
-case class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
-    val iter = dag.iterator()
-    var toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]]
-    var toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        child match {
-          case p : GroupByProducer[Any] => {
-            dag.outgoingEdgesOf(p).foreach(c2 => {
-              p match {
-                case GroupByFieldProducer(fields) =>
-                  toBeAddedEdges += GroupbyFieldsConnector(current, c2.to,fields)
-                case GroupByStrategyProducer(strategy) =>
-                  toBeAddedEdges += GroupbyStrategyConnector(current, c2.to,strategy)
-                case GroupByKeyProducer(keySelector) =>
-                  current.outKeyed = true
-                  current.keySelector = KeySelectorWrapper(keySelector)
-                  c2.to.inKeyed = true
-                  toBeAddedEdges += GroupbyKeyConnector(current, c2.to,keySelector)
-                case _ => toBeAddedEdges += ShuffleConnector(current, c2.to)
-              }
-            })
-            toBeRemovedVertex += p
-          }
-          case _ =>
-        }
-      })
-    }
-
-    // add back edges
-    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-}
-
-object StreamGroupbyExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamGroupbyExpansion ={
-    val e = StreamGroupbyExpansion(config)
-    e.expand(dag)
-    e
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
deleted file mode 100644
index 4bc1812..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.apache.eagle.datastream.utils.NodeNameSelector
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-/**
- * to set name for each StreamProducer
- * 1. if name is given programatically, then use this name
- * 2. otherwise use name generated by scala internally
- */
-case class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val sp = iter.next()
-      sp.name = NodeNameSelector(sp).getName
-    }
-  }
-}
-
-
-object StreamNameExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamNameExpansion ={
-    val e = StreamNameExpansion(config)
-    e.expand(dag)
-    e
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
deleted file mode 100644
index 8699da6..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import java.util.regex.Pattern
-
-import com.typesafe.config.{Config, ConfigObject, ConfigValue}
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-case class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
-    val map = getParallelismMap(config)
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val streamProducer = iter.next()
-      if(streamProducer.name != null) {
-        map.foreach(tuple => {
-          tuple._1.matcher(streamProducer.name).find() match {
-            case true => streamProducer.parallelism(tuple._2)
-            case false =>
-          }
-        })
-      }
-    }
-  }
-
-  private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
-    if(config.hasPath("envContextConfig.parallelismConfig")) {
-      val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig")
-      parallelismConfig.asScala.toMap map {
-        case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
-      }
-    }else{
-      Map[Pattern,Int]()
-    }
-  }
-}
-
-object StreamParallelismConfigExpansion{
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamParallelismConfigExpansion ={
-    val e = StreamParallelismConfigExpansion(config)
-    e.expand(dag)
-    e
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
deleted file mode 100644
index 6445643..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.apache.eagle.alert.entity.AlertAPIEntity
-import org.apache.eagle.datastream.{FlatMapperWrapper, Collector, FlatMapper}
-import org.apache.eagle.partition.PartitionStrategy
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions.{asScalaBuffer, seqAsJavaList}
-import scala.collection.JavaConverters.asScalaBufferConverter
-/**
- * StreamProducer = StreamInfo + StreamProtocol
- *
- * StreamProducer is processing logic element, used the base class for all other concrete StreamProducer
- * Stream Producer can be treated as logical processing element but physical
- * It defines high level type-safe API for user to organize data stream flow
- *
- * StreamProducer is independent of execution environment
- *
- * @tparam T processed elements type
- */
-abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T] {
-  private var graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]] = null
-  private val LOG = LoggerFactory.getLogger(classOf[StreamProducer[T]])
-
-  /**
-   * Should not modify graph when setting it
-   */
-  def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T]={
-    this.config = config
-    this.graph = graph
-    if(hook && ! this.graph.containsVertex(this)) {
-      this.graph.addVertex(this)
-    }
-    LOG.debug(this.graph.toString)
-    this
-  }
-  
-  /**
-   * Get stream pure metadata info
-   * @return
-   */
-  override def getInfo:StreamInfo = this.asInstanceOf[StreamInfo]
-
-  override def stream(streamId:String):StreamProducer[T] = {
-    this.streamId = streamId
-    this
-  }
-
-  override def filter(fn : T => Boolean): StreamProducer[T] ={
-    val ret = FilterProducer[T](fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R] = {
-    val ret = FlatMapProducer[T,R](flatMapper)
-    connect(this, ret)
-    ret
-  }
-  override def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R] = {
-    val ret = FlatMapProducer[T,R](FlatMapperWrapper[R](func))
-    connect(this, ret)
-    ret
-  }
-
-
-  override def foreach(fn : T => Unit) : Unit = {
-    val ret = ForeachProducer[T](fn)
-    connect(this, ret)
-  }
-
-  override def map[R](fn : T => R) : StreamProducer[R] = {
-    val ret = MapperProducer[T,R](0,fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map1[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer[T,R](1, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map2[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer[T,R](2, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map3[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer(3, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map4[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer(4, fn)
-    connect(this, ret)
-    ret
-  }
-
-  /**
-   * starting from 0, groupby operator would be upon edge of the graph
-   */
-  override def groupBy(fields : Int*) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields)
-    connect(this, ret)
-    ret
-  }
-
-  def groupByFieldIndex(fields : Seq[Int]) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields)
-    connect(this, ret)
-    ret
-  }
-
-  //groupBy java version, starting from 1
-  override def groupBy(fields : java.util.List[Integer]) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields.asScala.toSeq.asInstanceOf[Seq[Int]])
-    connect(this, ret)
-    ret
-  }
-
-  override def groupByKey(keySelector: T=> Any):StreamProducer[T] = {
-    val ret = GroupByKeyProducer(keySelector)
-    connect(this,ret)
-    ret
-  }
-
-  override def streamUnion[T2,T3](others : Seq[StreamProducer[T2]]) : StreamProducer[T3] = {
-    val ret = StreamUnionProducer[T, T2, T3](others)
-    connect(this, ret)
-    ret
-  }
-
-  def streamUnion[T2,T3](other : StreamProducer[T2]) : StreamProducer[T3] = {
-    streamUnion(Seq(other))
-  }
-
-  def streamUnion[T2,T3](others : util.List[StreamProducer[T2]]) : StreamProducer[T3] = streamUnion(others.asScala)
-
-  override def groupBy(strategy : PartitionStrategy) : StreamProducer[T] = {
-    val ret = GroupByStrategyProducer(strategy)
-    connect(this, ret)
-    ret
-  }
-
-  /**
-   * alert is always sink of data flow
-   */
-  def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String) = {
-    alert(upStreamNames.asScala, alertExecutorId,consume = true)
-  }
-
-  def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String) = {
-    alert(upStreamNames.asScala, alertExecutorId, consume = false)
-  }
-
-  override def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null):AlertStreamProducer = {
-    val ret = AlertStreamProducer(upStreamNames, alertExecutorId, consume, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true, strategy = strategy)
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false, strategy)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false)
-  }
-
-  def aggregate(upStreamNames: java.util.List[String], queryExecutorId : String, strategy: PartitionStrategy = null): StreamProducer[T] = {
-    val ret= AggregateProducer(upStreamNames, queryExecutorId, null, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy: PartitionStrategy): StreamProducer[T] = {
-    val ret= AggregateProducer(upStreamNames, null, cql, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = {
-    val ret = PersistProducer(executorId, storageType)
-    connect(this, ret)
-    ret
-  }
-
-  def connect[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
-    if(current.graph == null) throw new NullPointerException(s"$current has not been registered to any graph before being connected")
-    current.graph.addVertex(next)
-    current.graph.addEdge(current, next, StreamConnector(current, next))
-    passOnContext[T1,T2](current, next)
-  }
-
-  def connect[T2]( next: StreamProducer[T2]) = {
-    if(this.graph == null) throw new NullPointerException("graph is null")
-    this.graph.addVertex(next)
-    this.graph.addEdge(this, next, StreamConnector(this, next))
-    passOnContext[T,T2](this, next)
-  }
-
-  private def passOnContext[T1 ,T2](current: StreamProducer[T1], next: StreamProducer[T2]): Unit ={
-    next.initWith(current.graph,current.config)
-  }
-
-  /**
-   * can be set by programatically or by configuration
-   */
-  override def parallelism(parallelism : Int) : StreamProducer[T] = {
-    this.parallelismNum = parallelism
-    this
-  }
-
-  override def parallelism : Int = this.parallelismNum
-  override def stream:String = this.streamId
-
-  /**
-   * Component name
-   *
-   * @param componentName component name
-   * @return
-   */
-  override def nameAs(componentName : String) : StreamProducer[T] = {
-    this.name = componentName
-    this
-  }
-}
-
-case class FilterProducer[T](fn : T => Boolean) extends StreamProducer[T]{
-  override def toString: String = s"FilterProducer"
-}
-
-case class FlatMapProducer[T, R](var mapper: FlatMapper[R]) extends StreamProducer[R]{
-  override def toString: String = mapper.toString
-}
-
-case class MapperProducer[T,R](numOutputFields : Int, var fn : T => R) extends StreamProducer[R]{
-  override def toString: String = s"MapperProducer"
-}
-
-case class ForeachProducer[T](var fn : T => Unit) extends StreamProducer[T]
-
-abstract class GroupByProducer[T] extends StreamProducer[T]
-case class GroupByFieldProducer[T](fields : Seq[Int]) extends GroupByProducer[T]
-case class GroupByStrategyProducer[T](partitionStrategy: PartitionStrategy) extends GroupByProducer[T]
-case class GroupByKeyProducer[T](keySelectorFunc:T => Any) extends GroupByProducer[T]{
-  override def toString: String = s"GroupByKey"
-}
-
-object GroupByProducer {
-  def apply[T](fields: Seq[Int]) = new GroupByFieldProducer[T](fields)
-  def apply[T](partitionStrategy : PartitionStrategy) = new GroupByStrategyProducer[T](partitionStrategy)
-  def apply[T](keySelector:T => Any) = new GroupByKeyProducer[T](keySelector)
-}
-
-case class StreamUnionProducer[T1,T2,T3](others: Seq[StreamProducer[T2]]) extends StreamProducer[T3]
-
-case class StormSourceProducer[T](source: BaseRichSpout) extends StreamProducer[T]{
-  var numFields : Int = 0
-
-  /**
-    * rename outputfields to f0, f1, f2, ...
-   * if one spout declare some field names, those fields names will be modified
-   * @param n
-   */
-  def withOutputFields(n : Int): StormSourceProducer[T] ={
-    this.numFields = n
-    this
-  }
-}
-
-case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]{
-  override def toString: String = s"IterableStreamProducer(${iterable.getClass.getSimpleName}))"
-}
-case class IteratorStreamProducer[T](iterator: Iterator[T]) extends StreamProducer[T]{
-  override def toString: String = s"IteratorStreamProducer(${iterator.getClass.getSimpleName})"
-}
-
-case class AlertStreamProducer(var upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] {
-  def consume(consume: Boolean): AlertStreamProducer = {
-    this.consume = consume
-    this
-  }
-}
-
-case class AggregateProducer[T](var upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T]
-
-case class PersistProducer[T](executorId :String, storageType: StorageType.StorageType) extends StreamProducer[T]
-
-object UniqueId{
-  val id : AtomicInteger = new AtomicInteger(0);
-  def incrementAndGetId() : Int = {
-    id.incrementAndGet()
-  }
-}
-
-trait KeySelector extends Serializable{
-  def key(t:Any):Any
-}
-
-case class KeySelectorWrapper[T](fn:T => Any) extends KeySelector{
-  override def key(t: Any): Any = fn(t.asInstanceOf[T])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
deleted file mode 100644
index f3fcc4d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.core
-
-/**
- * Graph abstraction whose vertices are stream producers and whose edges are stream connectors.
- */
-trait StreamProducerGraph {
-  /** Adds an edge between two producers, described by the given connector. */
-  def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]): Unit
-
-  /** Adds a producer as a vertex. */
-  def addVertex(producer: StreamProducer[Any]): Unit
-
-  /** Iterator over the producers held by the graph. */
-  def iterator() : Iterator[StreamProducer[Any]]
-
-  /** Whether the producer is a source (presumably a vertex without incoming edges -- confirm with the implementation). */
-  def isSource(v : StreamProducer[Any]) : Boolean
-
-  /** Connectors leaving the given producer. */
-  def outgoingEdgesOf(v : StreamProducer[Any]) : scala.collection.mutable.Set[StreamConnector[Any,Any]]
-
-  /** Looks a producer up by name; None when no producer matches. */
-  def getNodeByName(name : String) : Option[StreamProducer[Any]]
-
-  /** Producers with an edge into the given producer. */
-  def incomingVertexOf(v: StreamProducer[Any]) : scala.collection.mutable.Set[StreamProducer[Any]]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
deleted file mode 100644
index b54b21f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.apache.commons.lang3.builder.HashCodeBuilder
-import org.apache.eagle.datastream.{Collector, FlatMapper}
-import org.apache.eagle.partition.PartitionStrategy
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-/**
- * Metadata of one processing element.
- *
- * StreamInfo should be fully serializable and must not hold runtime type information.
- */
-class StreamInfo  extends Serializable{
-  /**
-   * Processing element id, taken from the shared UniqueId counter at construction time
-   */
-  val id:Int = UniqueId.incrementAndGetId()
-
-  /**
-   * Processing element name (null until assigned)
-   */
-  var name: String = null
-
-  /**
-    * Output stream id; intended to equal the name by default, but that defaulting is
-    * not done in this class (the field starts as null)
-    */
-  var streamId:String=null
-
-  // Parallelism of the processing element, 1 unless changed
-  var parallelismNum: Int = 1
-
-  /**
-   * Whether the input stream is keyed
-   */
-  var inKeyed:Boolean = false
-  /**
-   * Whether the output stream is keyed
-   */
-  var outKeyed:Boolean = false
-  /**
-   * Output key selector (null until assigned)
-   */
-  var keySelector:KeySelector = null
-
-// Type Information
-// ================
-//
-//  /**
-//   * Entity class type of T
-//   */
-//  var typeClass:Class[_] = null
-//
-//  /**
-//   * Type Class Simple Name
-//   * @return
-//   */
-//  def typeClassName = if(typeClass == null) null else typeClass.getSimpleName
-//
-//  @transient private var _typeTag[_] = null
-//
-//  def typeTag[_] = {
-//    if(_typeTag == null) _typeTag = Reflections.typeTag(this.typeClass)
-//    _typeTag
-//  }
-
-  // Configuration attached to this element (null until assigned)
-  var config: Config = null
-
-  // Returns this instance itself
-  def getInfo = this
-
-  // Hash combines the element id with the concrete runtime class; equals is not overridden here.
-  override def hashCode(): Int = new HashCodeBuilder().append(this.id).append(this.getClass).toHashCode
-}
-
-
-/** Storage kinds accepted by `persist` / [[PersistProducer]]. */
-object StorageType extends Enumeration {
-  type StorageType = Value
-
-  // Declared one per line; ids are still assigned in declaration order (0, 1, 2).
-  val KAFKA = Value
-  val DRUID = Value
-  val HBASE = Value
-}
-
-/**
- * Stream interaction protocol interface: the operations available on a stream producer.
- * Every operation is abstract here except the three symbolic aliases at the bottom.
- *
- * @tparam T processed elements type
- */
-trait StreamProtocol[+T <: Any]{
-  /**
-   * Initialize the stream metadata info
-   *
-   * @param graph  DAG of producers and connectors the stream belongs to
-   * @param config configuration handed to the producer
-   * @param hook   true by default; its effect is defined by the implementation
-   * @return the initialized producer
-   */
-  def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T]
-
-  /**
-   * Support Java API
-   *
-   * @param flatMapper flat-mapper object producing elements of type R
-   * @tparam R output element type
-   * @return producer of the mapped stream
-   */
-  def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R]
-  // Function-literal variant: receives the (untyped) input and a collector for the outputs
-  def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R]
-
-  /**
-   * Filter the stream with a predicate
-   *
-   * @param fn predicate over the elements
-   * @return producer of the filtered stream
-   */
-  def filter(fn : T => Boolean): StreamProducer[T]
-
-  /**
-   * Apply a side-effecting function to the elements
-   *
-   * @param fn function applied to each element; its result is discarded
-   */
-  def foreach(fn : T => Unit) : Unit
-
-  /**
-   * Type safe mapper
-   * @param fn mapping function
-   * @tparam R output element type
-   * @return producer of the mapped stream
-   */
-  def map[R](fn : T => R): StreamProducer[R]
-
-  /**
-   * Field base mapper
-   * (map1..map4 presumably differ in the number of output fields -- confirm with the implementation)
-   * @param fn mapping function
-   * @tparam R output element type
-   * @return producer of the mapped stream
-   */
-  def map1[R](fn : T => R) : StreamProducer[R]
-  def map2[R](fn : T => R) : StreamProducer[R]
-  def map3[R](fn : T => R) : StreamProducer[R]
-  def map4[R](fn : T => R) : StreamProducer[R]
-
-  // Grouping by field indexes (Scala varargs and Java list variants) or by a partition strategy
-  def groupBy(fields : Int*) : StreamProducer[T]
-  def groupBy(fields : java.util.List[Integer]) : StreamProducer[T]
-  def groupBy(strategy : PartitionStrategy) : StreamProducer[T]
-
-  /**
-   * @param keyer key selector function
-   * @return producer of the grouped stream
-   */
-  def groupByKey(keyer:T => Any):StreamProducer[T]
-
-  // Union with other streams; the result element type T3 is chosen by the caller
-  def streamUnion[T2,T3](otherStreams : Seq[StreamProducer[T2]]) : StreamProducer[T3]
-  // Attach an alert executor; see the `!` alias below for default arguments
-  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy):AlertStreamProducer
-
-  def aggregate(upStreamNames: java.util.List[String], executorId :String, strategy:PartitionStrategy): StreamProducer[T]
-
-  def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy:PartitionStrategy): StreamProducer[T]
-
-  def persist(executorId : String, storageType: StorageType.StorageType): StreamProducer[T]
-  
-  /**
-   * Set processing element parallelism setting
-   * @param parallelismNum parallelism value
-   * @return producer with the setting applied
-   */
-  def parallelism(parallelismNum : Int) : StreamProducer[T]
-  // Current parallelism value
-  def parallelism : Int
-  /**
-   * Set component name
-   *
-   * @param componentName name of the component
-   * @return producer with the name applied
-   */
-  def nameAs(componentName : String) : StreamProducer[T]
-
-  /**
-   * Set stream name
-   * @param streamId stream ID
-   * @return producer with the stream id applied
-   */
-  def stream(streamId: String): StreamProducer[T]
-  // Current stream id
-  def stream: String
-
-  // Symbolic aliases: `?` is filter, `~>` is flatMap, `!` is alert (consume = true, no strategy by default)
-  def ? (fn:T => Boolean):StreamProducer[T] = this.filter(fn)
-  def ~>[R](flatMapper : FlatMapper[R]) = this.flatMap[R](flatMapper)
-  def ! (upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean = true,strategy: PartitionStrategy = null) = alert(upStreamNames, alertExecutorId, consume,strategy)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
deleted file mode 100644
index 5f3bd22..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.reflect.runtime.{universe => ru}
-
-/**
- * Mixin that creates source stream producers and initialises them against the DAG.
- *
- * @since  12/7/15
- */
-trait StreamSourceBuilder {
-  /** Configuration holder; `config.get` is what each producer is initialised with. */
-  def config:Configuration
-
-  /**
-   * Business logic DAG
-   * @return graph every producer created here is initialised with
-   */
-  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
-
-  /**
-   * Creates a source from an iterable and initialises it.
-   *
-   * @param iterable top level Iterable interface
-   * @param recycle  forwarded unchanged to the producer
-   * @tparam T element type
-   * @return the initialised producer
-   */
-  def from[T:ru.TypeTag](iterable: Iterable[T],recycle:Boolean = false):IterableStreamProducer[T]={
-    val source = IterableStreamProducer[T](iterable,recycle)
-    source.initWith(dag,config.get)
-    source
-  }
-
-  /**
-   * Creates a source from an iterator and initialises it.
-   *
-   * @param iterator backing iterator
-   * @param recycle  accepted but not used by this overload
-   * @tparam T element type
-   * @return the initialised producer
-   */
-  def from[T:ru.TypeTag](iterator: Iterator[T],recycle:Boolean):IteratorStreamProducer[T]={
-    val source = IteratorStreamProducer[T](iterator)
-    source.initWith(dag,config.get)
-    source
-  }
-
-  /**
-   * Creates a source that iterates over the elements of a product.
-   *
-   * @param product value whose `productIterator` backs the source
-   * @return the initialised producer
-   */
-  def from(product: Product):IteratorStreamProducer[Any]={
-    val source = IteratorStreamProducer[Any](product.productIterator)
-    source.initWith(dag,config.get)
-    source
-  }
-
-  /**
-   * Initialises an externally created producer with this builder's DAG and configuration.
-   *
-   * @param producer producer to initialise
-   */
-  def register[T](producer:StreamProducer[T]):Unit = {
-    producer.initWith(dag,config.get)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
deleted file mode 100644
index 351782b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * Expands union operators: every vertex matching StreamUnionProducer is removed and its
- * downstream vertices are wired directly to the union's parent and to the other unioned streams.
- */
-case class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion])
-
-  /**
-   * Rewrites the DAG in place. Changes are collected during the traversal and applied
-   * afterwards: new edges first, then removal of the union vertices.
-   *
-   * @param dag graph to rewrite
-   */
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = {
-    val pendingEdges = new ListBuffer[StreamConnector[Any,Any]]
-    val unionVertices = new ListBuffer[StreamProducer[Any]]
-
-    val vertices = dag.iterator()
-    while (vertices.hasNext) {
-      val parent = vertices.next()
-      for (inEdge <- dag.outgoingEdgesOf(parent)) {
-        val child = inEdge.to
-        child match {
-          case StreamUnionProducer(others) =>
-            for (outEdge <- dag.outgoingEdgesOf(child)) {
-              // the parent first, then each of the other unioned streams, all reusing the incoming edge
-              pendingEdges += StreamConnector(parent, outEdge.to, inEdge)
-              for (other <- others) {
-                pendingEdges += StreamConnector(other, outEdge.to, inEdge)
-              }
-            }
-            unionVertices += child
-          case _ =>
-        }
-      }
-    }
-
-    // add back edges
-    for (connector <- pendingEdges) {
-      dag.addEdge(connector.from, connector.to, connector)
-    }
-    unionVertices.foreach(vertex => dag.removeVertex(vertex))
-  }
-}
-
-object StreamUnionExpansion{
-  /**
-   * Builds an expansion from the implicit configuration and immediately runs it on the implicit DAG.
-   *
-   * @return the expansion that was applied
-   */
-  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamUnionExpansion ={
-    val expansion = new StreamUnionExpansion(config)
-    expansion.expand(dag)
-    expansion
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
deleted file mode 100644
index 64b5f0f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.apache.eagle.datastream.core.StreamInfo
-import org.apache.eagle.datastream.utils.NameConstants
-import org.slf4j.LoggerFactory
-
-/**
- * Base Storm bolt for one stream processing element. It declares the output fields,
- * dispatches each tuple to onKeyValue or onValues, and acks or fails the tuple.
- *
- * @param fieldsNum number of output fields declared for a non-keyed output; zero-fieldsNum may mean
- *                  something different to subclasses (here 0 declares one field, a negative value none)
- * @param ack whether execute acks the input tuple after it was processed without error
- * @param streamInfo metadata of the processing element (keyed flags and key selector are read here)
- * @tparam T value type handed to onKeyValue
- */
-abstract class AbstractStreamBolt[T](val fieldsNum:Int=1, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt{
-  private var _collector: OutputCollector = null
-  private val LOG = LoggerFactory.getLogger(classOf[AbstractStreamBolt[T]])
-
-  /**
-   * If outKeyed then
-   *  Fields = ["key","value"]
-   * elsif num > 0
-   *  Fields = ["f0","f1",..,"fn"]
-   * elsif num == 0
-   *  Fields = ["f0"]
-   * end
-   *
-   * (field names are the NameConstants values; nothing is declared for a negative num)
-   *
-   * @param declarer Storm output field declarer
-   */
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    if(streamInfo.outKeyed) {
-      declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
-    }else{
-      if(fieldsNum > 0) {
-        val fields = new util.ArrayList[String]()
-        var i: Int = 0
-        while (i < fieldsNum) {
-          fields.add(NameConstants.FIELD_PREFIX + i)
-          i += 1
-        }
-        declarer.declare(new Fields(fields))
-      }else if(fieldsNum == 0){
-        declarer.declare(new Fields(NameConstants.FIELD_PREFIX + 0))
-      }
-    }
-  }
-
-  /**
-   * Emits a values list anchored to the input tuple. For a keyed output the tuple is
-   * (key, values), where the key is computed from the whole list.
-   */
-  def emit(values:util.List[AnyRef])(implicit input:Tuple): Unit = {
-    if (streamInfo.outKeyed) {
-      _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(values).asInstanceOf[AnyRef], values))
-    } else {
-      _collector.emit(input, values)
-    }
-  }
-
-  /**
-   * Emits a single value anchored to the input tuple: (key, value) for a keyed output,
-   * otherwise a one-element tuple.
-   */
-  def emit(value:Any)(implicit input:Tuple): Unit = {
-    if(streamInfo.outKeyed) {
-      _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(value).asInstanceOf[AnyRef],value.asInstanceOf[AnyRef]))
-    }else{
-      _collector.emit(input,util.Arrays.asList(value.asInstanceOf[AnyRef]))
-    }
-  }
-
-  /**
-   * Dispatches the tuple to onKeyValue (keyed input) or onValues, then acks it when ack is set.
-   * A non-fatal exception is logged and the tuple is failed.
-   */
-  override def execute(input: Tuple): Unit = {
-    try {
-      implicit val _input: Tuple = input
-      if (streamInfo.inKeyed) {
-        val key = input.getValueByField(NameConstants.FIELD_KEY)
-        val value = input.getValueByField(NameConstants.FIELD_VALUE).asInstanceOf[T]
-        onKeyValue(key, value)
-      } else {
-        onValues(input.getValues)
-      }
-      if(ack) _collector.ack(input)
-    }catch {
-      // NonFatal lets InterruptedException, VirtualMachineError, LinkageError and control
-      // throwables propagate instead of being turned into an ordinary tuple failure.
-      case scala.util.control.NonFatal(t) => {
-        LOG.error(s"Got exception when processing $input",t)
-        _collector.fail(input)
-      }
-    }
-  }
-
-  /**
-   * Handle keyed stream value
-   *
-   * @param key   value of the key field of the input tuple
-   * @param value value of the value field of the input tuple
-   */
-  def onKeyValue(key:Any,value:T)(implicit input:Tuple): Unit
-
-  /**
-   * Handle general stream values list
-   *
-   * @param values all values of the input tuple
-   */
-  def onValues(values:util.List[AnyRef])(implicit input:Tuple): Unit
-
-  /** Stores the collector used by emit, ack and fail. */
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
deleted file mode 100644
index b175b41..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.core.StreamInfo
-
-/**
- * Bolt that re-emits a value only when the predicate accepts it.
- *
- * @param fn   predicate evaluated on each incoming value
- * @param info stream metadata, passed implicitly to the base bolt
- */
-case class FilterBoltWrapper(fn:Any => Boolean)(implicit info:StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = 1){
-  /**
-   * Keyed input: the value is tested and re-emitted when accepted; the key is not inspected.
-   */
-  override def onKeyValue(key: Any, value: Any)(implicit input:Tuple): Unit = {
-    val accepted = fn(value)
-    if (accepted) {
-      emit(value)
-    }
-  }
-
-  /**
-   * Non-keyed input: only the first element of the list is tested and re-emitted.
-   *
-   * @param values values of the input tuple
-   */
-  override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit = {
-    val first = values.get(0)
-    val accepted = fn(first)
-    if (accepted) {
-      emit(first)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
deleted file mode 100644
index 4c105b2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.core.StreamInfo
-
-/**
- * Bolt that applies a side-effecting function to its input and emits nothing.
- *
- * @param fn   function applied to each input; its result is discarded
- * @param info stream metadata, passed implicitly to the base bolt
- * @since  12/6/15
- */
-case class ForeachBoltWrapper(fn:Any=>Unit)(implicit info:StreamInfo) extends AbstractStreamBolt[Any]  {
-  /**
-   * Keyed input: the function receives the value; the key is not used.
-   * @param value value of the keyed tuple
-   */
-  override def onKeyValue(key:Any,value: Any)(implicit input:Tuple): Unit = fn(value)
-
-  /**
-   * Non-keyed input: the function receives the whole values list, not its first element.
-   * NOTE(review): FilterBoltWrapper passes values.get(0) instead -- confirm which is intended.
-   *
-   * @param values values of the input tuple
-   */
-  override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit = fn(values)
-}
\ No newline at end of file



Mime
View raw message