Return-Path: X-Original-To: apmail-eagle-commits-archive@minotaur.apache.org Delivered-To: apmail-eagle-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 614C618C06 for ; Thu, 3 Mar 2016 18:10:22 +0000 (UTC) Received: (qmail 55639 invoked by uid 500); 3 Mar 2016 18:10:22 -0000 Delivered-To: apmail-eagle-commits-archive@eagle.apache.org Received: (qmail 55609 invoked by uid 500); 3 Mar 2016 18:10:22 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 55600 invoked by uid 99); 3 Mar 2016 18:10:22 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Mar 2016 18:10:22 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 45E8EC0420 for ; Thu, 3 Mar 2016 18:10:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id pj4FYmrEfjzg for ; Thu, 3 Mar 2016 18:10:10 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 665B560E7B for ; Thu, 3 Mar 2016 18:09:39 +0000 (UTC) Received: (qmail 51155 invoked by uid 99); 3 Mar 2016 
18:09:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Mar 2016 18:09:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5519FE78F8; Thu, 3 Mar 2016 18:09:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.incubator.apache.org Date: Thu, 03 Mar 2016 18:10:09 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [36/51] [partial] incubator-eagle git commit: EAGLE-184 Migrate eagle website from https://github.com/eaglemonitoring/eaglemonitoring.github.io to document branch http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala deleted file mode 100644 index 1e1664a..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.core - -trait AbstractTopologyExecutor { - def execute -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala deleted file mode 100644 index e3f3050..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.{Config, _} - -import scala.reflect.runtime.universe._ - -/** - * @since 12/4/15 - */ -case class Configuration(private var config:Config) extends Serializable{ - def get:Config = config - - def set[T<:AnyRef](key:String,value:T): Unit = { - config = config.withValue(key,ConfigValueFactory.fromAnyRef(value)) - } - - /** - * - * @param key config key - * @param default default value - * @tparam T return type - * @return - */ - def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = { - if(get.hasPath(key)) { - get(key) - } else default - } - - def get[T](key:String)(implicit tag: TypeTag[T]):T = tag.tpe match { - case STRING_TYPE => config.getString(key).asInstanceOf[T] - case TypeTag.Double => get.getDouble(key).asInstanceOf[T] - case TypeTag.Long => get.getLong(key).asInstanceOf[T] - case TypeTag.Int => get.getInt(key).asInstanceOf[T] - case TypeTag.Byte => get.getBytes(key).asInstanceOf[T] - case TypeTag.Boolean => get.getBoolean(key).asInstanceOf[T] - case NUMBER_TYPE => get.getNumber(key).asInstanceOf[T] - case OBJECT_TYPE => get.getObject(key).asInstanceOf[T] - case VALUE_TYPE => get.getValue(key).asInstanceOf[T] - case ANY_REF_TYPE => get.getAnyRef(key).asInstanceOf[T] - case INT_LIST_TYPE => get.getIntList(key).asInstanceOf[T] - case DOUBLE_LIST_TYPE => get.getDoubleList(key).asInstanceOf[T] - case BOOL_LIST_TYPE => get.getBooleanList(key).asInstanceOf[T] - case LONG_LIST_TYPE => get.getLongList(key).asInstanceOf[T] - case _ => throw new UnsupportedOperationException(s"$tag is not supported yet") - } - - val STRING_TYPE = typeOf[String] - val NUMBER_TYPE = typeOf[Number] - val INT_LIST_TYPE = typeOf[List[Int]] - val BOOL_LIST_TYPE = typeOf[List[Boolean]] - val DOUBLE_LIST_TYPE = typeOf[List[Double]] - val LONG_LIST_TYPE = typeOf[List[Double]] - val OBJECT_TYPE = 
typeOf[ConfigObject] - val VALUE_TYPE = typeOf[ConfigValue] - val ANY_REF_TYPE = typeOf[AnyRef] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala deleted file mode 100644 index c511484..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config - -trait StreamContextAdapter{ - def submit(context:StreamContext):Unit = { - execute(context.build) - } - def execute(dag: StreamDAG) -} - -/** - * TODO: Decouple execution environment with stream context - * - * @since 0.3.0 - */ -abstract class ExecutionEnvironment(private val conf:Config) - extends StreamContext(conf) with StreamContextAdapter // Continue to support old API - with StreamSourceBuilder -{ - /** - * Start to execute - */ - def execute():Unit = { - submit(this) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala deleted file mode 100644 index 9564a0d..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala +++ /dev/null @@ -1,80 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.apache.eagle.dataproc.impl.aggregate.AggregateExecutorFactory -import org.apache.eagle.datastream.FlatMapper -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.JavaConversions.asScalaSet -import scala.collection.mutable.ListBuffer - -/** - * The expansion job for stream analyze - * - * TODO : should re-use flow with stream alert expansion, make code cleaner - */ -class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(config) { - - override def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any, Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], - dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], current: StreamProducer[Any], - child: StreamProducer[Any]): Unit = { - child match { - case AggregateProducer(upStreamNames, analyzerId, cepQl, strategy) => { - /** - * Rewrite the tree to add output field wrapper since policy executors accept only fixed tuple format - */ - val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames) - - val analyzeExecutors = if (cepQl != null) { - AggregateExecutorFactory.Instance.createExecutors(upStreamNames, cepQl) - } else { - AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId) - } - - analyzeExecutors.foreach(exec => { - val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).initWith(dag,config, hook = false).nameAs(exec.getExecutorId + 
"_" + exec.getPartitionSeq).stream(child.stream) - - // connect with previous - if (strategy == null) { - newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t)) - } else { - newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t, strategy)) - } - - // connect with next - val outgoingEdges = dag.outgoingEdgesOf(child) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(t, e.to, e)) - }) - - // remote current child - toBeRemovedVertex += child - } - case _ => - } - } -} - -object StreamAggregateExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAggregateExpansion ={ - val e = new StreamAggregateExpansion(config) - e.expand(dag) - e - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala deleted file mode 100644 index 618bba3..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala +++ /dev/null @@ -1,257 +0,0 @@ -/* - - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import java.util - -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity -import org.apache.eagle.alert.executor.AlertExecutorCreationUtils -import org.apache.eagle.policy.common.Constants -import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl - -import scala.collection.JavaConversions.asScalaSet -import scala.collection.mutable.ListBuffer -import org.apache.eagle.datastream.JavaStormExecutorForAlertWrapper -import org.apache.eagle.datastream.JavaStormStreamExecutor -import org.apache.eagle.datastream.StormStreamExecutor -import org.apache.eagle.datastream.storm.StormExecutorForAlertWrapper -import org.apache.eagle.datastream.utils.AlertExecutorConsumerUtils -import org.apache.eagle.service.client.EagleServiceConnector -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -import com.typesafe.config.Config - -/** - * The constraints for alert is: - * 1. only 3 StreamProducers can be put immediately before MapProducer, FlatMapProducer, StreamUnionProducer - * 2. For StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer - * 3. the output for MapProducer and FlatMapProducer is 2-field tuple, key and value, key is string, value has to be SortedMap - * 4. the framework will wrapper original MapProducer and FlatMapProducer to emit 3-field tuple, {key, streamName and value} - * 5. 
the framework will automatically partition traffic with first field - * - * - * 2 steps - * step 1: wrapper previous StreamProducer with one more field "streamName" - * step 2: partition alert executor by policy partitioner class - */ - -case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) { - val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion]) - import StreamAlertExpansion._ - - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): Unit ={ - val iter = dag.iterator() - val toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]] - val toBeRemovedVertex = new ListBuffer[StreamProducer[Any]] - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - val child = edge.to - onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child) - }) - } - // add back edges - toBeAddedEdges.foreach(e => { - dag.addVertex(e.from) - dag.addVertex(e.to) - dag.addEdge(e.from, e.to, e) - }) - toBeRemovedVertex.foreach(v => dag.removeVertex(v)) - } - - def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], - dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = { - child match { - case AlertStreamProducer(upStreamNames, alertExecutorId, withConsumer,strategy) => { - /** - * step 1: wrapper previous StreamProducer with one more field "streamName" - * for AlertStreamSink, we check previous StreamProducer and replace that - */ - val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames) - - /** - * step 2: partition alert executor by policy partitioner class - */ - val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, - new PolicyDefinitionEntityDAOImpl[AlertDefinitionAPIEntity](new EagleServiceConnector(config), 
Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME), - upStreamNames, alertExecutorId) - var alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]] - alertExecutors.foreach(exec => { - val t = FlatMapProducer(exec).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config, hook = false) - alertProducers += t - newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector[Any,Any](newsp, t,Seq(0))) - if (strategy == null) { - newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp,t,Seq(0))) - } - else { - newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp,t,strategy)) - } - }) - - // remove AlertStreamSink - toBeRemovedVertex += child - - // add alert consumer if necessary - if (withConsumer) { - AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList) - } - } - case _ => - } - } - - protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = { - if(upStreamNames == null) throw new NullPointerException("upStreamNames is null") - - /** - * step 1: wrapper previous StreamProducer with one more field "streamName" - * for AlertStreamSink, we check previous StreamProducer and replace that - */ - val newStreamProducers = new ListBuffer[StreamProducer[Any]] - current match { - case StreamUnionProducer(others) => { - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, 
upStreamNames.get(0))) - var i: Int = 1 - others.foreach(o => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i)) - i += 1 - }) - } - case p: FlatMapProducer[AnyRef, AnyRef] => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames)) - } - case p: MapperProducer[AnyRef,AnyRef] => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames)) - } - case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(s,upStreamNames)) - } - case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported") - } - newStreamProducers - } - - - protected def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], - dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= { - var newsp: StreamProducer[Any] = null - current match { - case _: FlatMapProducer[AnyRef, AnyRef] => { - val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper - mapper match { - case a: JavaStormStreamExecutor[AnyRef] => { - val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName) - newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId) - } - case b: StormStreamExecutor[AnyRef] => { - val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName) - newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId) - } - case _ => 
throw new IllegalArgumentException - } - // remove old StreamProducer and replace that with new StreamProducer - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp)) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to)) - toBeRemovedVertex += current - } - case _: MapperProducer[Any,Any] => { - val mapper = current.asInstanceOf[MapperProducer[Any,Any]].fn - val newfun: (Any => Any) = { a => - val result = mapper(a) - result match { - case scala.Tuple1(x1) => (null, upStreamName, x1) - case scala.Tuple2(x1, x2) => (x1, upStreamName, x2) - case scala.Tuple3(_, _, _) => result - case _ => throw new IllegalArgumentException(s"Illegal message :$result, Tuple1/Tuple2/Tuple3 are supported") - } - } - current match { - case MapperProducer(_, fn) => newsp = MapperProducer(3, newfun).initWith(dag,config,hook = false).stream(current.stream) - case _ => throw new IllegalArgumentException(s"Illegal producer $current") - } - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp)) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to)) - toBeRemovedVertex += current - } - case s: StreamProducer[Any] if dag.inDegreeOf(s) == 0 => { - val fn:(AnyRef => AnyRef) = { - n => { - n match { - case scala.Tuple3 => n - case scala.Tuple2(x1,x2) => (x1,upStreamName,x2) - case scala.Tuple1(x1) => (if(x1 == null) null else x1.hashCode(),upStreamName,x1) - case _ => (if(n == null) null else n.hashCode(),upStreamName,n) - } - } - } - newsp = MapperProducer(3,fn).initWith(dag,config,hook = false).stream(s.stream) - toBeAddedEdges += StreamConnector(current,newsp) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to)) - } - case _ => throw 
new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink") - } - newsp - } -} - -object StreamAlertExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAlertExpansion ={ - val e = StreamAlertExpansion(config) - e.expand(dag) - e - } - - /** - * Try upStreamNames firstly, otherwise try producer.streamId - * - * @param producer - * @param upStreamNames - * @return - */ - private def recognizeSingleStreamName(producer: StreamProducer[AnyRef],upStreamNames:util.List[String]):String = { - if(upStreamNames == null){ - producer.streamId - }else if(upStreamNames.size()>1){ - if(producer.streamId == null) { - if (upStreamNames.size() > 1) - throw new IllegalStateException("Too many (more than 1) upStreamNames " + upStreamNames + " given for " + producer) - else - upStreamNames.get(0) - } else { - producer.streamId - } - } else if(upStreamNames.size() == 1){ - upStreamNames.get(0) - }else { - if(producer.streamId == null){ - throw new IllegalArgumentException("No stream name found for "+producer) - } else - producer.streamId - } - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala deleted file mode 100644 index 6e21bcc..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor 
license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.core - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.eagle.dataproc.util.ConfigOptionParser -import org.apache.eagle.datastream.ExecutionEnvironments -import org.apache.eagle.datastream.utils.GraphPrinter -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.reflect.runtime.universe._ - -trait StreamContextBuilder extends StreamSourceBuilder { - def config:Configuration - /** - * Business logic DAG - * @return - */ - def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]] - /** - * Support Java Style Config - * - * @return - */ - def getConfig:Config = config.get - def build:StreamDAG - def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit - def submit(env:ExecutionEnvironment):Unit - def submit(clazz:Class[ExecutionEnvironment]):Unit -} - -class StreamContext(private val conf:Config) extends StreamContextBuilder{ - implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]]) - private val _config:Configuration = Configuration(conf) - override def dag = _dag - override def config = _config - override def build: StreamDAG = { - implicit val i_conf = _config.get - 
StreamNameExpansion() - GraphPrinter.print(dag,message="Before expanded DAG ") - StreamAggregateExpansion() - StreamAlertExpansion() - StreamUnionExpansion() - StreamGroupbyExpansion() - StreamParallelismConfigExpansion() - StreamNameExpansion() - GraphPrinter.print(dag,message="After expanded DAG ") - GraphPrinter.printDotDigraph(dag) - StreamDAGTransformer.transform(dag) - } - - override def submit(env: ExecutionEnvironment): Unit = { - env.submit(this) - } - - override def submit(clazz: Class[ExecutionEnvironment]): Unit = { - ExecutionEnvironments.get(clazz,conf).submit(this) - } - - override def submit[E <: ExecutionEnvironment](implicit typeTag: TypeTag[E]): Unit = { - ExecutionEnvironments.getWithConfig[E](conf).submit(this) - } -} - -object StreamContext { - /** - * @return - */ - def apply():StreamContext = { - new StreamContext(ConfigFactory.load()) - } - - /** - * - * @param args - * @return - */ - def apply(args:Array[String]):StreamContext ={ - new StreamContext(new ConfigOptionParser().load(args)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala deleted file mode 100644 index ce9d82c..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. 
See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.datastream.core - -import org.apache.eagle.partition.PartitionStrategy - -abstract class StreamConnector[+T1 <: Any,+T2 <: Any](val from: StreamProducer[T1], val to: StreamProducer[T2]) extends Serializable - -case class ShuffleConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2]) - extends StreamConnector[T1,T2](from,to){ - override def toString: String = "shuffleGroup" -} - -case class GroupbyFieldsConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],groupByFields : Seq[Int]) - extends StreamConnector[T1,T2](from,to){ - override def toString: String = s"groupByFields( $groupByFields )" -} - -case class GroupbyKeyConnector[T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],keySelector: T1 => Any) - extends StreamConnector[T1,T2](from,to){ - override def toString: String = s"groupByKey($keySelector)" -} - -case class GroupbyStrategyConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],customGroupBy:PartitionStrategy) - extends StreamConnector[T1,T2](from,to){ - override def toString: String = s"groupByStrategy( 
$customGroupBy )" -} - -object StreamConnector{ - /** - * - * @param from - * @param to - * @tparam T1 - * @tparam T2 - * @return - */ - def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2]):ShuffleConnector[T1,T2] = ShuffleConnector(from,to) - - /** - * Clone connector from old connector to apply to new processing element, return ShuffleConnector by default - * - * @param from - * @param to - * @param connector - * @tparam T1 - * @tparam T2 - * @return - */ - def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],connector: StreamConnector[Any,Any]):StreamConnector[T1,T2] = connector match { - case GroupbyFieldsConnector(_,_,fields) => GroupbyFieldsConnector[T1,T2](from,to,fields) - case GroupbyKeyConnector(_,_,keySelector) => GroupbyKeyConnector[T1,T2](from,to,keySelector) - case GroupbyStrategyConnector(_,_,strategy) => GroupbyStrategyConnector[T1,T2](from,to,strategy) - case null | ShuffleConnector(_,_) => ShuffleConnector[T1,T2](from,to) - case c@_ => throw new IllegalArgumentException(s"Unknown type of stream connector $c") - } - - /** - * - * @param from - * @param to - * @param groupByFields - * @tparam T1 - * @tparam T2 - * @return - */ - def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],groupByFields : Seq[Int]):GroupbyFieldsConnector[T1,T2] = GroupbyFieldsConnector(from,to,groupByFields) - - /** - * - * @param from - * @param to - * @param customGroupBy - * @tparam T1 - * @tparam T2 - * @return - */ - def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],customGroupBy: PartitionStrategy):GroupbyStrategyConnector[T1,T2] = GroupbyStrategyConnector(from,to,customGroupBy) - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala deleted file mode 100644 index 7845740..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */ -package org.apache.eagle.datastream.core - -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.JavaConverters._ -import scala.collection.{JavaConversions, mutable} - -/** - * wrapper of DAG, used for storm topology compiler - */ -class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) extends StreamProducerGraph { - var nodeMap: mutable.Map[String, StreamProducer[Any]] = mutable.Map[String,StreamProducer[Any]]() - graph.iterator().asScala.foreach(p=> nodeMap.put(p.name,p)) - - override def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]): Unit = { - graph.addEdge(from, to, streamConnector) - } - - override def addVertex(producer: StreamProducer[Any]): Unit = { - graph.addVertex(producer) - nodeMap.put(producer.name,producer) - } - - override def iterator(): Iterator[StreamProducer[Any]] = { - JavaConversions.asScalaIterator(graph.iterator()) - } - - override def isSource(v: StreamProducer[Any]): Boolean = { - graph.inDegreeOf(v) match { - case 0 => true - case _ => false - } - } - - override def outgoingEdgesOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamConnector[Any,Any]] = { - JavaConversions.asScalaSet(graph.outgoingEdgesOf(v)) - } - - override def getNodeByName(name: String): Option[StreamProducer[Any]] = { - nodeMap.get(name) - } - - def setNodeMap(nodeMap: mutable.Map[String, StreamProducer[Any]]): Unit = { - this.nodeMap = nodeMap - } - - override def incomingVertexOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamProducer[Any]] = { - val set = mutable.Set[StreamProducer[Any]]() - graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e)) - set - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala deleted file mode 100644 index fcff639..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -abstract class StreamDAGExpansion(config: Config) { - def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala deleted file mode 100644 index 6b20bf2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.datastream.core - -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.mutable - -/** - * convert generic DAG data structure to Storm specific DAG data structure for easy topology compiler - */ -object StreamDAGTransformer { - /** - * Transform DirectedAcyclicGraph[StreamProducer, StreamConnector] into StormStreamDAG - * - * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector] - * @return StormStreamDAG - */ - @deprecated("Use StreamDAG(dag) will transform directly") - def transform(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) : StreamDAG = { - val stormDAG = new StreamDAG(dag) - val nodeMap = mutable.HashMap[String, StreamProducer[Any]]() - val iter = dag.iterator() - while(iter.hasNext){ - val sp = iter.next() - nodeMap.put(sp.name, sp) - } - stormDAG.setNodeMap(nodeMap) - stormDAG - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala deleted file mode 100644 index 1a07e3f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer - -/** - * Replace GroupByProducer(Vertex) with StreamConnector (Edge) - * - * For example as to Storm, it's mainly for grouping method - * - * @param config context configuration - */ -case class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){ - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = { - val iter = dag.iterator() - var toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]] - var toBeRemovedVertex = new ListBuffer[StreamProducer[Any]] - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - val child = edge.to - child match { - case p : GroupByProducer[Any] => { - dag.outgoingEdgesOf(p).foreach(c2 => { - p match { - case GroupByFieldProducer(fields) => - toBeAddedEdges += GroupbyFieldsConnector(current, c2.to,fields) - case GroupByStrategyProducer(strategy) => - toBeAddedEdges += GroupbyStrategyConnector(current, c2.to,strategy) - case GroupByKeyProducer(keySelector) => - current.outKeyed = true - current.keySelector = KeySelectorWrapper(keySelector) - c2.to.inKeyed = true - toBeAddedEdges += GroupbyKeyConnector(current, 
c2.to,keySelector) - case _ => toBeAddedEdges += ShuffleConnector(current, c2.to) - } - }) - toBeRemovedVertex += p - } - case _ => - } - }) - } - - // add back edges - toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e)) - toBeRemovedVertex.foreach(v => dag.removeVertex(v)) - } -} - -object StreamGroupbyExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamGroupbyExpansion ={ - val e = StreamGroupbyExpansion(config) - e.expand(dag) - e - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala deleted file mode 100644 index 4bc1812..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.apache.eagle.datastream.utils.NodeNameSelector -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -/** - * to set name for each StreamProducer - * 1. if name is given programatically, then use this name - * 2. otherwise use name generated by scala internally - */ -case class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = { - val iter = dag.iterator() - while(iter.hasNext){ - val sp = iter.next() - sp.name = NodeNameSelector(sp).getName - } - } -} - - -object StreamNameExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamNameExpansion ={ - val e = StreamNameExpansion(config) - e.expand(dag) - e - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala deleted file mode 100644 index 8699da6..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.datastream.core - -import java.util.regex.Pattern - -import com.typesafe.config.{Config, ConfigObject, ConfigValue} -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ - -case class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = { - val map = getParallelismMap(config) - val iter = dag.iterator() - while(iter.hasNext){ - val streamProducer = iter.next() - if(streamProducer.name != null) { - map.foreach(tuple => { - tuple._1.matcher(streamProducer.name).find() match { - case true => streamProducer.parallelism(tuple._2) - case false => - } - }) - } - } - } - - private def getParallelismMap(config: Config) : Map[Pattern, Int]= { - if(config.hasPath("envContextConfig.parallelismConfig")) { - val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig") - parallelismConfig.asScala.toMap map { - case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int]) - } - }else{ - Map[Pattern,Int]() - } - } -} - -object StreamParallelismConfigExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamParallelismConfigExpansion ={ - val e = StreamParallelismConfigExpansion(config) - e.expand(dag) - e - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala deleted file mode 100644 index 6445643..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala +++ /dev/null @@ -1,342 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.datastream.core - -import java.util -import java.util.concurrent.atomic.AtomicInteger - -import backtype.storm.topology.base.BaseRichSpout -import com.typesafe.config.Config -import org.apache.eagle.alert.entity.AlertAPIEntity -import org.apache.eagle.datastream.{FlatMapperWrapper, Collector, FlatMapper} -import org.apache.eagle.partition.PartitionStrategy -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -import scala.collection.JavaConversions.{asScalaBuffer, seqAsJavaList} -import scala.collection.JavaConverters.asScalaBufferConverter -/** - * StreamProducer = StreamInfo + StreamProtocol - * - * StreamProducer is processing logic element, used the base class for all other concrete StreamProducer - * Stream Producer can be treated as logical processing element but physical - * It defines high level type-safe API for user to organize data stream flow - * - * StreamProducer is independent of execution environment - * - * @tparam T processed elements type - */ -abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T] { - private var graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]] = null - private val LOG = LoggerFactory.getLogger(classOf[StreamProducer[T]]) - - /** - * Should not modify graph when setting it - */ - def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T]={ - this.config = config - this.graph = graph - if(hook && ! 
this.graph.containsVertex(this)) { - this.graph.addVertex(this) - } - LOG.debug(this.graph.toString) - this - } - - /** - * Get stream pure metadata info - * @return - */ - override def getInfo:StreamInfo = this.asInstanceOf[StreamInfo] - - override def stream(streamId:String):StreamProducer[T] = { - this.streamId = streamId - this - } - - override def filter(fn : T => Boolean): StreamProducer[T] ={ - val ret = FilterProducer[T](fn) - connect(this, ret) - ret - } - - override def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R] = { - val ret = FlatMapProducer[T,R](flatMapper) - connect(this, ret) - ret - } - override def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R] = { - val ret = FlatMapProducer[T,R](FlatMapperWrapper[R](func)) - connect(this, ret) - ret - } - - - override def foreach(fn : T => Unit) : Unit = { - val ret = ForeachProducer[T](fn) - connect(this, ret) - } - - override def map[R](fn : T => R) : StreamProducer[R] = { - val ret = MapperProducer[T,R](0,fn) - connect(this, ret) - ret - } - - override def map1[R](fn : T => R): StreamProducer[R] = { - val ret = MapperProducer[T,R](1, fn) - connect(this, ret) - ret - } - - override def map2[R](fn : T => R): StreamProducer[R] = { - val ret = MapperProducer[T,R](2, fn) - connect(this, ret) - ret - } - - override def map3[R](fn : T => R): StreamProducer[R] = { - val ret = MapperProducer(3, fn) - connect(this, ret) - ret - } - - override def map4[R](fn : T => R): StreamProducer[R] = { - val ret = MapperProducer(4, fn) - connect(this, ret) - ret - } - - /** - * starting from 0, groupby operator would be upon edge of the graph - */ - override def groupBy(fields : Int*) : StreamProducer[T] = { - // validate each field index is greater or equal to 0 - fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0")) - val ret = GroupByFieldProducer[T](fields) - connect(this, ret) - ret - } - - def groupByFieldIndex(fields : Seq[Int]) : StreamProducer[T] = { 
- // validate each field index is greater or equal to 0 - fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0")) - val ret = GroupByFieldProducer[T](fields) - connect(this, ret) - ret - } - - //groupBy java version, starting from 1 - override def groupBy(fields : java.util.List[Integer]) : StreamProducer[T] = { - // validate each field index is greater or equal to 0 - fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0")) - val ret = GroupByFieldProducer[T](fields.asScala.toSeq.asInstanceOf[Seq[Int]]) - connect(this, ret) - ret - } - - override def groupByKey(keySelector: T=> Any):StreamProducer[T] = { - val ret = GroupByKeyProducer(keySelector) - connect(this,ret) - ret - } - - override def streamUnion[T2,T3](others : Seq[StreamProducer[T2]]) : StreamProducer[T3] = { - val ret = StreamUnionProducer[T, T2, T3](others) - connect(this, ret) - ret - } - - def streamUnion[T2,T3](other : StreamProducer[T2]) : StreamProducer[T3] = { - streamUnion(Seq(other)) - } - - def streamUnion[T2,T3](others : util.List[StreamProducer[T2]]) : StreamProducer[T3] = streamUnion(others.asScala) - - override def groupBy(strategy : PartitionStrategy) : StreamProducer[T] = { - val ret = GroupByStrategyProducer(strategy) - connect(this, ret) - ret - } - - /** - * alert is always sink of data flow - */ - def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String) = { - alert(upStreamNames.asScala, alertExecutorId,consume = true) - } - - def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String) = { - alert(upStreamNames.asScala, alertExecutorId, consume = false) - } - - override def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null):AlertStreamProducer = { - val ret = AlertStreamProducer(upStreamNames, alertExecutorId, consume, strategy) - connect(this, ret) - ret - } - - def 
alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={ - alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true, strategy = strategy) - } - - def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={ - alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = true) - } - - def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={ - alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false, strategy) - } - - def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={ - alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false) - } - - def aggregate(upStreamNames: java.util.List[String], queryExecutorId : String, strategy: PartitionStrategy = null): StreamProducer[T] = { - val ret= AggregateProducer(upStreamNames, queryExecutorId, null, strategy) - connect(this, ret) - ret - } - - def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy: PartitionStrategy): StreamProducer[T] = { - val ret= AggregateProducer(upStreamNames, null, cql, strategy) - connect(this, ret) - ret - } - - def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = { - val ret = PersistProducer(executorId, storageType) - connect(this, ret) - ret - } - - def connect[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = { - if(current.graph == null) throw new NullPointerException(s"$current has not been registered to any graph before being connected") - current.graph.addVertex(next) - current.graph.addEdge(current, next, StreamConnector(current, next)) - passOnContext[T1,T2](current, next) - } - - def connect[T2]( next: StreamProducer[T2]) = { - if(this.graph == null) throw new NullPointerException("graph is null") - this.graph.addVertex(next) - this.graph.addEdge(this, next, StreamConnector(this, next)) - passOnContext[T,T2](this, 
next) - } - - private def passOnContext[T1 ,T2](current: StreamProducer[T1], next: StreamProducer[T2]): Unit ={ - next.initWith(current.graph,current.config) - } - - /** - * can be set by programatically or by configuration - */ - override def parallelism(parallelism : Int) : StreamProducer[T] = { - this.parallelismNum = parallelism - this - } - - override def parallelism : Int = this.parallelismNum - override def stream:String = this.streamId - - /** - * Component name - * - * @param componentName component name - * @return - */ - override def nameAs(componentName : String) : StreamProducer[T] = { - this.name = componentName - this - } -} - -case class FilterProducer[T](fn : T => Boolean) extends StreamProducer[T]{ - override def toString: String = s"FilterProducer" -} - -case class FlatMapProducer[T, R](var mapper: FlatMapper[R]) extends StreamProducer[R]{ - override def toString: String = mapper.toString -} - -case class MapperProducer[T,R](numOutputFields : Int, var fn : T => R) extends StreamProducer[R]{ - override def toString: String = s"MapperProducer" -} - -case class ForeachProducer[T](var fn : T => Unit) extends StreamProducer[T] - -abstract class GroupByProducer[T] extends StreamProducer[T] -case class GroupByFieldProducer[T](fields : Seq[Int]) extends GroupByProducer[T] -case class GroupByStrategyProducer[T](partitionStrategy: PartitionStrategy) extends GroupByProducer[T] -case class GroupByKeyProducer[T](keySelectorFunc:T => Any) extends GroupByProducer[T]{ - override def toString: String = s"GroupByKey" -} - -object GroupByProducer { - def apply[T](fields: Seq[Int]) = new GroupByFieldProducer[T](fields) - def apply[T](partitionStrategy : PartitionStrategy) = new GroupByStrategyProducer[T](partitionStrategy) - def apply[T](keySelector:T => Any) = new GroupByKeyProducer[T](keySelector) -} - -case class StreamUnionProducer[T1,T2,T3](others: Seq[StreamProducer[T2]]) extends StreamProducer[T3] - -case class StormSourceProducer[T](source: BaseRichSpout) 
extends StreamProducer[T]{ - var numFields : Int = 0 - - /** - * rename outputfields to f0, f1, f2, ... - * if one spout declare some field names, those fields names will be modified - * @param n - */ - def withOutputFields(n : Int): StormSourceProducer[T] ={ - this.numFields = n - this - } -} - -case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]{ - override def toString: String = s"IterableStreamProducer(${iterable.getClass.getSimpleName}))" -} -case class IteratorStreamProducer[T](iterator: Iterator[T]) extends StreamProducer[T]{ - override def toString: String = s"IteratorStreamProducer(${iterator.getClass.getSimpleName})" -} - -case class AlertStreamProducer(var upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] { - def consume(consume: Boolean): AlertStreamProducer = { - this.consume = consume - this - } -} - -case class AggregateProducer[T](var upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T] - -case class PersistProducer[T](executorId :String, storageType: StorageType.StorageType) extends StreamProducer[T] - -object UniqueId{ - val id : AtomicInteger = new AtomicInteger(0); - def incrementAndGetId() : Int = { - id.incrementAndGet() - } -} - -trait KeySelector extends Serializable{ - def key(t:Any):Any -} - -case class KeySelectorWrapper[T](fn:T => Any) extends KeySelector{ - override def key(t: Any): Any = fn(t.asInstanceOf[T]) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala deleted file mode 100644 index f3fcc4d..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */
package org.apache.eagle.datastream.core

/**
 * Graph of stream producers (vertices) connected by stream connectors (edges).
 */
trait StreamProducerGraph {
  /** Adds an edge from `from` to `to`, labelled with `streamConnector`. */
  def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any, Any]): Unit

  /** Adds `producer` as a vertex. */
  def addVertex(producer: StreamProducer[Any]): Unit

  /** Iterates over the vertices; the order is left to implementations. */
  def iterator(): Iterator[StreamProducer[Any]]

  /** Whether `v` is a source vertex (presumably one without incoming edges — confirm in implementations). */
  def isSource(v: StreamProducer[Any]): Boolean

  /** Edges leaving `v`. */
  def outgoingEdgesOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamConnector[Any, Any]]

  /** Looks up a vertex by name; `None` when absent. */
  def getNodeByName(name: String): Option[StreamProducer[Any]]

  /** Vertices with an edge into `v`. */
  def incomingVertexOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamProducer[Any]]
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala deleted file mode 100644 index b54b21f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.eagle.datastream.core

