eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [05/13] incubator-eagle git commit: EAGLE-341 clean inner process alert engine code
Date Sun, 14 Aug 2016 06:23:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
deleted file mode 100644
index 37622e4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/SpoutProxy.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.spout.{ISpoutOutputCollector, SpoutOutputCollector}
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import org.apache.eagle.datastream.core.{KeySelector, StreamInfo}
-import org.apache.eagle.datastream.utils.NameConstants
-import java.util.{List => JList}
-
-/**
- * Declare delegated BaseRichSpout with given field names
- *
- * @param delegate delegated BaseRichSpout
- * @param outputFields given field names
- */
-case class SpoutProxy(delegate: BaseRichSpout, outputFields: java.util.List[String]) extends BaseRichSpout{
-  def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector) {
-    this.delegate.open(conf, context, collector)
-  }
-
-  def nextTuple {
-    this.delegate.nextTuple
-  }
-
-  override def ack(msgId: AnyRef) {
-    this.delegate.ack(msgId)
-  }
-
-  override def fail(msgId: AnyRef) {
-    this.delegate.fail(msgId)
-  }
-
-  override def deactivate {
-    this.delegate.deactivate
-  }
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer) {
-    declarer.declare(new Fields(outputFields))
-  }
-
-  override def close {
-    this.delegate.close
-  }
-}
-
-case class KeyedSpoutOutputCollector(delegate: ISpoutOutputCollector,keyer:KeySelector) extends SpoutOutputCollector(delegate) {
-  override def emitDirect (taskId: Int, streamId: String, tuple: JList[AnyRef], messageId: AnyRef):Unit = {
-    val kv = toKeyValue(tuple)
-    delegate.emitDirect(taskId, streamId,kv, messageId)
-  }
-
-  override def emit(streamId: String, tuple: JList[AnyRef], messageId: AnyRef):JList[Integer] = {
-    val kv = toKeyValue(tuple)
-    delegate.emit(streamId,kv, messageId)
-  }
-
-  def toKeyValue(tuple: JList[AnyRef]) = util.Arrays.asList(keyer.key(tuple.get(0)),tuple.get(0)).asInstanceOf[JList[AnyRef]]
-}
-
-case class KeyedSpoutProxy(delegate: BaseRichSpout)(implicit streamInfo:StreamInfo) extends BaseRichSpout{
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector) {
-    if(!streamInfo.outKeyed) throw new IllegalArgumentException(s"$streamInfo is not key-based")
-    if(streamInfo.keySelector == null) throw new NullPointerException(s"KeySelector $streamInfo is null")
-
-    this.delegate.open(conf, context, KeyedSpoutOutputCollector(collector,streamInfo.keySelector))
-  }
-
-  override def nextTuple {
-    this.delegate.nextTuple
-  }
-
-  override def ack(msgId: AnyRef) {
-    this.delegate.ack(msgId)
-  }
-
-  override def fail(msgId: AnyRef) {
-    this.delegate.fail(msgId)
-  }
-
-  override def deactivate {
-    this.delegate.deactivate
-  }
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer) {
-    declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
-  }
-
-  override def close {
-    this.delegate.close
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
index 21057e7..379c859 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
@@ -20,7 +20,6 @@ package org.apache.eagle.datastream.storm
 
 import backtype.storm.topology.base.BaseRichBolt
 import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.persist.PersistExecutor
 import org.apache.eagle.datastream._
 import org.apache.eagle.datastream.core._
 
@@ -54,9 +53,7 @@ object StormBoltFactory {
         ForeachBoltWrapper(foreach.fn)
       }
       case persist : PersistProducer[Any] => {
-        val persisExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString)
-        persisExecutor.prepareConfig(config)
-        JavaStormBoltWrapper(persisExecutor.asInstanceOf[JavaStormStreamExecutor[AnyRef]])
+        JavaStormBoltWrapper(null)
       }
       case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
