eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [35/51] [partial] incubator-eagle git commit: EAGLE-184 Migrate eagle website from https://github.com/eaglemonitoring/eaglemonitoring.github.io to document branch
Date Thu, 03 Mar 2016 18:10:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
deleted file mode 100644
index c64ea83..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import backtype.storm.utils.Utils
-import org.apache.eagle.datastream.core.StreamInfo
-import org.apache.eagle.datastream.utils.NameConstants
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-/**
- * Storm spout that emits the elements of an in-memory `Iterable`, at most one element
- * per `nextTuple` call.
- *
- * @param iterable source of the values to emit
- * @param recycle  when true, a fresh iterator is taken from `iterable` once the current
- *                 one is exhausted; when false the spout logs, calls `deactivate()` and sleeps
- * @param info     stream metadata; `outKeyed` selects between a (key, value) tuple and a
- *                 single-field tuple
- * @since  12/6/15
- */
-case class IterableStreamSpout(iterable: Iterable[Any],recycle:Boolean = true)(implicit info:StreamInfo) extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(classOf[IterableStreamSpout])
-  var _collector:SpoutOutputCollector=null
-  var _iterator:Iterator[Any] = null
-
-  /** Stores the collector and takes the first iterator from `iterable`. */
-  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
-    _collector = collector
-    _iterator = iterable.iterator
-  }
-
-  /**
-   * Emits the next element if there is one; otherwise either restarts the iteration
-   * (`recycle`) or parks the calling thread.
-   */
-  override def nextTuple(): Unit = {
-    if (!_iterator.hasNext) {
-      if (recycle) {
-        // Nothing is emitted in this call; the next call reads from the fresh iterator.
-        if (LOG.isDebugEnabled) LOG.debug("Recycling the iterator")
-        _iterator = iterable.iterator
-      } else {
-        if (LOG.isDebugEnabled) LOG.debug("No tuple left, sleep forever")
-        this.deactivate()
-        Utils.sleep(Long.MaxValue)
-      }
-    } else {
-      val element = _iterator.next().asInstanceOf[AnyRef]
-      // Keyed streams carry (key, value); plain streams carry the value alone.
-      val outgoing: util.List[AnyRef] =
-        if (info.outKeyed) List(info.keySelector.key(element), element).asJava.asInstanceOf[util.List[AnyRef]]
-        else List(element).asJava
-      _collector.emit(outgoing)
-    }
-  }
-
-  /** Declares (key, value) fields for keyed streams, otherwise one field named `FIELD_PREFIX` + "0". */
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    val outputFields =
-      if (info.outKeyed) new Fields(NameConstants.FIELD_KEY, NameConstants.FIELD_VALUE)
-      else new Fields(s"${NameConstants.FIELD_PREFIX}0")
-    declarer.declare(outputFields)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
deleted file mode 100644
index ea6d658..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import backtype.storm.utils.Utils
-import org.apache.eagle.datastream.core.StreamInfo
-import org.apache.eagle.datastream.utils.NameConstants
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-/**
- * Storm spout that drains a single `Iterator`, emitting at most one element per
- * `nextTuple` call. Once the iterator is exhausted the spout logs, calls
- * `deactivate()` and sleeps.
- *
- * @param iterator source of the values to emit
- * @param info     stream metadata; `outKeyed` selects between a (key, value) tuple and a
- *                 single-field tuple
- */
-case class IteratorStreamSpout(iterator: Iterator[Any])(implicit info:StreamInfo) extends BaseRichSpout {
-  // Log under this class's own name (was classOf[IterableStreamSpout], a copy-paste slip
-  // that attributed this spout's log lines to a different spout).
-  val LOG = LoggerFactory.getLogger(classOf[IteratorStreamSpout])
-  var _collector:SpoutOutputCollector=null
-  var _iterator:Iterator[Any] = null
-
-  /** Stores the collector and the iterator to read from. */
-  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
-    this._collector = collector
-    this._iterator = iterator
-  }
-
-  /** Emits the next element if there is one; otherwise parks the calling thread. */
-  override def nextTuple(): Unit = {
-    if(_iterator.hasNext){
-      val current = _iterator.next().asInstanceOf[AnyRef]
-      if(info.outKeyed) {
-        // Keyed streams carry (key, value).
-        _collector.emit(List(info.keySelector.key(current),current).asJava.asInstanceOf[util.List[AnyRef]])
-      }else{
-        _collector.emit(List(current).asJava)
-      }
-    }else{
-      LOG.info("No tuple left, sleep forever")
-      this.deactivate()
-      Utils.sleep(Long.MaxValue)
-    }
-  }
-
-  /** Declares (key, value) fields for keyed streams, otherwise one field named `FIELD_PREFIX` + "0". */
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    if(info.outKeyed) {
-      declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
-    }else{
-      declarer.declare(new Fields(s"${NameConstants.FIELD_PREFIX}0"))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
deleted file mode 100644
index 802c782..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.apache.eagle.datastream.{Collector, JavaStormStreamExecutor}
-import org.slf4j.LoggerFactory
-
-/**
- * Storm bolt that runs a [[JavaStormStreamExecutor]] for every incoming tuple and
- * emits each collected result anchored to that tuple.
- *
- * @param worker executor that receives the tuple's values and pushes results to a collector
- */
-case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[AnyRef]) extends BaseRichBolt{
-  // Log under this class's own name (was StormBoltWrapper.getClass, i.e. the companion
-  // object of a different bolt).
-  val LOG = LoggerFactory.getLogger(classOf[JavaStormBoltWrapper])
-  var _collector : OutputCollector = null
-
-  /** Stores the collector and initialises the wrapped executor. */
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-    worker.init
-  }
-
-  /**
-   * Feeds the tuple's values to the executor, emits every collected result as a Java
-   * list of its product elements, then acks the input.
-   */
-  override def execute(input : Tuple): Unit ={
-    worker.flatMap(input.getValues, new Collector[AnyRef](){
-      def collect(t: AnyRef): Unit ={
-        // Results are cast to Product; a non-Product result fails with ClassCastException.
-        _collector.emit(input, StormWrapperUtils.productAsJavaList(t.asInstanceOf[Product]))
-      }
-    })
-    _collector.ack(input)
-  }
-
-  /** Declares the output fields reported by the wrapped executor. */
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    val fields = worker.fields
-    LOG.info("output fields for worker " + worker + " : " + fields.toList)
-    declarer.declare(new Fields(fields:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
deleted file mode 100644
index 19305fa..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.io.IOException
-import java.util
-import java.util.Properties
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * Kafka message deserializer that parses a JSON payload into a `java.util.TreeMap`.
- *
- * @param props deserializer properties (not read by this class)
- * @since  11/6/15
- */
-case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDeserializer{
-  private val objectMapper: ObjectMapper = new ObjectMapper
-  private val LOG: Logger = LoggerFactory.getLogger(classOf[JsonMessageDeserializer])
-
-  /**
-   * Parses `bytes` as JSON.
-   *
-   * @param bytes raw message payload; may be null or empty
-   * @return the parsed map, or null when the payload is null/empty or is not readable JSON
-   */
-  override def deserialize(bytes: Array[Byte]): AnyRef = {
-    var map: util.Map[String, _] = null
-    // The null test must come first: `bytes.length` on a null payload throws NullPointerException.
-    if(bytes == null || bytes.length == 0){
-      // Logged at the level the guard checks for (the original guarded a warn call with isDebugEnabled).
-      if(LOG.isDebugEnabled) LOG.debug("Skip empty message")
-    }else {
-      try {
-        map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
-      } catch {
-        case e: IOException => {
-          // Decode with an explicit charset so the logged payload does not depend on the JVM default.
-          LOG.error("Failed to deserialize json from: " + new String(bytes, java.nio.charset.StandardCharsets.UTF_8), e)
-        }
-      }
-    }
-    map
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
deleted file mode 100644
index 8c92590..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
-import org.apache.eagle.datastream.ExecutionEnvironments
-
-/**
- * Application body that wires a Kafka-sourced spout into a Storm execution environment
- * and runs it. The statements execute in order as the `App` body.
- */
-class KafkaStreamMonitorApp extends App {
-  // Environment is built from the command-line arguments.
-  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
-  // Stream name comes from "eagle.stream.name", defaulting to "eventStream".
-  val streamName = env.config.get[String]("eagle.stream.name","eventStream")
-  // Executor id comes from "eagle.stream.executor", defaulting to "<streamName>Executor".
-  val streamExecutorId = env.config.get[String]("eagle.stream.executor",s"${streamName}Executor")
-  // Kafka payloads are decoded with JsonMessageDeserializer.
-  env.config.set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)
-  // NOTE(review): `!` is a DSL operator defined outside this file; presumably it attaches
-  // the named stream to the executor with the given id — confirm against the stream API.
-  env.fromSpout(new KafkaSourcedSpoutProvider()).parallelism(1).nameAs(streamName) ! (Seq(streamName),streamExecutorId)
-  env.execute()
-}
-
-/** Runnable singleton instance of [[KafkaStreamMonitorApp]]. */
-object KafkaStreamMonitor extends KafkaStreamMonitorApp
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
deleted file mode 100644
index 1119e08..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.core.StreamInfo
-
-/**
- * Bolt that applies a mapping function to each incoming tuple and emits the result.
- *
- * @param num        number of declared output fields; zero selects the type-safe path in
- *                   which `fn` receives the first input value directly (a map operation
- *                   must otherwise produce at least one output field)
- * @param fn         mapping function applied to the value (keyed / type-safe path) or to a
- *                   scala tuple built from the input values
- * @param streamInfo stream metadata passed implicitly to the base bolt
- */
-case class MapBoltWrapper(num: Int, fn: Any => Any)(implicit streamInfo: StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = num){
-  /**
-   * Keyed stream: map the value and emit the result; the key is not used here.
-   */
-  override def onKeyValue(key: Any, value: Any)(implicit input:Tuple): Unit = {
-    val mapped = fn(value)
-    emit(mapped)
-  }
-
-  /**
-   * General stream: map the list of input values.
-   *
-   * An empty list is ignored. With `num == 0` only the first value is mapped. Otherwise the
-   * values (1 to 4 of them) are packed into a scala tuple, mapped, and the result is
-   * unpacked into a Java list: Tuple1..Tuple4 element-wise, anything else as a single element.
-   *
-   * @param values input tuple values
-   */
-  override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit = {
-    val size = values.size()
-    if (size != 0) {
-      if (num == 0) {
-        emit(fn(values.get(0)))
-      } else {
-        val packed: AnyRef = size match {
-          case 1 => scala.Tuple1[AnyRef](values.get(0))
-          case 2 => scala.Tuple2(values.get(0), values.get(1))
-          case 3 => scala.Tuple3(values.get(0), values.get(1), values.get(2))
-          case 4 => scala.Tuple4(values.get(0), values.get(1), values.get(2), values.get(3))
-          case _ => throw new IllegalArgumentException(s"Exceed max supported tuple size $size > 4")
-        }
-        val unpacked: util.List[AnyRef] = fn(packed) match {
-          case scala.Tuple1(a) =>
-            util.Arrays.asList(a.asInstanceOf[AnyRef])
-          case scala.Tuple2(a, b) =>
-            util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef])
-          case scala.Tuple3(a, b, c) =>
-            util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef])
-          case scala.Tuple4(a, b, c, d) =>
-            util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef], d.asInstanceOf[AnyRef])
-          case single =>
-            util.Arrays.asList(single.asInstanceOf[AnyRef])
-        }
-        emit(unpacked)
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
deleted file mode 100644
index 37622e4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.spout.{ISpoutOutputCollector, SpoutOutputCollector}
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import org.apache.eagle.datastream.core.{KeySelector, StreamInfo}
-import org.apache.eagle.datastream.utils.NameConstants
-import java.util.{List => JList}
-
-/**
- * Spout that forwards its lifecycle and tuple callbacks to a delegated BaseRichSpout
- * while declaring its own output field names.
- *
- * @param delegate delegated BaseRichSpout
- * @param outputFields given field names
- */
-case class SpoutProxy(delegate: BaseRichSpout, outputFields: java.util.List[String]) extends BaseRichSpout{
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
-    this.delegate.open(conf, context, collector)
-  }
-
-  override def nextTuple(): Unit = {
-    this.delegate.nextTuple()
-  }
-
-  override def ack(msgId: AnyRef): Unit = {
-    this.delegate.ack(msgId)
-  }
-
-  override def fail(msgId: AnyRef): Unit = {
-    this.delegate.fail(msgId)
-  }
-
-  // Forwarded so the delegate sees re-activation as well as deactivation; without this
-  // override the call stops at this proxy and never reaches the delegate.
-  override def activate(): Unit = {
-    this.delegate.activate()
-  }
-
-  override def deactivate(): Unit = {
-    this.delegate.deactivate()
-  }
-
-  /** Declares the caller-supplied field names instead of asking the delegate. */
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    declarer.declare(new Fields(outputFields))
-  }
-
-  override def close(): Unit = {
-    this.delegate.close()
-  }
-}
-
-/**
- * Output collector that rewrites every emitted tuple into a (key, value) pair before
- * handing it to the delegated collector.
- *
- * @param delegate collector that receives the rewritten tuples
- * @param keyer    selector used to compute the key from the tuple's first element
- */
-case class KeyedSpoutOutputCollector(delegate: ISpoutOutputCollector,keyer:KeySelector) extends SpoutOutputCollector(delegate) {
-  override def emitDirect (taskId: Int, streamId: String, tuple: JList[AnyRef], messageId: AnyRef):Unit =
-    delegate.emitDirect(taskId, streamId, toKeyValue(tuple), messageId)
-
-  override def emit(streamId: String, tuple: JList[AnyRef], messageId: AnyRef):JList[Integer] =
-    delegate.emit(streamId, toKeyValue(tuple), messageId)
-
-  /**
-   * Builds the two-element list (key, value) from the first element of `tuple`;
-   * any further elements of `tuple` are not carried over.
-   */
-  def toKeyValue(tuple: JList[AnyRef]): JList[AnyRef] = {
-    val value = tuple.get(0)
-    util.Arrays.asList(keyer.key(value), value).asInstanceOf[JList[AnyRef]]
-  }
-}
-
-/**
- * Spout that forwards to a delegated BaseRichSpout but opens it with a
- * [[KeyedSpoutOutputCollector]], so every emitted tuple becomes a (key, value) pair.
- *
- * @param delegate   delegated BaseRichSpout
- * @param streamInfo stream metadata; must be key-based and carry a non-null key selector
- */
-case class KeyedSpoutProxy(delegate: BaseRichSpout)(implicit streamInfo:StreamInfo) extends BaseRichSpout{
-  /**
-   * Opens the delegate with a keyed collector.
-   *
-   * @throws IllegalArgumentException if the stream is not key-based
-   * @throws NullPointerException if the stream has no key selector
-   */
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
-    if(!streamInfo.outKeyed) throw new IllegalArgumentException(s"$streamInfo is not key-based")
-    if(streamInfo.keySelector == null) throw new NullPointerException(s"KeySelector $streamInfo is null")
-
-    this.delegate.open(conf, context, KeyedSpoutOutputCollector(collector,streamInfo.keySelector))
-  }
-
-  override def nextTuple(): Unit = {
-    this.delegate.nextTuple()
-  }
-
-  override def ack(msgId: AnyRef): Unit = {
-    this.delegate.ack(msgId)
-  }
-
-  override def fail(msgId: AnyRef): Unit = {
-    this.delegate.fail(msgId)
-  }
-
-  // Forwarded so the delegate sees re-activation as well as deactivation; without this
-  // override the call stops at this proxy and never reaches the delegate.
-  override def activate(): Unit = {
-    this.delegate.activate()
-  }
-
-  override def deactivate(): Unit = {
-    this.delegate.deactivate()
-  }
-
-  /** Always declares the (key, value) field pair. */
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
-  }
-
-  override def close(): Unit = {
-    this.delegate.close()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
deleted file mode 100644
index 21057e7..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.storm
-
-import backtype.storm.topology.base.BaseRichBolt
-import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.persist.PersistExecutor
-import org.apache.eagle.datastream._
-import org.apache.eagle.datastream.core._
-
-/**
- * Factory that picks the Storm bolt wrapper matching a stream producer.
- */
-object StormBoltFactory {
-  /**
-   * Builds the bolt for `producer`.
-   *
-   * @param graph    producer graph (not read by this method)
-   * @param producer producer to wrap; its stream info is made implicitly available to the wrappers
-   * @param config   configuration handed to executors that expose `prepareConfig`
-   * @return the bolt wrapping the producer's logic
-   * @throws UnsupportedOperationException if the producer kind has no wrapper
-   */
-  def getBoltWrapper(graph: StreamProducerGraph, producer : StreamProducer[Any], config : Config) : BaseRichBolt = {
-    implicit val streamInfo = producer.getInfo
-    producer match{
-      case FlatMapProducer(worker) =>
-        // Checked most-specific first; anything else is treated as a plain flat mapper.
-        if (worker.isInstanceOf[JavaStormStreamExecutor[AnyRef]]) {
-          val javaExecutor = worker.asInstanceOf[JavaStormStreamExecutor[AnyRef]]
-          javaExecutor.prepareConfig(config)
-          JavaStormBoltWrapper(javaExecutor)
-        } else if (worker.isInstanceOf[StormStreamExecutor[AnyRef]]) {
-          val scalaExecutor = worker.asInstanceOf[StormStreamExecutor[AnyRef]]
-          scalaExecutor.prepareConfig(config)
-          StormBoltWrapper(scalaExecutor)
-        } else if (worker.isInstanceOf[FlatMapperWrapper[Any]]) {
-          val functionWrapper = worker.asInstanceOf[FlatMapperWrapper[Any]]
-          StormFlatFunctionWrapper(functionWrapper.func)
-        } else {
-          StormFlatMapperWrapper(worker)
-        }
-      case filter:FilterProducer[Any] =>
-        FilterBoltWrapper(filter.fn)
-      case mapper:MapperProducer[Any,Any] =>
-        MapBoltWrapper(mapper.numOutputFields, mapper.fn)
-      case foreach:ForeachProducer[Any] =>
-        ForeachBoltWrapper(foreach.fn)
-      case persist : PersistProducer[Any] =>
-        val persistExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString)
-        persistExecutor.prepareConfig(config)
-        JavaStormBoltWrapper(persistExecutor.asInstanceOf[JavaStormStreamExecutor[AnyRef]])
-      case _ =>
-        throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
deleted file mode 100644
index 75f045e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.apache.eagle.datastream.{Collector, StormStreamExecutor}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-/**
- * Storm bolt that runs a [[StormStreamExecutor]] for every incoming tuple and emits
- * each collected result anchored to that tuple.
- *
- * @param worker executor that receives the tuple's values and pushes results to a collector
- */
-case class StormBoltWrapper(worker : StormStreamExecutor[AnyRef]) extends BaseRichBolt{
-  // classOf names the bolt class itself; StormBoltWrapper.getClass resolved to the
-  // companion object's class, so the logger name carried a trailing '$'.
-  val LOG = LoggerFactory.getLogger(classOf[StormBoltWrapper])
-  var _collector : OutputCollector = null
-
-  /** Stores the collector and initialises the wrapped executor. */
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-    worker.init
-  }
-
-  /**
-   * Feeds the tuple's values to the executor and emits every collected result as a Java
-   * list of its product elements. On success the input is acked; on an exception the
-   * input is failed and the exception is rethrown wrapped in a RuntimeException.
-   */
-  override def execute(input : Tuple): Unit = {
-    try {
-      worker.flatMap(input.getValues.asScala, new Collector[AnyRef] {
-        override def collect(t: AnyRef): Unit = {
-          // Results are cast to Product; a non-Product result fails with ClassCastException.
-          _collector.emit(input, StormWrapperUtils.productAsJavaList(t.asInstanceOf[Product]))
-        }
-      })
-    }catch{
-      case ex: Exception => {
-        LOG.error("fail executing", ex)
-        _collector.fail(input)
-        // Rethrowing leaves this method before the ack below is reached.
-        throw new RuntimeException(ex)
-      }
-    }
-    _collector.ack(input)
-  }
-
-  /** Declares the output fields reported by the wrapped executor. */
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    val fields = worker.fields
-    LOG.info("Output fields for worker " + worker + " : " + fields.toList)
-    declarer.declare(new Fields(fields:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
deleted file mode 100644
index 4165db4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider
-import org.apache.eagle.datastream.core.{ExecutionEnvironment, StormSourceProducer, StreamDAG}
-
-/**
- * @since  12/7/15
- */
-class StormExecutionEnvironment(private val conf:Config) extends ExecutionEnvironment(conf) {
-  override def execute(dag: StreamDAG) : Unit = {
-    StormTopologyCompiler(config.get, dag).buildTopology.execute
-  }
-
-  def fromSpout[T](source: BaseRichSpout): StormSourceProducer[T] = {
-    val ret = StormSourceProducer[T](source)
-    ret.initWith(dag,config.get)
-    ret
-  }
-
-  def fromSpout[T](sourceProvider: StormSpoutProvider):StormSourceProducer[T] = fromSpout(sourceProvider.getSpout(config.get))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
deleted file mode 100644
index f15e736..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import org.apache.eagle.datastream.Collector
-import org.apache.eagle.datastream.StormStreamExecutor
-import org.apache.eagle.datastream.StormStreamExecutor3
-
-import com.typesafe.config.Config
-
-case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String)
-  extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{
-  override def prepareConfig(config: Config): Unit = {
-    delegate.prepareConfig(config)
-  }
-
-  override def init: Unit = {
-    delegate.init
-  }
-
-  override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, String, util.SortedMap[Object, Object]]]): Unit = {
-    delegate.flatMap(input, new Collector[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]] {
-      override def collect(r: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): Unit = {
-        collector.collect(Tuple3(r._1, streamName, r._2))
-      }
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
deleted file mode 100644
index e5eea1f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.Collector
-import org.apache.eagle.datastream.core.StreamInfo
-
-case class StormFlatFunctionWrapper(flatMapper:(Any,Collector[Any])=>Unit)(implicit info:StreamInfo) extends AbstractStreamBolt[Any]{
-  /**
-   * Handle keyed stream value
-   */
-  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = {
-    flatMapper(value,new Collector[Any] {
-      override def collect(r: Any): Unit = emit(r)(input)
-    })
-  }
-
-  /**
-   * Handle general stream values list
-   *
-   * @param values
-   */
-  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit = {
-    flatMapper(values,new Collector[Any] {
-      override def collect(r: Any): Unit = emit(r)(input)
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
deleted file mode 100644
index e5fb86d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.{Collector, FlatMapper}
-import org.apache.eagle.datastream.core.StreamInfo
-import scala.collection.JavaConverters._
-
-case class StormFlatMapperWrapper(flatMapper:FlatMapper[Any])(implicit info:StreamInfo) extends AbstractStreamBolt[Any]{
-  /**
-   * Handle keyed stream value
-   */
-  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = {
-    flatMapper.flatMap(value.asInstanceOf[Seq[AnyRef]],new Collector[Any] {
-      override def collect(r: Any): Unit = emit(r)(input)
-    })
-  }
-
-  /**
-   * Handle general stream values list
-   *
-   * @param values
-   */
-  override def onValues(values: java.util.List[AnyRef])(implicit input: Tuple): Unit = {
-    flatMapper.flatMap(values.asScala,new Collector[Any] {
-      override def collect(r: Any): Unit = emit(r)(input)
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
deleted file mode 100644
index 6a3b606..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.apache.eagle.datastream.core.{IteratorStreamProducer, IterableStreamProducer, StormSourceProducer, StreamProducer}
-import org.apache.eagle.datastream.utils.NameConstants
-
-object StormSpoutFactory {
-  def createSpout(config: Config, from: StreamProducer[Any]): BaseRichSpout = {
-    implicit val streamInfo = from.getInfo
-    from match {
-      case p@StormSourceProducer(source) =>
-        if(p.outKeyed) {
-          createKeyedProxySpout(p)
-        }else {
-          createProxySpout(p)
-        }
-      case p@IterableStreamProducer(iterable,recycle) =>
-        IterableStreamSpout(iterable,recycle)
-      case p@IteratorStreamProducer(iterator) =>
-        IteratorStreamSpout(iterator)
-      case _ =>
-        throw new IllegalArgumentException(s"Cannot compile unknown $from to a Storm Spout")
-    }
-  }
-
-  /**
-   * @param sourceProducer source producer
-   * @return
-   */
-  def createProxySpout(sourceProducer: StormSourceProducer[Any]): BaseRichSpout = {
-    val numFields = sourceProducer.numFields
-    if (numFields <= 0) {
-      sourceProducer.source
-    } else {
-      var i = 0
-      val ret = new util.ArrayList[String]
-      while (i < numFields) {
-        ret.add(NameConstants.FIELD_PREFIX + i)
-        i += 1
-      }
-      SpoutProxy(sourceProducer.source, ret)
-    }
-  }
-
-  def createKeyedProxySpout(sourceProducer: StormSourceProducer[Any]):BaseRichSpout = {
-    KeyedSpoutProxy(sourceProducer.source)(sourceProducer.getInfo)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
deleted file mode 100644
index f572377..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
-import backtype.storm.tuple.Fields
-import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.storm.partition.CustomPartitionGrouping
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.datastream.utils.NameConstants
-import org.slf4j.LoggerFactory
-
-import scala.collection.mutable.ListBuffer
-
-case class StormTopologyCompiler(config: Config, graph: StreamProducerGraph) extends AbstractTopologyCompiler{
-  val LOG = LoggerFactory.getLogger(StormTopologyCompiler.getClass)
-  val boltCache = scala.collection.mutable.Map[StreamProducer[Any], StormBoltWrapper]()
-
-  override def buildTopology: AbstractTopologyExecutor ={
-    val builder = new TopologyBuilder()
-    val iter = graph.iterator()
-    val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]()
-    val stormTopologyGraph = ListBuffer[String]()
-    while(iter.hasNext){
-      val from = iter.next()
-      val fromName = from.name
-      if(graph.isSource(from)){
-        val spout = StormSpoutFactory.createSpout(config, from)
-        builder.setSpout(fromName, spout, from.parallelism)
-        LOG.info("Spout: " + fromName + " with parallelism " + from.parallelism)
-      } else {
-        LOG.debug("Bolt" + fromName)
-      }
-      val edges = graph.outgoingEdgesOf(from)
-      edges.foreach(sc => {
-        val toName = sc.to.name
-        var boltDeclarer: BoltDeclarer = null
-        val toBolt = createBoltIfAbsent(toName)
-        boltDeclarerCache.get(toName) match {
-          case None => {
-            var finalParallelism = 1
-            graph.getNodeByName(toName) match {
-              case Some(p) => finalParallelism = p.parallelism
-              case None => finalParallelism = 1
-            }
-            boltDeclarer = builder.setBolt(toName, toBolt, finalParallelism)
-            LOG.info("Bolt: " + toName + " with parallelism " + finalParallelism)
-            boltDeclarerCache.put(toName, boltDeclarer)
-          }
-          case Some(bt) => boltDeclarer = bt
-        }
-
-        sc match {
-          case GroupbyFieldsConnector(_, _, groupByFields) =>
-            boltDeclarer.fieldsGrouping(fromName, new Fields(fields(groupByFields)))
-          case GroupbyStrategyConnector(_, _, strategy) =>
-            boltDeclarer.customGrouping(fromName, new CustomPartitionGrouping(strategy));
-          case GroupbyKeyConnector(_, _, keySelector) =>
-            boltDeclarer.fieldsGrouping(fromName, new Fields(NameConstants.FIELD_KEY));
-          case ShuffleConnector(_, _) => {
-            boltDeclarer.shuffleGrouping(fromName)
-          }
-          case _ => throw new UnsupportedOperationException(s"Supported stream connector $sc")
-        }
-
-        if (graph.isSource(from)) {
-          stormTopologyGraph += s"Spout{$fromName}{${from.parallelism}) ~> Bolt{$toName}{${from.parallelism}} in $sc"
-        } else {
-          stormTopologyGraph += s"Bolt{$fromName}{${from.parallelism}) ~> Bolt{$toName}{${from.parallelism}} in $sc"
-        }
-      })
-    }
-    LOG.info(s"Storm topology DAG\n{\n \t${stormTopologyGraph.mkString("\n\t")} \n}")
-    new StormTopologyExecutorImpl(builder.createTopology, config)
-  }
-
-  def fields(fields : Seq[Int]): java.util.List[String] ={
-    val ret = new util.ArrayList[String]
-    fields.map(n => ret.add(NameConstants.FIELD_PREFIX + n))
-    ret
-  }
-
-  def createBoltIfAbsent(name : String) : BaseRichBolt = {
-    val producer = graph.getNodeByName(name)
-    producer match{
-      case Some(p) => createBoltIfAbsent(graph, p)
-      case None => throw new IllegalArgumentException("please check bolt name " + name)
-    }
-  }
-
-  def createBoltIfAbsent(graph: StreamProducerGraph, producer : StreamProducer[Any]): BaseRichBolt ={
-    boltCache.get(producer) match{
-      case Some(bolt) => bolt
-      case None => {
-        StormBoltFactory.getBoltWrapper(graph, producer, config)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
deleted file mode 100644
index 5d64c4c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.io.{File, FileInputStream}
-
-import _root_.storm.trident.spout.RichSpoutBatchExecutor
-import backtype.storm.generated.StormTopology
-import backtype.storm.utils.Utils
-import backtype.storm.{Config, LocalCluster, StormSubmitter}
-import org.apache.eagle.datastream.core.AbstractTopologyExecutor
-import org.apache.thrift7.transport.TTransportException
-import org.slf4j.LoggerFactory
-import org.yaml.snakeyaml.Yaml
-
-case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor {
-  val LOG = LoggerFactory.getLogger(classOf[StormTopologyExecutorImpl])
-  @throws(classOf[Exception])
-  def execute {
-    val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local")
-    val conf: Config = new Config
-    conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
-    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
-    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32))
-    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384))
-    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384))
-    conf.put(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000))
-
-    if(config.hasPath("envContextConfig.stormConfigFile")) {
-      val file = new File(config.getString("envContextConfig.stormConfigFile"))
-      if(file.exists()) {
-        val inputFileStream = new FileInputStream(file)
-        val yaml = new Yaml()
-        try {
-          val stormConf = yaml.load(inputFileStream).asInstanceOf[java.util.LinkedHashMap[String, Object]]
-          if(stormConf != null) conf.putAll(stormConf)
-        } catch {
-          case t: Throwable => {
-            LOG.error(s"Got example $t",t)
-            throw t
-          }
-        } finally {
-          if(inputFileStream != null) inputFileStream.close()
-        }
-      }
-    }
-
-    val topologyName = config.getString("envContextConfig.topologyName")
-    if (!localMode) {
-      if(config.hasPath("envContextConfig.nimbusHost")) {
-        LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${config.getString("envContextConfig.nimbusHost")}")
-        conf.put(backtype.storm.Config.NIMBUS_HOST, config.getString("envContextConfig.nimbusHost"))
-      }
-
-      if(config.hasPath("envContextConfig.nimbusThriftPort")) {
-        LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${config.getString("envContextConfig.nimbusThriftPort")}")
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, config.getNumber("envContextConfig.nimbusThriftPort"))
-      }
-
-      if(config.hasPath("envContextConfig.jarFile")){
-        LOG.info(s"Setting storm.jar as ${config.getString("envContextConfig.jarFile")}")
-        System.setProperty("storm.jar",config.getString("envContextConfig.jarFile"))
-      }
-
-      LOG.info("Submitting as cluster mode")
-      try {
-        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
-      } catch {
-        case e:TTransportException =>
-          LOG.error(s"Got thrift exception, type: ${e.getType}")
-          throw e
-      }
-      finally {
-        System.clearProperty("storm.jar")
-      }
-    } else {
-      LOG.info("Submitting as local mode")
-      val cluster: LocalCluster = new LocalCluster
-      cluster.submitTopology(topologyName, conf, topology)
-      while(true) {
-        try {
-          Utils.sleep(Integer.MAX_VALUE)
-        } catch {
-          case _: Throwable => () // Do nothing
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
deleted file mode 100644
index 95c6ac1..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-object StormWrapperUtils {
-  def productAsJavaList(product:Product):util.List[AnyRef]={
-    val list = new util.LinkedList[AnyRef]()
-    product.productIterator.foreach((p:Any) => list.add(p.asInstanceOf[AnyRef]))
-    list
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
deleted file mode 100644
index 76250e2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream.utils
-
-import java.util
-
-import org.apache.eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
-import org.apache.eagle.alert.executor.AlertExecutor
-import org.apache.eagle.alert.notification.AlertNotificationExecutor
-import org.apache.eagle.alert.persist.AlertPersistExecutor
-import org.apache.eagle.datastream.core.{StreamConnector, FlatMapProducer, StreamProducer}
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.mutable.ListBuffer
-
-/**
- * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
- *
- * <br/><br/>
- * Explanations for programId, alertExecutorId and policy<br/><br/>
- * - programId - distributed or single-process program for example one storm topology<br/>
- * - alertExecutorId - one process/thread which executes multiple policies<br/>
- * - policy - some rules to be evaluated<br/>
- *
- * <br/>
- *
- * Normally the mapping is like following:
- * <pre>
- * programId (1:N) alertExecutorId
- * alertExecutorId (1:N) policy
- * </pre>
- */
-
-object AlertExecutorConsumerUtils {
-  private val LOG: Logger = LoggerFactory.getLogger(AlertExecutorConsumerUtils.getClass)
-
-  def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], alertStreamProducers: List[StreamProducer[Any]]): Unit = {
-    val alertExecutorIdList: java.util.List[String] = new util.ArrayList[String]()
-    alertStreamProducers.map(x =>
-      alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getExecutorId));
-    val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getPolicyDefinitionDao
-    val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
-    val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
-
-    val entityDedupStreamProducer = FlatMapProducer(entityDedupExecutor)
-    val persistStreamProducer = FlatMapProducer(persistExecutor)
-    val emailDedupStreamProducer = FlatMapProducer(emailDedupExecutor)
-    val notificationStreamProducer = FlatMapProducer(notificationExecutor)
-    toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
-    toBeAddedEdges += StreamConnector(emailDedupStreamProducer, notificationStreamProducer)
-
-    alertStreamProducers.foreach(sp => {
-      toBeAddedEdges += StreamConnector(sp, entityDedupStreamProducer)
-      toBeAddedEdges += StreamConnector(sp, emailDedupStreamProducer)
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
deleted file mode 100644
index 33a805c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream.utils
-
-import org.apache.eagle.datastream.core.{StreamConnector, StreamProducer}
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * @see scala.tools.nsc.DotDiagramGenerator
- */
-object GraphPrinter {
-  private val LOG = LoggerFactory.getLogger(GraphPrinter.getClass)
-  def print(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], message:String = "Print stream DAG graph"): Unit = {
-    val graphStr = ListBuffer[String]()
-    val iter = dag.iterator()
-    while (iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        graphStr += s"${edge.from.name}{${edge.from.parallelism}} ~> ${edge.to.name}{${edge.to.parallelism}} in ${edge.toString}"
-      })
-    }
-    LOG.info(message+"\n{ \n\t" + graphStr.mkString("\n\t") + "\n}")
-  }
-  
-  def printDotDigraph(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], title:String = "dag", message:String = "Print DOT digraph (copy and visualize with http://www.webgraphviz.com/)"): String = {
-    val graphStr = ListBuffer[String]()
-    val iter = dag.iterator()
-    while (iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        graphStr += s""""${edge.from.name} x ${edge.from.parallelismNum}" -> "${edge.to.name} x ${edge.from.parallelismNum}" [label = "$edge"];"""
-      })
-    }
-    val dotDigraph = s"""digraph $title { \n\t${graphStr.mkString("\n\t")} \n}"""
-    LOG.info(s"""$message\n\n$dotDigraph\n""")
-    dotDigraph
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
deleted file mode 100644
index d4836e4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.utils
-
-object NameConstants {
-  val FIELD_PREFIX = "f"
-  val FIELD_KEY = "key"
-  val FIELD_VALUE = "value"
-  val FIELD_SEPARATOR = "_"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
deleted file mode 100644
index 331cf7c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.utils
-
-import org.apache.eagle.datastream.core.StreamInfo
-
-case class NodeNameSelector(producer : StreamInfo) {
-  def getName : String = {
-    producer.name match {
-      case null => producer.toString+NameConstants.FIELD_SEPARATOR+producer.id
-      case _ => producer.name
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
deleted file mode 100644
index 1d48752..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.utils
-
-import scala.reflect.api
-import scala.reflect.runtime.{universe => ru}
-
-/**
- * @since  12/7/15
- */
-object Reflections{
-  private val UNIT_CLASS = classOf[Unit]
-  private val UNIT_TYPE_TAG = ru.typeTag[Unit]
-
-  /**
-   * Class to TypeTag
-   * @param clazz class
-   * @tparam T Type T
-   * @return
-   */
-  def typeTag[T](clazz:Class[T]):ru.TypeTag[T]={
-    if(clazz == null){
-      null
-    }else if(clazz == UNIT_CLASS) {
-      UNIT_TYPE_TAG.asInstanceOf[ru.TypeTag[T]]
-    } else {
-      val mirror = ru.runtimeMirror(clazz.getClassLoader)
-      val sym = mirror.staticClass(clazz.getCanonicalName)
-      val tpe = sym.selfType
-      ru.TypeTag(mirror, new api.TypeCreator {
-        def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
-          if (m eq mirror) tpe.asInstanceOf[U#Type]
-          else throw new IllegalArgumentException(s"Type tag defined in $mirror cannot be migrated to other mirrors.")
-      })
-    }
-  }
-
-  def javaTypeClass[T](obj: AnyRef, index: Int = 0):Class[T] = JavaReflections.getGenericTypeClass(obj,index).asInstanceOf[Class[T]]
-  def javaTypeTag[T](obj: AnyRef, index: Int = 0):ru.TypeTag[T] = typeTag(JavaReflections.getGenericTypeClass(obj,index)).asInstanceOf[ru.TypeTag[T]]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
deleted file mode 100644
index 4ac0cdc..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.utils
-
-import java.util
-
-import org.apache.eagle.datastream.core.StreamProducer
-
-import scala.collection.JavaConverters._
-
-object UnionUtils {
-  def join[T1,T2](producers : StreamProducer[T1]*) : StreamProducer[T2] = {
-    producers.head.streamUnion(producers.drop(1))
-  }
-
-  def join[T1,T2](producers : java.util.List[StreamProducer[T1]]) : StreamProducer[T2] = {
-    val newList = new util.ArrayList(producers)
-    val head = newList.get(0)
-    newList.remove(0)
-    head.streamUnion(newList.asScala);
-  }
-
-  def join[T1,T2](producers : List[StreamProducer[T1]]) : StreamProducer[T2] = {
-    val head = producers.head
-    head.streamUnion(producers.tail);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
deleted file mode 100644
index efcb0e7..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.util;
-
-import junit.framework.Assert;
-import org.apache.commons.cli.ParseException;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * @since 8/23/15
- */
-public class TestConfigOptionParser {
-    private final static Logger LOG = LoggerFactory.getLogger(TestConfigOptionParser.class);
-
-    @Test
-    public void testValidCommandArguments() throws ParseException {
-        String[] arguments = new String[]{
-                "-D","key1=value1",
-                "-D","key2=value2",
-                "-D","key3=value3=something",
-                "-D","key4=",
-                "-D","key5=\"--param having whitespace\""
-        };
-
-        Map<String,String> config = new ConfigOptionParser().parseConfig(arguments);
-
-        Assert.assertTrue(config.containsKey("key1"));
-        Assert.assertTrue(config.containsKey("key2"));
-        Assert.assertTrue(config.containsKey("key3"));
-        Assert.assertTrue(config.containsKey("key4"));
-        Assert.assertEquals("value1", config.get("key1"));
-        Assert.assertEquals("value2", config.get("key2"));
-        Assert.assertEquals("value3=something",config.get("key3"));
-        Assert.assertEquals("",config.get("key4"));
-        Assert.assertEquals("\"--param having whitespace",config.get("key5"));
-    }
-
-    @Test
-    public void testValidCommandArgumentsAsSystem() throws ParseException {
-        String[] arguments = new String[]{
-                "-D","key1=value1",
-                "-D","key2=value2",
-                "-D","key3=value3=something",
-                "-D","key4=",
-        };
-
-        new ConfigOptionParser().load(arguments);
-
-        Assert.assertTrue(System.getProperties().containsKey("key1"));
-        Assert.assertTrue(System.getProperties().containsKey("key2"));
-        Assert.assertTrue(System.getProperties().containsKey("key3"));
-        Assert.assertTrue(System.getProperties().containsKey("key4"));
-
-        Assert.assertEquals("value1", System.getProperty("key1"));
-        Assert.assertEquals("value2", System.getProperty("key2"));
-        Assert.assertEquals("value3=something",System.getProperty("key3"));
-        Assert.assertEquals("",System.getProperty("key4"));
-    }
-
-    @Test
-    public void testInvalidCommandArgument1()  {
-        String[] arguments = new String[]{
-                "-D","key1"
-        };
-
-        try {
-            new ConfigOptionParser().parseConfig(arguments);
-            Assert.fail("Should throw ParseException");
-        } catch (ParseException e) {
-            LOG.info("Expected exception: " +e.getMessage());
-        }
-    }
-
-    @Test
-    public void testInvalidCommandArgument2()  {
-        String[] arguments = new String[]{
-                "-D","=value"
-        };
-
-        try {
-            new ConfigOptionParser().parseConfig(arguments);
-            Assert.fail("Should throw ParseException");
-        } catch (ParseException e) {
-            LOG.info("Expected exception: " + e.getMessage());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
deleted file mode 100644
index 511db38..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple1;
-
-public class JavaEchoExecutor extends JavaStormStreamExecutor1<String>{
-    private static Logger LOG = LoggerFactory.getLogger(JavaEchoExecutor.class);
-    private Config config;
-    @Override
-    public void prepareConfig(Config config){
-        this.config = config;
-    }
-
-    /**
-     * give business code a chance to do initialization for example daemon thread
-     * this method is executed in remote machine
-     */
-    @Override
-    public void init(){
-    }
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> collector){
-        collector.collect(new Tuple1(input.get(0)));
-        LOG.info("echo " + input);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
deleted file mode 100644
index c5f9045..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.junit.Test;
-
-/**
- * @since 12/5/15
- */
-public class TestExecutionEnvironmentJava {
-
-    @Test
-    public void testGetEnvInJava() {
-        StormExecutionEnvironment env0 = ExecutionEnvironments.get(StormExecutionEnvironment.class);
-        Assert.assertNotNull(env0);
-
-        StormExecutionEnvironment env1 = ExecutionEnvironments.get(new String[]{}, StormExecutionEnvironment.class);
-        Assert.assertNotNull(env1);
-        Config config = ConfigFactory.load();
-        StormExecutionEnvironment env2 = ExecutionEnvironments.get(config, StormExecutionEnvironment.class);
-        Assert.assertNotNull(env2);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
deleted file mode 100644
index 979b09d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.junit.Test;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-public class TestJavaMain {
-    public static class SerializableFunction1<T1,R> extends AbstractFunction1<T1, R> implements Serializable {
-        @Override
-        public Object apply(Object v1) {
-            return null;
-        }
-    }
-
-    //@Test
-//    public void testGeneral(){
-//        Config config = ConfigFactory.load();
-//        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
-//        env.fromSpout(new TestKeyValueSpout()).withOutputFields(2).groupBy(Arrays.asList(0)).flatMap(new GroupedEchoExecutor()).parallelism(2);
-//        env.execute();
-//    }
-
-//    //@Test
-//    public void testMap(){
-//        Config config = ConfigFactory.load();
-//        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
-//        SerializableFunction1 f1 = new SerializableFunction1<Object, Object>();
-//        env.fromSpout(new TestKeyValueSpout()).withOutputFields(2).
-//                map1(f1);
-//        env.execute();
-//    }
-
-    @Test
-    public void test() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
deleted file mode 100644
index 6d2d36f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import junit.framework.Assert;
-import org.apache.eagle.datastream.utils.Reflections;
-import org.junit.Test;
-import scala.reflect.api.TypeTags;
-
-/**
- * @since 12/8/15
- */
-public class TestJavaReflectionUtils {
-    @Test
-    public void testJavaFlatMapper(){
-        Class<String> clazz = Reflections.javaTypeClass(new JavaEchoExecutor(), 0);
-        Assert.assertEquals(String.class,clazz);
-        TypeTags.TypeTag typeTag = Reflections.typeTag(clazz);
-        Assert.assertNotNull(typeTag);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
deleted file mode 100644
index 2a5043c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import org.apache.eagle.datastream.storm.KafkaStreamMonitorApp;
-
-/**
- * @since 11/7/15
- */
-public class TestKafkaStreamMonitor {
-    public static void main(String[] args){
-        new KafkaStreamMonitorApp().main(args);
-    }
-}
\ No newline at end of file



Mime
View raw message