import com.typesafe.config.Config
import org.apache.commons.lang3.builder.HashCodeBuilder
import org.apache.eagle.datastream.{Collector, FlatMapper}
import org.apache.eagle.partition.PartitionStrategy
import org.jgrapht.experimental.dag.DirectedAcyclicGraph

/**
 * Metadata of one processing element in a stream topology.
 *
 * StreamInfo must be fully serializable and must not carry runtime type information.
 */
class StreamInfo extends Serializable {
  /** Processing element id, drawn from [[UniqueId]] when the instance is constructed. */
  val id: Int = UniqueId.incrementAndGetId()

  /** Processing element name. */
  var name: String = null

  /**
   * Output stream id. Intended to default to `name`, but this class does not apply that
   * default: the field stays `null` until something assigns it.
   */
  var streamId: String = null

  /** Parallelism of this processing element. */
  var parallelismNum: Int = 1

  /** Whether the input stream is keyed. */
  var inKeyed: Boolean = false

  /** Whether the output stream is keyed. */
  var outKeyed: Boolean = false

  /** Output key selector, consulted when `outKeyed` is set. */
  var keySelector: KeySelector = null

  // Type Information (disabled; kept for reference)
  // ================
  //
  //  /**
  //   * Entity class type of T
  //   */
  //  var typeClass:Class[_] = null
  //
  //  /**
  //   * Type Class Simple Name
  //   * @return
  //   */
  //  def typeClassName = if(typeClass == null) null else typeClass.getSimpleName
  //
  //  @transient private var _typeTag[_] = null
  //
  //  def typeTag[_] = {
  //    if(_typeTag == null) _typeTag = Reflections.typeTag(this.typeClass)
  //    _typeTag
  //  }