deleted file mode 100644
index 75f045e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.apache.eagle.datastream.{Collector, StormStreamExecutor}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-case class StormBoltWrapper(worker : StormStreamExecutor[AnyRef]) extends BaseRichBolt{
-  val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-    worker.init
-  }
-
-  override def execute(input : Tuple): Unit = {
-    try {
-      worker.flatMap(input.getValues.asScala, new Collector[AnyRef] {
-        override def collect(t: AnyRef): Unit = {
-          _collector.emit(input, StormWrapperUtils.productAsJavaList(t.asInstanceOf[Product]))
-        }
-      })
-    }catch{
-      case ex: Exception => {
-        LOG.error("fail executing", ex)
-        _collector.fail(input)
-        throw new RuntimeException(ex)
-      }
-    }
-    _collector.ack(input)
-  }
-
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    val fields = worker.fields
-    LOG.info("Output fields for worker " + worker + " : " + fields.toList)
-    declarer.declare(new Fields(fields:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
deleted file mode 100644
index 4165db4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider
-import org.apache.eagle.datastream.core.{ExecutionEnvironment, StormSourceProducer, StreamDAG}
-
-/**
- * @since  12/7/15
- */
-class StormExecutionEnvironment(private val conf:Config) extends ExecutionEnvironment(conf) {
-  override def execute(dag: StreamDAG) : Unit = {
-    StormTopologyCompiler(config.get, dag).buildTopology.execute
-  }
-
-  def fromSpout[T](source: BaseRichSpout): StormSourceProducer[T] = {
-    val ret = StormSourceProducer[T](source)
-    ret.initWith(dag,config.get)
-    ret
-  }
-
-  def fromSpout[T](sourceProvider: StormSpoutProvider):StormSourceProducer[T] = fromSpout(sourceProvider.getSpout(config.get))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
deleted file mode 100644
index f15e736..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import org.apache.eagle.datastream.Collector
-import org.apache.eagle.datastream.StormStreamExecutor
-import org.apache.eagle.datastream.StormStreamExecutor3
-
-import com.typesafe.config.Config
-
-case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String)
-  extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{
-  override def prepareConfig(config: Config): Unit = {
-    delegate.prepareConfig(config)
-  }
-
-  override def init: Unit = {
-    delegate.init
-  }
-
-  override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, String, util.SortedMap[Object, Object]]]): Unit = {
-    delegate.flatMap(input, new Collector[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]] {
-      override def collect(r: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): Unit = {
-        collector.collect(Tuple3(r._1, streamName, r._2))
-      }
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
deleted file mode 100644
index e5eea1f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.Collector
-import org.apache.eagle.datastream.core.StreamInfo
-
-case class StormFlatFunctionWrapper(flatMapper:(Any,Collector[Any])=>Unit)(implicit info:StreamInfo) extends AbstractStreamBolt[Any]{
-  /**
-   * Handle keyed stream value
-   */
-  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = {
-    flatMapper(value,new Collector[Any] {
-      override def collect(r: Any): Unit = emit(r)(input)
-    })
-  }
-
-  /**
-   * Handle general stream values list
-   *
-   * @param values
-   */
-  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit = {
-    flatMapper(values,new Collector[Any] {
-      override def collect(r: Any): Unit = emit(r)(input)
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
deleted file mode 100644
index e5fb86d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import backtype.storm.tuple.Tuple
-import org.apache.eagle.datastream.{Collector, FlatMapper}
-import org.apache.eagle.datastream.core.StreamInfo
-import scala.collection.JavaConverters._
-
-case class StormFlatMapperWrapper(flatMapper:FlatMapper[Any])(implicit info:StreamInfo) extends AbstractStreamBolt[Any]{
-  /**
-   * Handle keyed stream value
-   */
-  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = {
-    flatMapper.flatMap(value.asInstanceOf[Seq[AnyRef]],new Collector[Any] {
-      override def collect(r: Any): Unit = emit(r)(input)
-    })
-  }
-
-  /**
-   * Handle general stream values list
-   *
-   * @param values
-   */
-  override def onValues(values: java.util.List[AnyRef])(implicit input: Tuple): Unit = {
-    flatMapper.flatMap(values.asScala,new Collector[Any] {
-      override def collect(r: Any): Unit = emit(r)(input)
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
deleted file mode 100644
index 6a3b606..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.apache.eagle.datastream.core.{IteratorStreamProducer, IterableStreamProducer, StormSourceProducer, StreamProducer}
-import org.apache.eagle.datastream.utils.NameConstants
-
-object StormSpoutFactory {
-  def createSpout(config: Config, from: StreamProducer[Any]): BaseRichSpout = {
-    implicit val streamInfo = from.getInfo
-    from match {
-      case p@StormSourceProducer(source) =>
-        if(p.outKeyed) {
-          createKeyedProxySpout(p)
-        }else {
-          createProxySpout(p)
-        }
-      case p@IterableStreamProducer(iterable,recycle) =>
-        IterableStreamSpout(iterable,recycle)
-      case p@IteratorStreamProducer(iterator) =>
-        IteratorStreamSpout(iterator)
-      case _ =>
-        throw new IllegalArgumentException(s"Cannot compile unknown $from to a Storm Spout")
-    }
-  }
-
-  /**
-   * @param sourceProducer source producer
-   * @return
-   */
-  def createProxySpout(sourceProducer: StormSourceProducer[Any]): BaseRichSpout = {
-    val numFields = sourceProducer.numFields
-    if (numFields <= 0) {
-      sourceProducer.source
-    } else {
-      var i = 0
-      val ret = new util.ArrayList[String]
-      while (i < numFields) {
-        ret.add(NameConstants.FIELD_PREFIX + i)
-        i += 1
-      }
-      SpoutProxy(sourceProducer.source, ret)
-    }
-  }
-
-  def createKeyedProxySpout(sourceProducer: StormSourceProducer[Any]):BaseRichSpout = {
-    KeyedSpoutProxy(sourceProducer.source)(sourceProducer.getInfo)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
deleted file mode 100644
index f572377..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
-import backtype.storm.tuple.Fields
-import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.storm.partition.CustomPartitionGrouping
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.datastream.utils.NameConstants
-import org.slf4j.LoggerFactory
-
-import scala.collection.mutable.ListBuffer
-
-case class StormTopologyCompiler(config: Config, graph: StreamProducerGraph) extends AbstractTopologyCompiler{
-  val LOG = LoggerFactory.getLogger(StormTopologyCompiler.getClass)
-  val boltCache = scala.collection.mutable.Map[StreamProducer[Any], StormBoltWrapper]()
-
-  override def buildTopology: AbstractTopologyExecutor ={
-    val builder = new TopologyBuilder()
-    val iter = graph.iterator()
-    val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]()
-    val stormTopologyGraph = ListBuffer[String]()
-    while(iter.hasNext){
-      val from = iter.next()
-      val fromName = from.name
-      if(graph.isSource(from)){
-        val spout = StormSpoutFactory.createSpout(config, from)
-        builder.setSpout(fromName, spout, from.parallelism)
-        LOG.info("Spout: " + fromName + " with parallelism " + from.parallelism)
-      } else {
-        LOG.debug("Bolt" + fromName)
-      }
-      val edges = graph.outgoingEdgesOf(from)
-      edges.foreach(sc => {
-        val toName = sc.to.name
-        var boltDeclarer: BoltDeclarer = null
-        val toBolt = createBoltIfAbsent(toName)
-        boltDeclarerCache.get(toName) match {
-          case None => {
-            var finalParallelism = 1
-            graph.getNodeByName(toName) match {
-              case Some(p) => finalParallelism = p.parallelism
-              case None => finalParallelism = 1
-            }
-            boltDeclarer = builder.setBolt(toName, toBolt, finalParallelism)
-            LOG.info("Bolt: " + toName + " with parallelism " + finalParallelism)
-            boltDeclarerCache.put(toName, boltDeclarer)
-          }
-          case Some(bt) => boltDeclarer = bt
-        }
-
-        sc match {
-          case GroupbyFieldsConnector(_, _, groupByFields) =>
-            boltDeclarer.fieldsGrouping(fromName, new Fields(fields(groupByFields)))
-          case GroupbyStrategyConnector(_, _, strategy) =>
-            boltDeclarer.customGrouping(fromName, new CustomPartitionGrouping(strategy));
-          case GroupbyKeyConnector(_, _, keySelector) =>
-            boltDeclarer.fieldsGrouping(fromName, new Fields(NameConstants.FIELD_KEY));
-          case ShuffleConnector(_, _) => {
-            boltDeclarer.shuffleGrouping(fromName)
-          }
-          case _ => throw new UnsupportedOperationException(s"Supported stream connector $sc")
-        }
-
-        if (graph.isSource(from)) {
-          stormTopologyGraph += s"Spout{$fromName}{${from.parallelism}) ~> Bolt{$toName}{${from.parallelism}} in $sc"
-        } else {
-          stormTopologyGraph += s"Bolt{$fromName}{${from.parallelism}) ~> Bolt{$toName}{${from.parallelism}} in $sc"
-        }
-      })
-    }
-    LOG.info(s"Storm topology DAG\n{\n \t${stormTopologyGraph.mkString("\n\t")} \n}")
-    new StormTopologyExecutorImpl(builder.createTopology, config)
-  }
-
-  def fields(fields : Seq[Int]): java.util.List[String] ={
-    val ret = new util.ArrayList[String]
-    fields.map(n => ret.add(NameConstants.FIELD_PREFIX + n))
-    ret
-  }
-
-  def createBoltIfAbsent(name : String) : BaseRichBolt = {
-    val producer = graph.getNodeByName(name)
-    producer match{
-      case Some(p) => createBoltIfAbsent(graph, p)
-      case None => throw new IllegalArgumentException("please check bolt name " + name)
-    }
-  }
-
-  def createBoltIfAbsent(graph: StreamProducerGraph, producer : StreamProducer[Any]): BaseRichBolt ={
-    boltCache.get(producer) match{
-      case Some(bolt) => bolt
-      case None => {
-        StormBoltFactory.getBoltWrapper(graph, producer, config)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
deleted file mode 100644
index 54d09e6..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.storm
-
-import java.io.{File, FileInputStream}
-
-import _root_.storm.trident.spout.RichSpoutBatchExecutor
-import backtype.storm.generated.StormTopology
-import backtype.storm.utils.Utils
-import backtype.storm.{Config, LocalCluster, StormSubmitter}
-import org.apache.eagle.common.config.EagleConfigConstants
-import org.apache.eagle.datastream.core.AbstractTopologyExecutor
-import org.apache.thrift7.transport.TTransportException
-import org.slf4j.LoggerFactory
-import org.yaml.snakeyaml.Yaml
-
-case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor {
-  val LOG = LoggerFactory.getLogger(classOf[StormTopologyExecutorImpl])
-  @throws(classOf[Exception])
-  def execute {
-    val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)
-    val conf: Config = new Config
-    conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
-    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
-    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32))
-    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384))
-    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384))
-    conf.put(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000))
-
-    if(config.hasPath("envContextConfig.stormConfigFile")) {
-      val file = new File(config.getString("envContextConfig.stormConfigFile"))
-      if(file.exists()) {
-        val inputFileStream = new FileInputStream(file)
-        val yaml = new Yaml()
-        try {
-          val stormConf = yaml.load(inputFileStream).asInstanceOf[java.util.LinkedHashMap[String, Object]]
-          if(stormConf != null) conf.putAll(stormConf)
-        } catch {
-          case t: Throwable => {
-            LOG.error(s"Got example $t",t)
-            throw t
-          }
-        } finally {
-          if(inputFileStream != null) inputFileStream.close()
-        }
-      }
-    }
-
-    val topologyName = config.getString("envContextConfig.topologyName")
-    if (!localMode) {
-      if(config.hasPath("envContextConfig.nimbusHost")) {
-        LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${config.getString("envContextConfig.nimbusHost")}")
-        conf.put(backtype.storm.Config.NIMBUS_HOST, config.getString("envContextConfig.nimbusHost"))
-      }
-
-      if(config.hasPath("envContextConfig.nimbusThriftPort")) {
-        LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${config.getString("envContextConfig.nimbusThriftPort")}")
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, config.getNumber("envContextConfig.nimbusThriftPort"))
-      }
-
-      if(config.hasPath("envContextConfig.jarFile")){
-        LOG.info(s"Setting storm.jar as ${config.getString("envContextConfig.jarFile")}")
-        System.setProperty("storm.jar",config.getString("envContextConfig.jarFile"))
-      }
-
-      LOG.info("Submitting as cluster mode")
-      try {
-        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
-      } catch {
-        case e:TTransportException =>
-          LOG.error(s"Got thrift exception, type: ${e.getType}")
-          throw e
-      }
-      finally {
-        System.clearProperty("storm.jar")
-      }
-    } else {
-      LOG.info("Submitting as local mode")
-      val cluster: LocalCluster = new LocalCluster
-      cluster.submitTopology(topologyName, conf, topology)
-      try {
-        while(true) {
-          Utils.sleep(Integer.MAX_VALUE)
-        }
-      } catch {
-        case ex: Throwable =>
-          LOG.warn("Sleep is interrupted with " + ex.toString)
-          cluster.killTopology(topologyName)
-          cluster.shutdown
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
deleted file mode 100644
index 95c6ac1..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package org.apache.eagle.datastream.storm
-
-import java.util
-
-object StormWrapperUtils {
-  def productAsJavaList(product:Product):util.List[AnyRef]={
-    val list = new util.LinkedList[AnyRef]()
-    product.productIterator.foreach((p:Any) => list.add(p.asInstanceOf[AnyRef]))
-    list
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
deleted file mode 100644
index 752c317..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream.utils
-
-import java.util
-
-import org.apache.eagle.alert.dedup.AlertEntityDeduplicationExecutor
-import org.apache.eagle.alert.executor.AlertExecutor
-import org.apache.eagle.alert.notification.AlertNotificationExecutor
-import org.apache.eagle.datastream.core.{FlatMapProducer, StreamConnector, StreamProducer}
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.mutable.ListBuffer
-
-/**
- * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
- *
- * <br/><br/>
- * Explanations for programId, alertExecutorId and policy<br/><br/>
- * - programId - distributed or single-process program for example one storm topology<br/>
- * - alertExecutorId - one process/thread which executes multiple policies<br/>
- * - policy - some rules to be evaluated<br/>
- *
- * <br/>
- *
- * Normally the mapping is like following:
- * <pre>
- * programId (1:N) alertExecutorId
- * alertExecutorId (1:N) policy
- * </pre>
- */
-
-object AlertExecutorConsumerUtils {
-  private val LOG: Logger = LoggerFactory.getLogger(AlertExecutorConsumerUtils.getClass)
-
-  def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], alertStreamProducers: List[StreamProducer[Any]]): Unit = {
-    val alertExecutorIdList: java.util.List[String] = new util.ArrayList[String]()
-    alertStreamProducers.map(x =>
-      alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getExecutorId));
-    val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getPolicyDefinitionDao
-    val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    //val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
-    //val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
-
-    val entityDedupStreamProducer = FlatMapProducer(entityDedupExecutor)
-    //val persistStreamProducer = FlatMapProducer(persistExecutor)
-    //val emailDedupStreamProducer = FlatMapProducer(emailDedupExecutor)
-    val notificationStreamProducer = FlatMapProducer(notificationExecutor)
-    //toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
-    toBeAddedEdges += StreamConnector(entityDedupStreamProducer, notificationStreamProducer)
-
-    alertStreamProducers.foreach(sp => {
-      toBeAddedEdges += StreamConnector(sp, entityDedupStreamProducer)
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
deleted file mode 100644
index 33a805c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/GraphPrinter.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream.utils
-
-import org.apache.eagle.datastream.core.{StreamConnector, StreamProducer}
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * @see scala.tools.nsc.DotDiagramGenerator
- */
-object GraphPrinter {
-  private val LOG = LoggerFactory.getLogger(GraphPrinter.getClass)
-  def print(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], message:String = "Print stream DAG graph"): Unit = {
-    val graphStr = ListBuffer[String]()
-    val iter = dag.iterator()
-    while (iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        graphStr += s"${edge.from.name}{${edge.from.parallelism}} ~> ${edge.to.name}{${edge.to.parallelism}} in ${edge.toString}"
-      })
-    }
-    LOG.info(message+"\n{ \n\t" + graphStr.mkString("\n\t") + "\n}")
-  }
-  
-  def printDotDigraph(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], title:String = "dag", message:String = "Print DOT digraph (copy and visualize with http://www.webgraphviz.com/)"): String = {
-    val graphStr = ListBuffer[String]()
-    val iter = dag.iterator()
-    while (iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        graphStr += s""""${edge.from.name} x ${edge.from.parallelismNum}" -> "${edge.to.name} x ${edge.from.parallelismNum}" [label = "$edge"];"""
-      })
-    }
-    val dotDigraph = s"""digraph $title { \n\t${graphStr.mkString("\n\t")} \n}"""
-    LOG.info(s"""$message\n\n$dotDigraph\n""")
-    dotDigraph
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
deleted file mode 100644
index 331cf7c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.utils
-
-import org.apache.eagle.datastream.core.StreamInfo
-
-case class NodeNameSelector(producer : StreamInfo) {
-  def getName : String = {
-    producer.name match {
-      case null => producer.toString+NameConstants.FIELD_SEPARATOR+producer.id
-      case _ => producer.name
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
deleted file mode 100644
index 4ac0cdc..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.utils
-
-import java.util
-
-import org.apache.eagle.datastream.core.StreamProducer
-
-import scala.collection.JavaConverters._
-
-object UnionUtils {
-  def join[T1,T2](producers : StreamProducer[T1]*) : StreamProducer[T2] = {
-    producers.head.streamUnion(producers.drop(1))
-  }
-
-  def join[T1,T2](producers : java.util.List[StreamProducer[T1]]) : StreamProducer[T2] = {
-    val newList = new util.ArrayList(producers)
-    val head = newList.get(0)
-    newList.remove(0)
-    head.streamUnion(newList.asScala);
-  }
-
-  def join[T1,T2](producers : List[StreamProducer[T1]]) : StreamProducer[T2] = {
-    val head = producers.head
-    head.streamUnion(producers.tail);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
deleted file mode 100644
index d8efa97..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.util;
-
-import org.apache.commons.cli.ParseException;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * @since 8/23/15
- */
-public class TestConfigOptionParser {
-    private final static Logger LOG = LoggerFactory.getLogger(TestConfigOptionParser.class);
-
-    @Test
-    public void testValidCommandArguments() throws ParseException {
-        String[] arguments = new String[]{
-                "-D","key1=value1",
-                "-D","key2=value2",
-                "-D","key3=value3=something",
-                "-D","key4=",
-                "-D","key5=\"--param having whitespace\""
-        };
-
-        Map<String,String> config = new ConfigOptionParser().parseConfig(arguments);
-
-        Assert.assertTrue(config.containsKey("key1"));
-        Assert.assertTrue(config.containsKey("key2"));
-        Assert.assertTrue(config.containsKey("key3"));
-        Assert.assertTrue(config.containsKey("key4"));
-        Assert.assertEquals("value1", config.get("key1"));
-        Assert.assertEquals("value2", config.get("key2"));
-        Assert.assertEquals("value3=something",config.get("key3"));
-        Assert.assertEquals("",config.get("key4"));
-        Assert.assertEquals("\"--param having whitespace",config.get("key5"));
-    }
-
-    @Test
-    public void testValidCommandArgumentsAsSystem() throws ParseException {
-        String[] arguments = new String[]{
-                "-D","key1=value1",
-                "-D","key2=value2",
-                "-D","key3=value3=something",
-                "-D","key4=",
-        };
-
-        new ConfigOptionParser().load(arguments);
-
-        Assert.assertTrue(System.getProperties().containsKey("key1"));
-        Assert.assertTrue(System.getProperties().containsKey("key2"));
-        Assert.assertTrue(System.getProperties().containsKey("key3"));
-        Assert.assertTrue(System.getProperties().containsKey("key4"));
-
-        Assert.assertEquals("value1", System.getProperty("key1"));
-        Assert.assertEquals("value2", System.getProperty("key2"));
-        Assert.assertEquals("value3=something",System.getProperty("key3"));
-        Assert.assertEquals("",System.getProperty("key4"));
-    }
-
-    @Test
-    public void testInvalidCommandArgument1()  {
-        String[] arguments = new String[]{
-                "-D","key1"
-        };
-
-        try {
-            new ConfigOptionParser().parseConfig(arguments);
-            Assert.fail("Should throw ParseException");
-        } catch (ParseException e) {
-            LOG.info("Expected exception: " +e.getMessage());
-        }
-    }
-
-    @Test
-    public void testInvalidCommandArgument2()  {
-        String[] arguments = new String[]{
-                "-D","=value"
-        };
-
-        try {
-            new ConfigOptionParser().parseConfig(arguments);
-            Assert.fail("Should throw ParseException");
-        } catch (ParseException e) {
-            LOG.info("Expected exception: " + e.getMessage());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
deleted file mode 100644
index 511db38..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple1;
-
-public class JavaEchoExecutor extends JavaStormStreamExecutor1<String>{
-    private static Logger LOG = LoggerFactory.getLogger(JavaEchoExecutor.class);
-    private Config config;
-    @Override
-    public void prepareConfig(Config config){
-        this.config = config;
-    }
-
-    /**
-     * give business code a chance to do initialization for example daemon thread
-     * this method is executed in remote machine
-     */
-    @Override
-    public void init(){
-    }
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> collector){
-        collector.collect(new Tuple1(input.get(0)));
-        LOG.info("echo " + input);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
deleted file mode 100644
index 9e96e2c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * @since 12/5/15
- */
-public class TestExecutionEnvironmentJava {
-
-    @Test
-    public void testGetEnvInJava() {
-        StormExecutionEnvironment env0 = ExecutionEnvironments.get(StormExecutionEnvironment.class);
-        Assert.assertNotNull(env0);
-
-        StormExecutionEnvironment env1 = ExecutionEnvironments.get(new String[]{}, StormExecutionEnvironment.class);
-        Assert.assertNotNull(env1);
-        Config config = ConfigFactory.load();
-        StormExecutionEnvironment env2 = ExecutionEnvironments.get(config, StormExecutionEnvironment.class);
-        Assert.assertNotNull(env2);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
deleted file mode 100644
index 979b09d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.junit.Test;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-public class TestJavaMain {
-    public static class SerializableFunction1<T1,R> extends AbstractFunction1<T1, R> implements Serializable {
-        @Override
-        public Object apply(Object v1) {
-            return null;
-        }
-    }
-
-    //@Test
-//    public void testGeneral(){
-//        Config config = ConfigFactory.load();
-//        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
-//        env.fromSpout(new TestKeyValueSpout()).withOutputFields(2).groupBy(Arrays.asList(0)).flatMap(new GroupedEchoExecutor()).parallelism(2);
-//        env.execute();
-//    }
-
-//    //@Test
-//    public void testMap(){
-//        Config config = ConfigFactory.load();
-//        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
-//        SerializableFunction1 f1 = new SerializableFunction1<Object, Object>();
-//        env.fromSpout(new TestKeyValueSpout()).withOutputFields(2).
-//                map1(f1);
-//        env.execute();
-//    }
-
-    @Test
-    public void test() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
deleted file mode 100644
index f5c69a6..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import org.junit.Assert;
-import org.apache.eagle.datastream.utils.Reflections;
-import org.junit.Test;
-import scala.reflect.api.TypeTags;
-
-/**
- * @since 12/8/15
- */
-public class TestJavaReflectionUtils {
-    @Test
-    public void testJavaFlatMapper(){
-        Class<String> clazz = Reflections.javaTypeClass(new JavaEchoExecutor(), 0);
-        Assert.assertEquals(String.class,clazz);
-        TypeTags.TypeTag typeTag = Reflections.typeTag(clazz);
-        Assert.assertNotNull(typeTag);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
deleted file mode 100644
index 2a5043c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import org.apache.eagle.datastream.storm.KafkaStreamMonitorApp;
-
-/**
- * @since 11/7/15
- */
-public class TestKafkaStreamMonitor {
-    public static void main(String[] args){
-        new KafkaStreamMonitorApp().main(args);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
deleted file mode 100644
index bef530a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.junit.Assert;
-import org.apache.eagle.dataproc.impl.aggregate.SimpleAggregateExecutor;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.junit.Test;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Created on 1/20/16.
- */
-public class TestSimpleAggregateExecutor {
-
-    @Test
-    public void test() throws Exception {
-        SimpleAggregateExecutor sae = new SimpleAggregateExecutor(new String[]{"s1"},
-                "define stream s1(eagleAlertContext object, timestamp long, metric string);" +
-                        " @info(name='query')" +
-                        " from s1 select * insert into tmp;"
-                ,
-                "siddhiCEPEngine",
-                0,
-                1);
-
-        Config config = ConfigFactory.empty();
-        sae.prepareConfig(config);
-        sae.init();
-
-        List<Object> tuple = new ArrayList<>(3);
-        tuple.add(0, "groupbykey");
-        tuple.add(1, "s1");
-        SortedMap value = new TreeMap();
-        value.put("timestamp", System.currentTimeMillis());
-        value.put("metric", "name-of-the-metric");
-        tuple.add(2, value);
-
-        final AtomicInteger count = new AtomicInteger();
-        sae.flatMap(tuple, new Collector<Tuple2<String, AggregateEntity>>(){
-            @Override
-            public void collect(Tuple2<String, AggregateEntity> stringAggregateEntityTuple2) {
-                System.out.print(stringAggregateEntityTuple2._1());
-                count.incrementAndGet();
-            }
-        });
-
-        Thread.sleep(3000);
-        Assert.assertEquals(1, count.get());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
deleted file mode 100644
index 1f22a97..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.datastream.core.*;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.partition.PartitionStrategy;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import scala.collection.Seq;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * @since Dec 18, 2015
- *
- */
-public class TestStreamAggregate {
-
-	private Config config;
-
-	@SuppressWarnings("serial")
-	private final class SimpleSpout extends BaseRichSpout {
-		@SuppressWarnings("rawtypes")
-		@Override
-		public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-		}
-		@Override
-		public void nextTuple() {
-		}
-		@Override
-		public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		}
-	}
-
-	public static class TestEnvironment extends StormExecutionEnvironment {
-		private static final long serialVersionUID = 1L;
-		public TestEnvironment(Config conf) {
-			super(conf);
-		}
-		@Override
-		public void execute(StreamDAG dag) {
-			System.out.println("DAT completed!");
-		}
-	}
-	
-	public static class DummyStrategy implements PartitionStrategy {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public int balance(String key, int buckNum) {
-			return 0;
-		}
-	};
-	
-	public static class DummyExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity>  {
-		@Override
-		public void prepareConfig(Config config) {
-		}
-		@Override
-		public void init() {
-		}
-		@Override
-		public void flatMap(List input, Collector collector) {
-		}
-	}
-	
-	@Before
-	public void setUp() {
-		System.setProperty("config.resource", "/application.conf");
-		ConfigFactory.invalidateCaches();
-		config = ConfigFactory.load();
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes", "serial" })
-	@Test
-	public void testAggregate1() {
-		StormExecutionEnvironment exe = new TestEnvironment(config);
-		
-		BaseRichSpout spout = new SimpleSpout();
-		StormSourceProducer ssp = exe.fromSpout(spout);
-		
-		ssp.flatMap(new FlatMapper<String>() {
-			@Override
-			public void flatMap(Seq<Object> input, Collector<String> collector) {
-				// do nothing
-			}
-		}).aggregate(Arrays.asList("c3EsLogEventStream"), "qid", new DummyStrategy());
-		
-		try {
-			exe.execute();
-			Assert.fail("customzied flat mapper(non java storm executor) before analyze is not supported!");
-		} catch (Exception e ){
-		}
-	}
-	
-	@SuppressWarnings({ "unchecked", "rawtypes", "serial" })
-	@Test
-	public void testAggregate() {
-		StormExecutionEnvironment exe = new TestEnvironment(config);
-		StormSourceProducer ssp = exe.fromSpout(new SimpleSpout());
-		DummyExecutor dummy = new DummyExecutor();
-		ssp.flatMap(dummy).aggregate(Arrays.asList("c3EsLogEventStream"), "analyzeStreamExecutor", new DummyStrategy());
-
-		try {
-			exe.execute();
-		} catch (Exception e) {
-			Assert.fail("customized flat mapper before");
-		}
-		// Assertion
-		DirectedAcyclicGraph<StreamProducer<Object>, StreamConnector<Object, Object>> dag = exe.dag();
-		Assert.assertEquals("three vertex", 3, dag.vertexSet().size());
-		boolean hasWrapped = false;
-		for (StreamProducer<Object> obj : dag.vertexSet()) {
-			if (obj instanceof FlatMapProducer) {
-				if (((FlatMapProducer) obj).mapper() instanceof JavaStormExecutorForAlertWrapper) {
-					hasWrapped = true;
-					Assert.assertEquals("dummy executor should be wrapped in the alert wrapper func", dummy,
-							((JavaStormExecutorForAlertWrapper) ((FlatMapProducer) obj).mapper() ).getDelegate());
-
-				}
-			}
-		}
-		Assert.assertTrue(hasWrapped);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
deleted file mode 100644
index 87b1947..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
+++ /dev/null
@@ -1,79 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	"envContextConfig" : {
-		"env" : "storm",
-		"mode" : "local",
-		"topologyName" : "kafka-monitor-topology",
-		"parallelismConfig" : {
-			"kafkaMsgConsumer" : 1
-		}
-	},
-	"dataSourceConfig": {
-		"topic" : "nn_jmx_metric_sandbox",
-		"zkConnection" : "localhost:2181",
-		"zkConnectionTimeoutMS" : 15000,
-		"consumerGroupId" : "EagleConsumer",
-		"fetchSize" : 1048586,
-		"transactionZKServers" : "localhost",
-		"transactionZKPort" : 2181,
-		"transactionZKRoot" : "/consumers",
-		"transactionStateUpdateMS" : 2000
-	},
-	"alertExecutorConfigs" : {
-		"eventStreamExecutor" : {
-			"parallelism" : 1,
-			"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-			"needValidation" : "true"
-		}
-	},
-	"persistExecutorConfigs" {
-		"persistExecutor1" : {
-			"kafka": {
-				"bootstrap_servers" : "localhost",
-				"topics" : {
-					"defaultOutput" : "downSampleOutput"
-				}
-			}
-		}
-	},
-	"aggregateExecutorConfigs" : {
-		"aggregateStreamExecutor" : {
-			"parallelism" : 1,
-			"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-			"needValidation" : "true"
-		}
-	},
-	"eagleProps" : {
-		"site" : "sandbox",
-		"application": "eventSource",
-		"dataJoinPollIntervalSec" : 30,
-		"mailHost" : "mail.host.com",
-		"mailSmtpPort":"25",
-		"mailDebug" : "true",
-		"eagleService": {
-			"host": "localhost",
-			"port": 38080,
-			"username": "admin",
-			"password": "secret"
-		}
-	},
-	"dynamicConfigSource" : {
-		"enabled" : true,
-		"initDelayMillis" : 0,
-		"delayMillis" : 30000
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh
deleted file mode 100644
index 3e57b6c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-export EAGLE_SERVICE_USER="admin"
-export EAGLE_SERVICE_PASSWD="secret"
-export EAGLE_SERVICE_HOST="localhost"
-export EAGLE_SERVICE_PORT=38080
-
-# AlertDataSource: data sources bound to sites
-echo "Importing AlertDataSourceService for stream... "
-
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"eventSource"}, "enabled": "true", "config" : ""}]'
-
-## AlertStreamService: alert streams generated from data source
-echo ""
-echo "Importing AlertStreamService for stream... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" -d '[{"prefix":"alertStream","tags":{"dataSource":"eventSource","streamName":"eventStream"},"desc":"alert event stream from hdfs audit log"}]'
-
-## AlertExecutorService: what alert streams are consumed by alert executor
-echo ""
-echo "Importing AlertExecutorService for stream ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"eventSource","alertExecutorId":"eventStreamExecutor","streamName":"eventStream"},"desc":"alert executor for event stream"}]'
-
-## AlertStreamSchemaService: schema for event from alert stream
-echo ""
-echo "Importing AlertStreamSchemaService for stream ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"timestamp"},"attrDescription":"event timestamp","attrType":"long","category":"","attrValueResolver":""},{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"name"},"attrDescription":"event name","attrType":"string","category":"","attrValueResolver":""},{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"value"},"attrDescription":"event value","attrType":"integer","category":"","attrValueResolver":""}]'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties
deleted file mode 100644
index 3499c46..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
- eagle.log.dir=./logs
- eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
- log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
- log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
- log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-## 30-day backup
-# log4j.appender.DRFA.MaxBackupIndex=30
- log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
deleted file mode 100644
index 9d378c9..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package org.apache.eagle.datastream
-
-import org.apache.eagle.datastream.storm.StormWrapperUtils
-import org.scalatest.{FlatSpec, Matchers}
-
-class StormWrapperUtilsSpec extends FlatSpec with Matchers{
-  import StormWrapperUtils._
-  "StormWrapperUtils" should "convert Tuple{1,2,3,..} to java.util.List" in {
-    val list1 = productAsJavaList(new Tuple1("a"))
-    list1.size() should be(1)
-    list1.get(0) should be("a")
-
-    val list2 = productAsJavaList(new Tuple2("a","b"))
-    list2.size() should be(2)
-    list2.get(0) should be("a")
-    list2.get(1) should be("b")
-
-    val list3 = productAsJavaList(new Tuple3("a","b","c"))
-    list3.size() should be(3)
-    list3.get(0) should be("a")
-    list3.get(1) should be("b")
-    list3.get(2) should be("c")
-  }
-}
\ No newline at end of file


Mime
View raw message