eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject incubator-eagle git commit: large PRD missed some files Author: @yonzhang2012 <yonzhang2012@gmail.com> Closes: #342
Date Sun, 14 Aug 2016 06:41:25 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 0f11a591a -> c66b52554


large PRD missed some files
Author: @yonzhang2012 <yonzhang2012@gmail.com>
Closes: #342


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c66b5255
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c66b5255
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c66b5255

Branch: refs/heads/develop
Commit: c66b52554e7de8bd8c1be1928aee90303ada8fbd
Parents: 0f11a59
Author: yonzhang <yonzhang2012@gmail.com>
Authored: Sat Aug 13 23:45:19 2016 -0700
Committer: yonzhang <yonzhang2012@gmail.com>
Committed: Sat Aug 13 23:45:19 2016 -0700

----------------------------------------------------------------------
 .../datastream/ExecutionEnvironments.scala      | 109 ------
 .../datastream/core/ExecutionEnvironment.scala  |  45 ---
 .../eagle/datastream/core/StreamBuilder.scala   |  87 -----
 .../eagle/datastream/core/StreamProducer.scala  | 332 -------------------
 .../datastream/storm/StormBoltFactory.scala     |  61 ----
 5 files changed, 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
deleted file mode 100644
index c3ddb00..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-import scala.reflect.runtime.universe._
-
-/**
- * Execution environment factory
- *
- * The factory is mainly used for create or manage execution environment,
- * and also handles the shared works like configuration, arguments for execution environment
- *
- * Notice: this factory class should not know any implementation like storm or spark
- *
- * @since 0.3.0
- */
-object ExecutionEnvironments{
-  type storm = StormExecutionEnvironment
-
-
-  /**
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
-    getWithConfig[T](ConfigFactory.load())
-  }
-
-  /**
-   *
-   * @param config
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def getWithConfig[T <: ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
-    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
-  }
-
-  /**
-   *
-   * @param args
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
-    getWithConfig[T](new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * Support java style for default config
-   *
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
-    get[T](ConfigFactory.load(),clazz)
-  }
-
-  def get[T<:ExecutionEnvironment](clazz:Class[T], config:Config):T ={
-    get[T](config,clazz)
-  }
-
-  /**
-   * Support java style
-    *
-    * @param config command config
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(config)
-  }
-
-  /**
-   * Support java style
-   *
-   * @param args command arguments in string array
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
deleted file mode 100644
index 444559a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-
-trait StreamContextAdapter{
-  def submit(context:StreamContext):Unit = {
-    execute(null)
-  }
-  def execute(dag: StreamDAG)
-}
-
-/**
- * TODO: Decouple execution environment with stream context
- *
- * @since 0.3.0
- */
-abstract class ExecutionEnvironment(private val conf:Config)
-  extends StreamContext(conf) with StreamContextAdapter     // Continue to support old API
-  with StreamSourceBuilder
-{
-  /**
-   * Start to execute
-   */
-  def execute():Unit = {
-    submit(this)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
deleted file mode 100644
index fcd89c0..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.reflect
-import scala.reflect.runtime.universe._
-
-trait StreamContextBuilder extends StreamSourceBuilder {
-  def config:Configuration
-  /**
-   * Business logic DAG
-    *
-    * @return
-   */
-  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
-  /**
-   * Support Java Style Config
-   *
-   * @return
-   */
-  def getConfig:Config = config.get
-  def build:StreamDAG
-  def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit
-  def submit(env:ExecutionEnvironment):Unit
-  def submit(clazz:Class[ExecutionEnvironment]):Unit
-}
-
-class StreamContext(private val conf:Config) extends StreamContextBuilder{
-  implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]])
-  private val _config:Configuration = Configuration(conf)
-  override def dag = _dag
-  override def config = _config
-
-  def build:StreamDAG = {
-    null
-  }
-
-
-  override def submit(env: ExecutionEnvironment): Unit = {
-    env.submit(this)
-  }
-
-  def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit = {
-
-  }
-
-
-  override def submit(clazz: Class[ExecutionEnvironment]): Unit = {
-
-  }
-}
-
-object StreamContext {
-  /**
-   * @return
-   */
-  def apply():StreamContext = {
-    new StreamContext(ConfigFactory.load())
-  }
-
-  /**
-   *
-   * @param args
-   * @return
-   */
-  def apply(args:Array[String]):StreamContext ={
-    new StreamContext(new ConfigOptionParser().load(args))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
deleted file mode 100644
index 33cc2dd..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.apache.eagle.alert.entity.AlertAPIEntity
-import org.apache.eagle.datastream.{FlatMapperWrapper, Collector, FlatMapper}
-import org.apache.eagle.partition.PartitionStrategy
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions.{asScalaBuffer, seqAsJavaList}
-import scala.collection.JavaConverters.asScalaBufferConverter
-/**
- * StreamProducer = StreamInfo + StreamProtocol
- *
- * StreamProducer is processing logic element, used the base class for all other concrete StreamProducer
- * Stream Producer can be treated as logical processing element but physical
- * It defines high level type-safe API for user to organize data stream flow
- *
- * StreamProducer is independent of execution environment
- *
- * @tparam T processed elements type
- */
-abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T] {
-  private var graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]] = null
-  private val LOG = LoggerFactory.getLogger(classOf[StreamProducer[T]])
-
-  /**
-   * Should not modify graph when setting it
-   */
-  def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T]={
-    this.config = config
-    this.graph = graph
-    if(hook && ! this.graph.containsVertex(this)) {
-      this.graph.addVertex(this)
-    }
-    LOG.debug(this.graph.toString)
-    this
-  }
-  
-  /**
-   * Get stream pure metadata info
-   * @return
-   */
-  override def getInfo:StreamInfo = this.asInstanceOf[StreamInfo]
-
-  override def stream(streamId:String):StreamProducer[T] = {
-    this.streamId = streamId
-    this
-  }
-
-  override def filter(fn : T => Boolean): StreamProducer[T] ={
-    val ret = FilterProducer[T](fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R] = {
-    val ret = FlatMapProducer[T,R](flatMapper)
-    connect(this, ret)
-    ret
-  }
-  override def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R] = {
-    val ret = FlatMapProducer[T,R](FlatMapperWrapper[R](func))
-    connect(this, ret)
-    ret
-  }
-
-
-  override def foreach(fn : T => Unit) : Unit = {
-    val ret = ForeachProducer[T](fn)
-    connect(this, ret)
-  }
-
-  override def map[R](fn : T => R) : StreamProducer[R] = {
-    val ret = MapperProducer[T,R](0,fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map1[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer[T,R](1, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map2[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer[T,R](2, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map3[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer(3, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map4[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer(4, fn)
-    connect(this, ret)
-    ret
-  }
-
-  /**
-   * starting from 0, groupby operator would be upon edge of the graph
-   */
-  override def groupBy(fields : Int*) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields)
-    connect(this, ret)
-    ret
-  }
-
-  def groupByFieldIndex(fields : Seq[Int]) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields)
-    connect(this, ret)
-    ret
-  }
-
-  //groupBy java version, starting from 1
-  override def groupBy(fields : java.util.List[Integer]) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields.asScala.toSeq.asInstanceOf[Seq[Int]])
-    connect(this, ret)
-    ret
-  }
-
-  override def groupByKey(keySelector: T=> Any):StreamProducer[T] = {
-    val ret = GroupByKeyProducer(keySelector)
-    connect(this,ret)
-    ret
-  }
-
-  override def streamUnion[T2,T3](others : Seq[StreamProducer[T2]]) : StreamProducer[T3] = {
-    val ret = StreamUnionProducer[T, T2, T3](others)
-    connect(this, ret)
-    ret
-  }
-
-  def streamUnion[T2,T3](other : StreamProducer[T2]) : StreamProducer[T3] = {
-    streamUnion(Seq(other))
-  }
-
-  def streamUnion[T2,T3](others : util.List[StreamProducer[T2]]) : StreamProducer[T3] = streamUnion(others.asScala)
-
-  override def groupBy(strategy : PartitionStrategy) : StreamProducer[T] = {
-    val ret = GroupByStrategyProducer(strategy)
-    connect(this, ret)
-    ret
-  }
-
-
-  override def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null):AlertStreamProducer = {
-    val ret = AlertStreamProducer(upStreamNames, alertExecutorId, consume, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true, strategy = strategy)
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false, strategy)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false)
-  }
-
-  def aggregate(upStreamNames: java.util.List[String], queryExecutorId : String, strategy: PartitionStrategy = null): StreamProducer[T] = {
-    val ret= AggregateProducer(upStreamNames, queryExecutorId, null, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy: PartitionStrategy): StreamProducer[T] = {
-    val ret= AggregateProducer(upStreamNames, null, cql, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = {
-    val ret = PersistProducer(executorId, storageType)
-    connect(this, ret)
-    ret
-  }
-
-  def connect[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
-    if(current.graph == null) throw new NullPointerException(s"$current has not been registered to any graph before being connected")
-    current.graph.addVertex(next)
-    current.graph.addEdge(current, next, StreamConnector(current, next))
-    passOnContext[T1,T2](current, next)
-  }
-
-  def connect[T2]( next: StreamProducer[T2]) = {
-    if(this.graph == null) throw new NullPointerException("graph is null")
-    this.graph.addVertex(next)
-    this.graph.addEdge(this, next, StreamConnector(this, next))
-    passOnContext[T,T2](this, next)
-  }
-
-  private def passOnContext[T1 ,T2](current: StreamProducer[T1], next: StreamProducer[T2]): Unit ={
-    next.initWith(current.graph,current.config)
-  }
-
-  /**
-   * can be set by programatically or by configuration
-   */
-  override def parallelism(parallelism : Int) : StreamProducer[T] = {
-    this.parallelismNum = parallelism
-    this
-  }
-
-  override def parallelism : Int = this.parallelismNum
-  override def stream:String = this.streamId
-
-  /**
-   * Component name
-   *
-   * @param componentName component name
-   * @return
-   */
-  override def nameAs(componentName : String) : StreamProducer[T] = {
-    this.name = componentName
-    this
-  }
-}
-
-case class FilterProducer[T](fn : T => Boolean) extends StreamProducer[T]{
-  override def toString: String = s"FilterProducer"
-}
-
-case class FlatMapProducer[T, R](var mapper: FlatMapper[R]) extends StreamProducer[R]{
-  override def toString: String = mapper.toString
-}
-
-case class MapperProducer[T,R](numOutputFields : Int, var fn : T => R) extends StreamProducer[R]{
-  override def toString: String = s"MapperProducer"
-}
-
-case class ForeachProducer[T](var fn : T => Unit) extends StreamProducer[T]
-
-abstract class GroupByProducer[T] extends StreamProducer[T]
-case class GroupByFieldProducer[T](fields : Seq[Int]) extends GroupByProducer[T]
-case class GroupByStrategyProducer[T](partitionStrategy: PartitionStrategy) extends GroupByProducer[T]
-case class GroupByKeyProducer[T](keySelectorFunc:T => Any) extends GroupByProducer[T]{
-  override def toString: String = s"GroupByKey"
-}
-
-object GroupByProducer {
-  def apply[T](fields: Seq[Int]) = new GroupByFieldProducer[T](fields)
-  def apply[T](partitionStrategy : PartitionStrategy) = new GroupByStrategyProducer[T](partitionStrategy)
-  def apply[T](keySelector:T => Any) = new GroupByKeyProducer[T](keySelector)
-}
-
-case class StreamUnionProducer[T1,T2,T3](others: Seq[StreamProducer[T2]]) extends StreamProducer[T3]
-
-case class StormSourceProducer[T](source: BaseRichSpout) extends StreamProducer[T]{
-  var numFields : Int = 0
-
-  /**
-    * rename outputfields to f0, f1, f2, ...
-   * if one spout declare some field names, those fields names will be modified
-   * @param n
-   */
-  def withOutputFields(n : Int): StormSourceProducer[T] ={
-    this.numFields = n
-    this
-  }
-}
-
-case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]{
-  override def toString: String = s"IterableStreamProducer(${iterable.getClass.getSimpleName}))"
-}
-case class IteratorStreamProducer[T](iterator: Iterator[T]) extends StreamProducer[T]{
-  override def toString: String = s"IteratorStreamProducer(${iterator.getClass.getSimpleName})"
-}
-
-case class AlertStreamProducer(var upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] {
-  def consume(consume: Boolean): AlertStreamProducer = {
-    this.consume = consume
-    this
-  }
-}
-
-case class AggregateProducer[T](var upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T]
-
-case class PersistProducer[T](executorId :String, storageType: StorageType.StorageType) extends StreamProducer[T]
-
-object UniqueId{
-  val id : AtomicInteger = new AtomicInteger(0);
-  def incrementAndGetId() : Int = {
-    id.incrementAndGet()
-  }
-}
-
-trait KeySelector extends Serializable{
-  def key(t:Any):Any
-}
-
-case class KeySelectorWrapper[T](fn:T => Any) extends KeySelector{
-  override def key(t: Any): Any = fn(t.asInstanceOf[T])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
deleted file mode 100644
index 379c859..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.storm
-
-import backtype.storm.topology.base.BaseRichBolt
-import com.typesafe.config.Config
-import org.apache.eagle.datastream._
-import org.apache.eagle.datastream.core._
-
-object StormBoltFactory {
-  def getBoltWrapper(graph: StreamProducerGraph, producer : StreamProducer[Any], config : Config) : BaseRichBolt = {
-    implicit val streamInfo = producer.getInfo
-    producer match{
-      case FlatMapProducer(worker) => {
-        if(worker.isInstanceOf[JavaStormStreamExecutor[AnyRef]]){
-          worker.asInstanceOf[JavaStormStreamExecutor[AnyRef]].prepareConfig(config)
-          JavaStormBoltWrapper(worker.asInstanceOf[JavaStormStreamExecutor[AnyRef]])
-        }else if(worker.isInstanceOf[StormStreamExecutor[AnyRef]]){
-          worker.asInstanceOf[StormStreamExecutor[AnyRef]].prepareConfig(config)
-          StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[AnyRef]])
-        }else if(worker.isInstanceOf[FlatMapperWrapper[Any]]){
-          StormFlatFunctionWrapper(worker.asInstanceOf[FlatMapperWrapper[Any]].func)
-        } else {
-          StormFlatMapperWrapper(worker)
-        }
-//        else {
-//          throw new UnsupportedOperationException(s"Unsupported FlatMapperProducer type: $producer")
-//        }
-      }
-      case filter:FilterProducer[Any] => {
-        FilterBoltWrapper(filter.fn)
-      }
-      case mapper:MapperProducer[Any,Any] => {
-        MapBoltWrapper(mapper.numOutputFields, mapper.fn)
-      }
-      case foreach:ForeachProducer[Any] => {
-        ForeachBoltWrapper(foreach.fn)
-      }
-      case persist : PersistProducer[Any] => {
-        JavaStormBoltWrapper(null)
-      }
-      case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")
-    }
-  }
-}
\ No newline at end of file


Mime
View raw message