  /** Configuration attached to this processing element; `null` until set. */
  var config: Config = null

  /** Returns this instance viewed as its [[StreamInfo]] metadata. */
  def getInfo: StreamInfo = this

  /**
   * Hash built from the per-instance `id` and the runtime class.
   *
   * NOTE(review): `equals` is not overridden here; a subclass with structural equality
   * (e.g. a case class) would break the equals/hashCode contract — confirm.
   */
  override def hashCode(): Int = new HashCodeBuilder().append(this.id).append(this.getClass).toHashCode
}


/** Storage backends accepted by `persist` / [[PersistProducer]]. */
object StorageType extends Enumeration {
  type StorageType = Value
  val KAFKA, DRUID, HBASE = Value
}

/**
 * Stream interaction protocol interface.
 *
 * @tparam T type of the processed elements
 */
trait StreamProtocol[+T] {
  /**
   * Initializes the stream metadata info.
   *
   * @param graph  business-logic DAG this producer belongs to
   * @param config topology configuration
   * @param hook   defaults to `true`; its effect is defined by implementations
   */
  def initWith(graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], config: Config, hook: Boolean = true): StreamProducer[T]

  /**
   * Flat-maps the stream with a [[FlatMapper]] (Java API support).
   *
   * @tparam R output element type
   */
  def flatMap[R](flatMapper: FlatMapper[R]): StreamProducer[R]

  /** Flat-maps the stream with a function that writes its outputs to a [[Collector]]. */
  def flatMap[R](func: (Any, Collector[R]) => Unit): StreamProducer[R]

  /** Filters the stream with predicate `fn`. */
  def filter(fn: T => Boolean): StreamProducer[T]

  /** Applies the side-effecting `fn` to the stream's elements. */
  def foreach(fn: T => Unit): Unit

  /** Type-safe mapper. */
  def map[R](fn: T => R): StreamProducer[R]

  /**
   * Field-based mappers; the numeric suffix presumably denotes the number of output fields — confirm.
   */
  def map1[R](fn: T => R): StreamProducer[R]
  def map2[R](fn: T => R): StreamProducer[R]
  def map3[R](fn: T => R): StreamProducer[R]
  def map4[R](fn: T => R): StreamProducer[R]

  /** Groups the stream by field positions, a Java list of positions, or an explicit strategy. */
  def groupBy(fields: Int*): StreamProducer[T]
  def groupBy(fields: java.util.List[Integer]): StreamProducer[T]
  def groupBy(strategy: PartitionStrategy): StreamProducer[T]

  /**
   * Groups the stream by a key computed from each element.
   *
   * @param keyer key selector function
   */
  def groupByKey(keyer: T => Any): StreamProducer[T]

  def streamUnion[T2, T3](otherStreams: Seq[StreamProducer[T2]]): StreamProducer[T3]

  def alert(upStreamNames: Seq[String], alertExecutorId: String, consume: Boolean, strategy: PartitionStrategy): AlertStreamProducer

  def aggregate(upStreamNames: java.util.List[String], executorId: String, strategy: PartitionStrategy): StreamProducer[T]

  def aggregateDirect(upStreamNames: java.util.List[String], cql: String, strategy: PartitionStrategy): StreamProducer[T]

  def persist(executorId: String, storageType: StorageType.StorageType): StreamProducer[T]

  /**
   * Sets the processing element parallelism.
   *
   * @param parallelismNum parallelism value
   */
  def parallelism(parallelismNum: Int): StreamProducer[T]

  /** Current parallelism setting. */
  def parallelism: Int

  /**
   * Sets the component name.
   *
   * @param componentName component name
   */
  def nameAs(componentName: String): StreamProducer[T]

  /**
   * Sets the stream name.
   *
   * @param streamId stream ID
   */
  def stream(streamId: String): StreamProducer[T]

  /** Current stream name. */
  def stream: String

  /** Operator alias for [[filter]]. */
  def ?(fn: T => Boolean): StreamProducer[T] = this.filter(fn)

  /** Operator alias for the [[FlatMapper]] overload of `flatMap`. */
  def ~>[R](flatMapper: FlatMapper[R]): StreamProducer[R] = this.flatMap[R](flatMapper)

  /** Operator alias for [[alert]], with `consume = true` and a `null` strategy by default. */
  def !(upStreamNames: Seq[String], alertExecutorId: String, consume: Boolean = true, strategy: PartitionStrategy = null): AlertStreamProducer =
    alert(upStreamNames, alertExecutorId, consume, strategy)
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala deleted file mode 100644 index 5f3bd22..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.eagle.datastream.core

