From: yonzhang2012@apache.org
To: commits@eagle.incubator.apache.org
Reply-To: dev@eagle.incubator.apache.org
Subject: incubator-eagle git commit: large PRD missed some files Author: @yonzhang2012 Closes: #342
Date: Sun, 14 Aug 2016 06:41:25 +0000 (UTC)
archived-at: Sun, 14 Aug 2016 06:41:33 -0000

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 0f11a591a -> c66b52554

large PRD missed some files

Author: @yonzhang2012

Closes: #342

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c66b5255
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c66b5255
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c66b5255

Branch: refs/heads/develop
Commit: c66b52554e7de8bd8c1be1928aee90303ada8fbd
Parents: 0f11a59
Author: yonzhang
Authored: Sat Aug 13 23:45:19 2016 -0700
Committer: yonzhang
Committed: Sat Aug 13 23:45:19 2016 -0700

----------------------------------------------------------------------
 .../datastream/ExecutionEnvironments.scala      | 109 ------
 .../datastream/core/ExecutionEnvironment.scala  |  45 ---
 .../eagle/datastream/core/StreamBuilder.scala   |  87 -----
 .../eagle/datastream/core/StreamProducer.scala  | 332 -------------------
 .../datastream/storm/StormBoltFactory.scala     |  61 ----
 5 files changed, 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
deleted file mode 100644
index c3ddb00..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-import scala.reflect.runtime.universe._
-
-/**
- * Execution environment factory
- *
- * The factory is mainly used for create or manage execution environment,
- * and also handles the shared works like configuration, arguments for execution environment
- *
- * Notice: this factory class should not know any implementation like storm or spark
- *
- * @since 0.3.0
- */
-object ExecutionEnvironments{
-  type storm = StormExecutionEnvironment
-
-
-  /**
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
-    getWithConfig[T](ConfigFactory.load())
-  }
-
-  /**
-   *
-   * @param config
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def getWithConfig[T <: ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
-    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
-  }
-
-  /**
-   *
-   * @param args
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
-    getWithConfig[T](new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * Support java style for default config
-   *
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
-    get[T](ConfigFactory.load(),clazz)
-  }
-
-  def get[T<:ExecutionEnvironment](clazz:Class[T], config:Config):T ={
-    get[T](config,clazz)
-  }
-
-  /**
-   * Support java style
-   *
-   * @param config command config
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(config)
-  }
-
-  /**
-   * Support java style
-   *
-   * @param args command arguments in string array
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
deleted file mode 100644
index 444559a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.Config
-
-trait StreamContextAdapter{
-  def submit(context:StreamContext):Unit = {
-    execute(null)
-  }
-  def execute(dag: StreamDAG)
-}
-
-/**
- * TODO: Decouple execution environment with stream context
- *
- * @since 0.3.0
- */
-abstract class ExecutionEnvironment(private val conf:Config)
-  extends StreamContext(conf) with StreamContextAdapter // Continue to support old API
-  with StreamSourceBuilder
-{
-  /**
-   * Start to execute
-   */
-  def execute():Unit = {
-    submit(this)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
deleted file mode 100644
index fcd89c0..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.reflect
-import scala.reflect.runtime.universe._
-
-trait StreamContextBuilder extends StreamSourceBuilder {
-  def config:Configuration
-  /**
-   * Business logic DAG
-   *
-   * @return
-   */
-  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
-  /**
-   * Support Java Style Config
-   *
-   * @return
-   */
-  def getConfig:Config = config.get
-  def build:StreamDAG
-  def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit
-  def submit(env:ExecutionEnvironment):Unit
-  def submit(clazz:Class[ExecutionEnvironment]):Unit
-}
-
-class StreamContext(private val conf:Config) extends StreamContextBuilder{
-  implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]])
-  private val _config:Configuration = Configuration(conf)
-  override def dag = _dag
-  override def config = _config
-
-  def build:StreamDAG = {
-    null
-  }
-
-
-  override def submit(env: ExecutionEnvironment): Unit = {
-    env.submit(this)
-  }
-
-  def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit = {
-
-  }
-
-
-  override def submit(clazz: Class[ExecutionEnvironment]): Unit = {
-
-  }
-}
-
-object StreamContext {
-  /**
-   * @return
-   */
-  def apply():StreamContext = {
-    new StreamContext(ConfigFactory.load())
-  }
-
-  /**
-   *
-   * @param args
-   * @return
-   */
-  def apply(args:Array[String]):StreamContext ={
-    new StreamContext(new ConfigOptionParser().load(args))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
deleted file mode 100644
index 33cc2dd..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.apache.eagle.alert.entity.AlertAPIEntity
-import org.apache.eagle.datastream.{FlatMapperWrapper, Collector, FlatMapper}
-import org.apache.eagle.partition.PartitionStrategy
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions.{asScalaBuffer, seqAsJavaList}
-import scala.collection.JavaConverters.asScalaBufferConverter
-/**
- * StreamProducer = StreamInfo + StreamProtocol
- *
- * StreamProducer is processing logic element, used the base class for all other concrete StreamProducer
- * Stream Producer can be treated as logical processing element but physical
- * It defines high level type-safe API for user to organize data stream flow
- *
- * StreamProducer is independent of execution environment
- *
- * @tparam T processed elements type
- */
-abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T] {
-  private var graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]] = null
-  private val LOG = LoggerFactory.getLogger(classOf[StreamProducer[T]])
-
-  /**
-   * Should not modify graph when setting it
-   */
-  def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T]={
-    this.config = config
-    this.graph = graph
-    if(hook && ! this.graph.containsVertex(this)) {
-      this.graph.addVertex(this)
-    }
-    LOG.debug(this.graph.toString)
-    this
-  }
-
-  /**
-   * Get stream pure metadata info
-   * @return
-   */
-  override def getInfo:StreamInfo = this.asInstanceOf[StreamInfo]
-
-  override def stream(streamId:String):StreamProducer[T] = {
-    this.streamId = streamId
-    this
-  }
-
-  override def filter(fn : T => Boolean): StreamProducer[T] ={
-    val ret = FilterProducer[T](fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R] = {
-    val ret = FlatMapProducer[T,R](flatMapper)
-    connect(this, ret)
-    ret
-  }
-  override def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R] = {
-    val ret = FlatMapProducer[T,R](FlatMapperWrapper[R](func))
-    connect(this, ret)
-    ret
-  }
-
-
-  override def foreach(fn : T => Unit) : Unit = {
-    val ret = ForeachProducer[T](fn)
-    connect(this, ret)
-  }
-
-  override def map[R](fn : T => R) : StreamProducer[R] = {
-    val ret = MapperProducer[T,R](0,fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map1[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer[T,R](1, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map2[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer[T,R](2, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map3[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer(3, fn)
-    connect(this, ret)
-    ret
-  }
-
-  override def map4[R](fn : T => R): StreamProducer[R] = {
-    val ret = MapperProducer(4, fn)
-    connect(this, ret)
-    ret
-  }
-
-  /**
-   * starting from 0, groupby operator would be upon edge of the graph
-   */
-  override def groupBy(fields : Int*) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields)
-    connect(this, ret)
-    ret
-  }
-
-  def groupByFieldIndex(fields : Seq[Int]) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields)
-    connect(this, ret)
-    ret
-  }
-
-  //groupBy java version, starting from 1
-  override def groupBy(fields : java.util.List[Integer]) : StreamProducer[T] = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
-    val ret = GroupByFieldProducer[T](fields.asScala.toSeq.asInstanceOf[Seq[Int]])
-    connect(this, ret)
-    ret
-  }
-
-  override def groupByKey(keySelector: T=> Any):StreamProducer[T] = {
-    val ret = GroupByKeyProducer(keySelector)
-    connect(this,ret)
-    ret
-  }
-
-  override def streamUnion[T2,T3](others : Seq[StreamProducer[T2]]) : StreamProducer[T3] = {
-    val ret = StreamUnionProducer[T, T2, T3](others)
-    connect(this, ret)
-    ret
-  }
-
-  def streamUnion[T2,T3](other : StreamProducer[T2]) : StreamProducer[T3] = {
-    streamUnion(Seq(other))
-  }
-
-  def streamUnion[T2,T3](others : util.List[StreamProducer[T2]]) : StreamProducer[T3] = streamUnion(others.asScala)
-
-  override def groupBy(strategy : PartitionStrategy) : StreamProducer[T] = {
-    val ret = GroupByStrategyProducer(strategy)
-    connect(this, ret)
-    ret
-  }
-
-
-  override def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null):AlertStreamProducer = {
-    val ret = AlertStreamProducer(upStreamNames, alertExecutorId, consume, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true, strategy = strategy)
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false, strategy)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false)
-  }
-
-  def aggregate(upStreamNames: java.util.List[String], queryExecutorId : String, strategy: PartitionStrategy = null): StreamProducer[T] = {
-    val ret= AggregateProducer(upStreamNames, queryExecutorId, null, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy: PartitionStrategy): StreamProducer[T] = {
-    val ret= AggregateProducer(upStreamNames, null, cql, strategy)
-    connect(this, ret)
-    ret
-  }
-
-  def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = {
-    val ret = PersistProducer(executorId, storageType)
-    connect(this, ret)
-    ret
-  }
-
-  def connect[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
-    if(current.graph == null) throw new NullPointerException(s"$current has not been registered to any graph before being connected")
-    current.graph.addVertex(next)
-    current.graph.addEdge(current, next, StreamConnector(current, next))
-    passOnContext[T1,T2](current, next)
-  }
-
-  def connect[T2]( next: StreamProducer[T2]) = {
-    if(this.graph == null) throw new NullPointerException("graph is null")
-    this.graph.addVertex(next)
-    this.graph.addEdge(this, next, StreamConnector(this, next))
-    passOnContext[T,T2](this, next)
-  }
-
-  private def passOnContext[T1 ,T2](current: StreamProducer[T1], next: StreamProducer[T2]): Unit ={
-    next.initWith(current.graph,current.config)
-  }
-
-  /**
-   * can be set by programatically or by configuration
-   */
-  override def parallelism(parallelism : Int) : StreamProducer[T] = {
-    this.parallelismNum = parallelism
-    this
-  }
-
-  override def parallelism : Int = this.parallelismNum
-  override def stream:String = this.streamId
-
-  /**
-   * Component name
-   *
-   * @param componentName component name
-   * @return
-   */
-  override def nameAs(componentName : String) : StreamProducer[T] = {
-    this.name = componentName
-    this
-  }
-}
-
-case class FilterProducer[T](fn : T => Boolean) extends StreamProducer[T]{
-  override def toString: String = s"FilterProducer"
-}
-
-case class FlatMapProducer[T, R](var mapper: FlatMapper[R]) extends StreamProducer[R]{
-  override def toString: String = mapper.toString
-}
-
-case class MapperProducer[T,R](numOutputFields : Int, var fn : T => R) extends StreamProducer[R]{
-  override def toString: String = s"MapperProducer"
-}
-
-case class ForeachProducer[T](var fn : T => Unit) extends StreamProducer[T]
-
-abstract class GroupByProducer[T] extends StreamProducer[T]
-case class GroupByFieldProducer[T](fields : Seq[Int]) extends GroupByProducer[T]
-case class GroupByStrategyProducer[T](partitionStrategy: PartitionStrategy) extends GroupByProducer[T]
-case class GroupByKeyProducer[T](keySelectorFunc:T => Any) extends GroupByProducer[T]{
-  override def toString: String = s"GroupByKey"
-}
-
-object GroupByProducer {
-  def apply[T](fields: Seq[Int]) = new GroupByFieldProducer[T](fields)
-  def apply[T](partitionStrategy : PartitionStrategy) = new GroupByStrategyProducer[T](partitionStrategy)
-  def apply[T](keySelector:T => Any) = new GroupByKeyProducer[T](keySelector)
-}
-
-case class StreamUnionProducer[T1,T2,T3](others: Seq[StreamProducer[T2]]) extends StreamProducer[T3]
-
-case class StormSourceProducer[T](source: BaseRichSpout) extends StreamProducer[T]{
-  var numFields : Int = 0
-
-  /**
-   * rename outputfields to f0, f1, f2, ...
-   * if one spout declare some field names, those fields names will be modified
-   * @param n
-   */
-  def withOutputFields(n : Int): StormSourceProducer[T] ={
-    this.numFields = n
-    this
-  }
-}
-
-case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]{
-  override def toString: String = s"IterableStreamProducer(${iterable.getClass.getSimpleName}))"
-}
-case class IteratorStreamProducer[T](iterator: Iterator[T]) extends StreamProducer[T]{
-  override def toString: String = s"IteratorStreamProducer(${iterator.getClass.getSimpleName})"
-}
-
-case class AlertStreamProducer(var upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] {
-  def consume(consume: Boolean): AlertStreamProducer = {
-    this.consume = consume
-    this
-  }
-}
-
-case class AggregateProducer[T](var upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T]
-
-case class PersistProducer[T](executorId :String, storageType: StorageType.StorageType) extends StreamProducer[T]
-
-object UniqueId{
-  val id : AtomicInteger = new AtomicInteger(0);
-  def incrementAndGetId() : Int = {
-    id.incrementAndGet()
-  }
-}
-
-trait KeySelector extends Serializable{
-  def key(t:Any):Any
-}
-
-case class KeySelectorWrapper[T](fn:T => Any) extends KeySelector{
-  override def key(t: Any): Any = fn(t.asInstanceOf[T])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c66b5255/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
deleted file mode 100644
index 379c859..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.datastream.storm
-
-import backtype.storm.topology.base.BaseRichBolt
-import com.typesafe.config.Config
-import org.apache.eagle.datastream._
-import org.apache.eagle.datastream.core._
-
-object StormBoltFactory {
-  def getBoltWrapper(graph: StreamProducerGraph, producer : StreamProducer[Any], config : Config) : BaseRichBolt = {
-    implicit val streamInfo = producer.getInfo
-    producer match{
-      case FlatMapProducer(worker) => {
-        if(worker.isInstanceOf[JavaStormStreamExecutor[AnyRef]]){
-          worker.asInstanceOf[JavaStormStreamExecutor[AnyRef]].prepareConfig(config)
-          JavaStormBoltWrapper(worker.asInstanceOf[JavaStormStreamExecutor[AnyRef]])
-        }else if(worker.isInstanceOf[StormStreamExecutor[AnyRef]]){
-          worker.asInstanceOf[StormStreamExecutor[AnyRef]].prepareConfig(config)
-          StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[AnyRef]])
-        }else if(worker.isInstanceOf[FlatMapperWrapper[Any]]){
-          StormFlatFunctionWrapper(worker.asInstanceOf[FlatMapperWrapper[Any]].func)
-        } else {
-          StormFlatMapperWrapper(worker)
-        }
-//        else {
-//          throw new UnsupportedOperationException(s"Unsupported FlatMapperProducer type: $producer")
-//        }
-      }
-      case filter:FilterProducer[Any] => {
-        FilterBoltWrapper(filter.fn)
-      }
-      case mapper:MapperProducer[Any,Any] => {
-        MapBoltWrapper(mapper.numOutputFields, mapper.fn)
-      }
-      case foreach:ForeachProducer[Any] => {
-        ForeachBoltWrapper(foreach.fn)
-      }
-      case persist : PersistProducer[Any] => {
-        JavaStormBoltWrapper(null)
-      }
-      case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")
-    }
-  }
-}
\ No newline at end of file
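
Note for readers of the archive: the removed ExecutionEnvironments factory built an environment by looking up a constructor that takes a single Config and calling it reflectively. The sketch below illustrates that pattern only and is not code from this commit. Settings, Env, LocalEnv and Envs are hypothetical names, Settings stands in for com.typesafe.config.Config, and ClassTag is used in place of the TypeTag and runtime mirror in the removed code so that the example needs nothing beyond the Scala standard library.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for com.typesafe.config.Config (keeps the sketch dependency-free).
final case class Settings(values: Map[String, String])

// Hypothetical environment hierarchy; these names are not part of the Eagle code base.
abstract class Env(val settings: Settings) {
  def describe: String
}

class LocalEnv(settings: Settings) extends Env(settings) {
  def describe: String = "local:" + settings.values.getOrElse("app", "unnamed")
}

object Envs {
  // Scala style: the target class is recovered from the implicit ClassTag.
  def create[T <: Env](settings: Settings)(implicit tag: ClassTag[T]): T =
    createFromClass(settings, tag.runtimeClass.asInstanceOf[Class[T]])

  // Java style: the caller passes the class explicitly.
  // Assumption: every concrete Env has a public constructor taking exactly one Settings.
  def createFromClass[T <: Env](settings: Settings, clazz: Class[T]): T =
    clazz.getConstructor(classOf[Settings]).newInstance(settings)
}

object FactoryDemo {
  def main(args: Array[String]): Unit = {
    val settings = Settings(Map("app" -> "demo"))
    println(Envs.create[LocalEnv](settings).describe)
    println(Envs.createFromClass(settings, classOf[LocalEnv]).describe)
  }
}
```

The single-argument constructor convention lets the factory stay unaware of concrete implementations, at the cost of moving a missing-constructor error from compile time to run time (getConstructor throws NoSuchMethodException).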