import org.jgrapht.experimental.dag.DirectedAcyclicGraph

import scala.reflect.runtime.{universe => ru}

/**
 * Creates source producers and initializes them against the business-logic DAG.
 *
 * @since 12/7/15
 */
trait StreamSourceBuilder {
  /** Configuration wrapper; `config.get` is the value handed to `initWith`. */
  def config: Configuration

  /**
   * Business logic DAG
   * @return the DAG every producer built here is initialized against
   */
  def dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]]

  /**
   * Creates and initializes a source producer over an [[Iterable]].
   *
   * @param iterable top level Iterable interface
   * @param recycle  stored on the resulting [[IterableStreamProducer]]
   * @tparam T element type
   * @return the initialized producer
   */
  def from[T: ru.TypeTag](iterable: Iterable[T], recycle: Boolean = false): IterableStreamProducer[T] =
    initialized(IterableStreamProducer[T](iterable, recycle))

  /**
   * Creates and initializes a source producer over an [[Iterator]].
   *
   * @param recycle ignored: [[IteratorStreamProducer]] has no recycle flag; the parameter
   *                is kept only for source compatibility
   */
  def from[T: ru.TypeTag](iterator: Iterator[T], recycle: Boolean): IteratorStreamProducer[T] =
    initialized(IteratorStreamProducer[T](iterator))

  /** Creates and initializes a source producer emitting the elements of `product.productIterator`. */
  def from(product: Product): IteratorStreamProducer[Any] =
    initialized(IteratorStreamProducer[Any](product.productIterator))

  /** Initializes an externally built producer against this builder's DAG and configuration. */
  def register[T](producer: StreamProducer[T]): Unit = {
    producer.initWith(dag, config.get)
  }

  // Shared by the `from` overloads: initialize, then return the producer with its concrete type.
  private def initialized[P <: StreamProducer[_]](producer: P): P = {
    producer.initWith(dag, config.get)
    producer
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala deleted file mode 100644 index 351782b..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */

package org.apache.eagle.datastream.core

import com.typesafe.config.Config
import org.jgrapht.experimental.dag.DirectedAcyclicGraph
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/**
 * Expands union operators out of the DAG.
 *
 * For every edge `current -> child` where `child` is a [[StreamUnionProducer]], each edge
 * `child -> downstream` is replaced by direct edges to `downstream`, one from `current` and one
 * from each stream in the union's `others`. The union vertex is then removed.
 */
case class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config) {
  val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion])

  override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]]): Unit = {
    // Collect first and mutate afterwards so the graph is never modified while it is iterated.
    val toBeAddedEdges = new ListBuffer[StreamConnector[Any, Any]]
    val toBeRemovedVertex = new ListBuffer[StreamProducer[Any]]

    dag.iterator().asScala.foreach { current =>
      dag.outgoingEdgesOf(current).asScala.foreach { edge =>
        val child = edge.to
        child match {
          case StreamUnionProducer(others) =>
            dag.outgoingEdgesOf(child).asScala.foreach { c2 =>
              // Each new connector is built from the original current -> union edge
              // (presumably to inherit its settings — confirm in StreamConnector.apply).
              toBeAddedEdges += StreamConnector(current, c2.to, edge)
              others.foreach(o => toBeAddedEdges += StreamConnector(o, c2.to, edge))
            }
            toBeRemovedVertex += child
          case _ =>
        }
      }
    }

    // add back edges, then drop the expanded union vertices
    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
  }
}

object StreamUnionExpansion {
  /** Builds an expansion from the implicit config and immediately applies it to the implicit DAG. */
  def apply()(implicit config: Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]]): StreamUnionExpansion = {
    val e = StreamUnionExpansion(config)
    e.expand(dag)
    e
  }
}
- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala deleted file mode 100644 index 64b5f0f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.eagle.datastream.storm

import java.util

import backtype.storm.task.{OutputCollector, TopologyContext}
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.tuple.{Fields, Tuple}
import org.apache.eagle.datastream.core.StreamInfo
import org.apache.eagle.datastream.utils.NameConstants
import org.slf4j.LoggerFactory

import scala.util.control.NonFatal

/**
 * Base bolt that decodes keyed / non-keyed input tuples and emits keyed / non-keyed output.
 *
 * @param fieldsNum  number of positional output fields declared for non-keyed output;
 *                   0 still declares one field and a negative value declares none
 * @param ack        whether to ack an input tuple after it has been processed without error
 * @param streamInfo processing-element metadata (keyed input/output flags, key selector)
 * @tparam T         value type delivered to [[onKeyValue]]
 */
abstract class AbstractStreamBolt[T](val fieldsNum: Int = 1, val ack: Boolean = true)(implicit streamInfo: StreamInfo) extends BaseRichBolt {
  private var _collector: OutputCollector = null
  private val LOG = LoggerFactory.getLogger(classOf[AbstractStreamBolt[T]])

  /**
   * Declares the output fields:
   *  - keyed output:   [FIELD_KEY, FIELD_VALUE]
   *  - fieldsNum > 0:  [FIELD_PREFIX + 0, ..., FIELD_PREFIX + (fieldsNum - 1)]
   *  - fieldsNum == 0: [FIELD_PREFIX + 0]
   *  - fieldsNum < 0:  nothing is declared
   *
   * @param declarer Storm output declarer
   */
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    if (streamInfo.outKeyed) {
      declarer.declare(new Fields(NameConstants.FIELD_KEY, NameConstants.FIELD_VALUE))
    } else if (fieldsNum > 0) {
      declarer.declare(new Fields((0 until fieldsNum).map(i => NameConstants.FIELD_PREFIX + i): _*))
    } else if (fieldsNum == 0) {
      declarer.declare(new Fields(NameConstants.FIELD_PREFIX + 0))
    }
  }

  /**
   * Emits a values list anchored to `input`; with keyed output the key is computed from the whole list
   * and the tuple is [key, values].
   */
  def emit(values: util.List[AnyRef])(implicit input: Tuple): Unit = {
    if (streamInfo.outKeyed) {
      _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(values).asInstanceOf[AnyRef], values))
    } else {
      _collector.emit(input, values)
    }
  }

  /** Emits a single value anchored to `input`; with keyed output the tuple is [key, value]. */
  def emit(value: Any)(implicit input: Tuple): Unit = {
    if (streamInfo.outKeyed) {
      _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(value).asInstanceOf[AnyRef], value.asInstanceOf[AnyRef]))
    } else {
      _collector.emit(input, util.Arrays.asList(value.asInstanceOf[AnyRef]))
    }
  }

  /**
   * Dispatches `input` to [[onKeyValue]] (keyed input) or [[onValues]] (otherwise), acking on success
   * when `ack` is set. Non-fatal failures are logged and the tuple is failed; fatal errors
   * (VM errors, InterruptedException, ...) propagate instead of being swallowed.
   */
  override def execute(input: Tuple): Unit = {
    try {
      implicit val _input: Tuple = input
      if (streamInfo.inKeyed) {
        val key = input.getValueByField(NameConstants.FIELD_KEY)
        val value = input.getValueByField(NameConstants.FIELD_VALUE).asInstanceOf[T]
        onKeyValue(key, value)
      } else {
        onValues(input.getValues)
      }
      if (ack) _collector.ack(input)
    } catch {
      case NonFatal(t) =>
        LOG.error(s"Got exception when processing $input", t)
        _collector.fail(input)
    }
  }

  /**
   * Handle keyed stream value
   */
  def onKeyValue(key: Any, value: T)(implicit input: Tuple): Unit

  /**
   * Handle general stream values list
   *
   * @param values all values of the input tuple
   */
  def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit

  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
    _collector = collector
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala deleted file mode 100644 index b175b41..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.eagle.datastream.storm

import java.util

import backtype.storm.tuple.Tuple
import org.apache.eagle.datastream.core.StreamInfo

/**
 * Bolt that re-emits an element only when the predicate `fn` accepts it.
 *
 * Keyed input tests the value (the key is not passed on); non-keyed input tests the first
 * field of the tuple. Output is declared as a single field (`fieldsNum = 1`).
 */
case class FilterBoltWrapper(fn: Any => Boolean)(implicit info: StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = 1) {

  /** Keyed input: the value alone is tested and, if accepted, emitted. */
  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit =
    emitWhenAccepted(value)

  /** Non-keyed input: only the first field of the tuple is tested and emitted. */
  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit =
    emitWhenAccepted(values.get(0))

  // Single place where the predicate is applied.
  private def emitWhenAccepted(candidate: Any)(implicit input: Tuple): Unit =
    if (fn(candidate)) emit(candidate)
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala deleted file mode 100644 index 4c105b2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala +++
/dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.eagle.datastream.storm

import java.util

import backtype.storm.tuple.Tuple
import org.apache.eagle.datastream.core.StreamInfo

/**
 * Bolt that hands every incoming element to the side-effecting `fn`; it emits nothing itself.
 *
 * @since 12/6/15
 */
case class ForeachBoltWrapper(fn: Any => Unit)(implicit info: StreamInfo) extends AbstractStreamBolt[Any] {

  /** Keyed input: `fn` sees only the value; the key is dropped. */
  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = fn(value)

  /**
   * Non-keyed input: `fn` receives the whole values list of the tuple.
   *
   * NOTE(review): FilterBoltWrapper passes only `values.get(0)` in the same situation — confirm
   * that handing over the full list here is intended.
   */
  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit = fn(values)
}
\ No newline at end of file