From: mbalassi@apache.org
To: commits@flink.incubator.apache.org
Date: Sun, 04 Jan 2015 20:51:15 -0000
Subject: [25/27] incubator-flink git commit: [streaming] [scala] Restructured streaming scala project and examples

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
new file mode 100644
index 0000000..d587d56
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.api.scala + +import scala.Array.canBuildFrom +import scala.reflect.ClassTag + +import org.apache.commons.lang.Validate +import org.apache.flink.api.common.functions.JoinFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.operators.Keys +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.typeutils.CaseClassSerializer +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} +import org.apache.flink.streaming.api.datastream.TemporalOperator +import org.apache.flink.streaming.api.function.co.JoinWindowFunction +import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.scala.StreamingConversions._ +import org.apache.flink.streaming.util.keys.KeySelectorUtil + +class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends +TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { + + override def createNextWindowOperator() = { + new StreamJoinOperator.JoinWindow[I1, I2](this) + } +} + +object StreamJoinOperator { + + class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) { + + private[flink] val type1 = op.input1.getType(); + + /** + * Continues a temporal Join transformation by defining + * the fields in the first stream to be used as keys for the join. + * The resulting incomplete join can be completed by JoinPredicate.equalTo() + * to define the second key. + */ + def where(fields: Int*) = { + new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys(fields.toArray,type1),type1)) + } + + /** + * Continues a temporal Join transformation by defining + * the fields in the first stream to be used as keys for the join. + * The resulting incomplete join can be completed by JoinPredicate.equalTo() + * to define the second key. + */ + def where(firstField: String, otherFields: String*) = + new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys(firstField +: otherFields.toArray,type1),type1)) + + /** + * Continues a temporal Join transformation by defining + * the keyselector function that will be used to extract keys from the first stream + * for the join. + * The resulting incomplete join can be completed by JoinPredicate.equalTo() + * to define the second key. + */ + def where[K: TypeInformation](fun: (I1) => K) = { + val keyType = implicitly[TypeInformation[K]] + val keyExtractor = new KeySelector[I1, K] { + val cleanFun = op.input1.clean(fun) + def getKey(in: I1) = cleanFun(in) + } + new JoinPredicate[I1, I2](op, keyExtractor) + } + + } + + class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], + private[flink] val keys1: KeySelector[I1, _]) { + private[flink] var keys2: KeySelector[I2, _] = null + private[flink] val type2 = op.input2.getType(); + + /** + * Creates a temporal join transformation by defining the second join key. + * The returned transformation wrapes each joined element pair in a tuple2: + * (first, second) + * To define a custom wrapping, use JoinedStream.apply(...) 
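+ *
+ * For illustration, a sketch in the style of the WindowJoin example (the `names` and
+ * `ages` streams and their `id` field are hypothetical, not part of this API):
+ * {{{
+ *   val joined: DataStream[(Name, Age)] =
+ *     names.join(ages).onWindow(1000).where("id").equalTo("id")
+ * }}}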
+ */ + def equalTo(fields: Int*): JoinedStream[I1, I2] = { + finish(KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys(fields.toArray,type2),type2)) + } + + /** + * Creates a temporal join transformation by defining the second join key. + * The returned transformation wrapes each joined element pair in a tuple2: + * (first, second) + * To define a custom wrapping, use JoinedStream.apply(...) + */ + def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = + finish(KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys(firstField +: otherFields.toArray,type2),type2)) + + /** + * Creates a temporal join transformation by defining the second join key. + * The returned transformation wrapes each joined element pair in a tuple2: + * (first, second) + * To define a custom wrapping, use JoinedStream.apply(...) + */ + def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = { + val keyType = implicitly[TypeInformation[K]] + val keyExtractor = new KeySelector[I2, K] { + val cleanFun = op.input1.clean(fun) + def getKey(in: I2) = cleanFun(in) + } + finish(keyExtractor) + } + + private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = { + this.keys2 = keys2 + new JoinedStream[I1, I2](this, createJoinOperator()) + } + + private def createJoinOperator(): JavaStream[(I1, I2)] = { + + val returnType = new CaseClassTypeInfo[(I1, I2)]( + + classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) { + + override def createSerializer: TypeSerializer[(I1, I2)] = { + val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) + for (i <- 0 until getArity) { + fieldSerializers(i) = types(i).createSerializer + } + + new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) { + override def createInstance(fields: Array[AnyRef]) = { + (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2]) + } + } + } + } + + return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)) + .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)), + returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) + } + } + + class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends + DataStream[(I1, I2)](javaStream) { + + private val op = jp.op + + /** + * Sets a wrapper for the joined elements. For each joined pair, the result of the + * udf call will be emitted. 
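+ *
+ * A sketch of custom wrapping in the style of the WindowJoin example (the `Person`
+ * case class and the input streams are hypothetical):
+ * {{{
+ *   names.join(ages).onWindow(1000)
+ *     .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) }
+ * }}}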
+ */ + def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { + + val invokable = new CoWindowInvokable[I1, I2, R]( + clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, + op.timeStamp2) + + javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), + invokable) + + javaStream.setType(implicitly[TypeInformation[R]]) + } + } + + private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], + joinFunction: (I1, I2) => R) = { + Validate.notNull(joinFunction, "Join function must not be null.") + + val joinFun = new JoinFunction[I1, I2, R] { + + val cleanFun = jp.op.input1.clean(joinFunction) + + override def join(first: I1, second: I2): R = { + cleanFun(first, second) + } + } + + new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala new file mode 100644 index 0000000..fb3745f --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } +import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } +import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } + +object StreamingConversions { + + implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = + new DataStream[R](javaStream) + + implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = + new WindowedDataStream[R](javaWStream) + + implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = + new SplitDataStream[R](javaStream) + + implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): + ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala new file mode 100644 index 0000000..deda3d9 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import scala.Array.canBuildFrom +import scala.collection.JavaConversions.iterableAsScalaIterable +import scala.reflect.ClassTag + +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase +import org.apache.flink.api.scala._ +import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator +import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream} +import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType +import org.apache.flink.streaming.api.function.aggregation.SumFunction +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.scala.StreamingConversions._ +import org.apache.flink.streaming.api.windowing.helper.WindowingHelper +import org.apache.flink.streaming.api.windowing.helper._ +import org.apache.flink.util.Collector + +class WindowedDataStream[T](javaStream: JavaWStream[T]) { + + /** + * Defines the slide size (trigger frequency) for the windowed data stream. + * This controls how often the user defined function will be triggered on + * the window. + */ + def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = + javaStream.every(windowingHelper: _*) + + /** + * Groups the elements of the WindowedDataStream using the given + * field positions. The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + *

To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*) + + /** + * Groups the elements of the WindowedDataStream using the given + * field expressions. The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + *

To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] = + javaStream.groupBy(firstField +: otherFields.toArray: _*) + + /** + * Groups the elements of the WindowedDataStream using the given + * KeySelector function. The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + *

To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = { + + val keyExtractor = new KeySelector[T, K] { + val cleanFun = clean(fun) + def getKey(in: T) = cleanFun(in) + } + javaStream.groupBy(keyExtractor) + } + + /** + * Applies a reduce transformation on the windowed data stream by reducing + * the current window at every trigger. + * + */ + def reduce(reducer: ReduceFunction[T]): DataStream[T] = { + if (reducer == null) { + throw new NullPointerException("Reduce function must not be null.") + } + javaStream.reduce(reducer) + } + + /** + * Applies a reduce transformation on the windowed data stream by reducing + * the current window at every trigger. + * + */ + def reduce(fun: (T, T) => T): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("Reduce function must not be null.") + } + val reducer = new ReduceFunction[T] { + val cleanFun = clean(fun) + def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } + } + reduce(reducer) + } + + /** + * Applies a reduceGroup transformation on the windowed data stream by reducing + * the current window at every trigger. In contrast with the simple binary reduce operator, + * groupReduce exposes the whole window through the Iterable interface. + *
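+ *
+ * A sketch using the closure variant defined below (the windowed stream of Ints and
+ * the per-window sum are hypothetical):
+ * {{{
+ *   windowedStream.reduceGroup { (values, out: Collector[Int]) =>
+ *     out.collect(values.sum)
+ *   }
+ * }}}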
+ *
+ * Whenever possible try to use reduce instead of groupReduce for increased efficiency + */ + def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): + DataStream[R] = { + if (reducer == null) { + throw new NullPointerException("GroupReduce function must not be null.") + } + javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]]) + } + + /** + * Applies a reduceGroup transformation on the windowed data stream by reducing + * the current window at every trigger. In contrast with the simple binary reduce operator, + * groupReduce exposes the whole window through the Iterable interface. + *
+ *
+ * Whenever possible try to use reduce instead of groupReduce for increased efficiency + */ + def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): + DataStream[R] = { + if (fun == null) { + throw new NullPointerException("GroupReduce function must not be null.") + } + val reducer = new GroupReduceFunction[T, R] { + val cleanFun = clean(fun) + def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) } + } + reduceGroup(reducer) + } + + /** + * Applies an aggregation that that gives the maximum of the elements in the window at + * the given position. + * + */ + def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position) + + /** + * Applies an aggregation that that gives the minimum of the elements in the window at + * the given position. + * + */ + def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position) + + /** + * Applies an aggregation that sums the elements in the window at the given position. + * + */ + def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position) + + /** + * Applies an aggregation that that gives the maximum element of the window by + * the given position. When equality, returns the first. + * + */ + def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY, + position, first) + + /** + * Applies an aggregation that that gives the minimum element of the window by + * the given position. When equality, returns the first. + * + */ + def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY, + position, first) + + def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): + DataStream[T] = { + + val jStream = javaStream.asInstanceOf[JavaWStream[Product]] + val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] + + val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) + + val reducer = aggregationType match { + case AggregationType.SUM => new agg.Sum(SumFunction.getForClass( + outType.getTypeAt(position).getTypeClass())); + case _ => new agg.ProductComparableAggregator(aggregationType, first) + } + + new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]] + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala new file mode 100644 index 0000000..83f2293 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.windowing + +import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta } +import org.apache.commons.lang.Validate +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean; +import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction + +object Delta { + + /** + * Creates a delta helper representing a delta trigger or eviction policy. + *

This policy calculates a delta between the data point that last + * triggered and the most recently arrived data point. It triggers if the + * delta is higher than the specified threshold.
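+ *
+ * A sketch of a delta trigger in the style of the TopSpeedWindowing example (the
+ * CarSpeed case class and the triggerMeters threshold are hypothetical):
+ * {{{
+ *   Delta.of[CarSpeed](triggerMeters,
+ *     (oldSp, newSp) => newSp.distance - oldSp.distance, CarSpeed(0, 0, 0, 0))
+ * }}}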

In case it gets + * used for eviction, this policy starts from the first element of the + * buffer and removes all elements from the buffer which have a higher delta + * then the threshold. As soon as there is an element with a lower delta, + * the eviction stops. + */ + def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = { + Validate.notNull(deltaFunction, "Delta function must not be null") + val df = new DeltaFunction[T] { + val cleanFun = clean(deltaFunction) + override def getDelta(first: T, second: T) = cleanFun(first, second) + } + JavaDelta.of(threshold, df, initVal) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala new file mode 100644 index 0000000..e1b9768 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.windowing + +import java.util.concurrent.TimeUnit +import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime } + +import org.apache.flink.api.scala.ClosureCleaner +import org.apache.commons.net.ntp.TimeStamp +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean; +import org.apache.flink.streaming.api.windowing.helper.Timestamp +import org.apache.commons.lang.Validate + +object Time { + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using System time. + * + */ + def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] = + JavaTime.of(windowSize, timeUnit) + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using a user defined timestamp extractor. 
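+ *
+ * A sketch with a user-defined timestamp extractor (the `dataStream` and the CarSpeed
+ * case class with a millisecond `time` field are hypothetical):
+ * {{{
+ *   dataStream.window(Time.of(10000L, (c: CarSpeed) => c.time))
+ * }}}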
+ * + */ + def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = { + Validate.notNull(timestamp, "Timestamp must not be null.") + val ts = new Timestamp[R] { + val fun = clean(timestamp, true) + override def getTimestamp(in: R) = fun(in) + } + JavaTime.of(windowSize, ts, startTime) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml index c20554b..386ef73 100644 --- a/flink-addons/flink-streaming/pom.xml +++ b/flink-addons/flink-streaming/pom.xml @@ -35,6 +35,7 @@ under the License. flink-streaming-core + flink-streaming-scala flink-streaming-examples flink-streaming-connectors http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala deleted file mode 100644 index e39fb11..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.scala.streaming.windowing - - -import java.util.concurrent.TimeUnit._ -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment -import org.apache.flink.api.scala.streaming.windowing.Delta -import org.apache.flink.streaming.api.windowing.helper.Time -import org.apache.flink.util.Collector - -import scala.math.{max, min} -import scala.util.Random - -/** - * An example of grouped stream windowing where different eviction and - * trigger policies can be used.A source fetches events from cars - * every 1 sec containing their id, their current speed (kmh), - * overall elapsed distance (m) and a timestamp. The streaming - * example triggers the top speed of each car every x meters elapsed - * for the last y seconds. 
- */ -object TopSpeedWindowing { - - case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long) - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val cars = env.addSource(carSource _).groupBy("carId") - .window(Time.of(evictionSec, SECONDS)) - .every(Delta.of[CarSpeed](triggerMeters, - (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0))) - .reduce((x, y) => if (x.speed > y.speed) x else y) - - cars print - - env.execute("TopSpeedWindowing") - - } - - def carSource(out: Collector[CarSpeed]) = { - - val speeds = new Array[Int](numOfCars) - val distances = new Array[Double](numOfCars) - - while (true) { - Thread sleep 1000 - for (i <- 0 until speeds.length) { - speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5) - distances(i) += speeds(i) / 3.6d - out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis)) - } - } - } - - def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - if (args.length == 3) { - numOfCars = args(0).toInt - evictionSec = args(1).toInt - triggerMeters = args(2).toDouble - } - else { - System.err.println("Usage: TopSpeedWindowing ") - false - } - } - true - } - - var numOfCars = 2 - var evictionSec = 10 - var triggerMeters = 50d - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala deleted file mode 100644 index caf5eb9..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.examples.scala.streaming.windowing - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment -import org.apache.flink.util.Collector -import scala.util.Random - -object WindowJoin { - - case class Name(id: Long, name: String) - case class Age(id: Long, age: Int) - case class Person(name: String, age: Long) - - def main(args: Array[String]) { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - - //Create streams for names and ages by mapping the inputs to the corresponding objects - val names = env.addSource(nameStream _).map(x => Name(x._1, x._2)) - val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2)) - - //Join the two input streams by id on the last second and create new Person objects - //containing both name and age - val joined = - names.join(ages).onWindow(1000) - .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) } - - joined print - - env.execute("WindowJoin") - } - - //Stream source for generating (id, name) pairs - def nameStream(out: Collector[(Long, String)]) = { - val names = Array("tom", "jerry", "alice", "bob", "john", "grace") - - for (i <- 1 to 10000) { - if (i % 100 == 0) Thread.sleep(1000) else { - out.collect((i, names(Random.nextInt(names.length)))) - } - } - } - - //Stream source for generating (id, age) pairs - def ageStream(out: Collector[(Long, Int)]) = { - for (i <- 1 to 10000) { - if (i % 100 == 0) Thread.sleep(1000) else { - out.collect((i, Random.nextInt(90))) - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml index 4ccbb98..b902655 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -50,12 +50,6 @@ under the License. flink-compiler ${project.version} - - - org.apache.flink - flink-streaming-core - ${project.version} - org.scala-lang http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java deleted file mode 100644 index 2f587d7..0000000 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.streaming; - -import java.io.Serializable; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; -import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; -import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; -import org.apache.flink.streaming.api.function.aggregation.SumFunction; - -import scala.Product; - -public class ScalaStreamingAggregator implements Serializable { - - private static final long serialVersionUID = 1L; - - TupleSerializerBase serializer; - Object[] fields; - int length; - int position; - - public ScalaStreamingAggregator(TypeSerializer serializer, int pos) { - this.serializer = (TupleSerializerBase) serializer; - this.length = this.serializer.getArity(); - this.fields = new Object[this.length]; - this.position = pos; - } - - public class Sum extends AggregationFunction { - private static final long serialVersionUID = 1L; - SumFunction sumFunction; - - public Sum(SumFunction func) { - super(ScalaStreamingAggregator.this.position); - this.sumFunction = func; - } - - @Override - public IN reduce(IN value1, IN value2) throws Exception { - for (int i = 0; i < length; i++) { - fields[i] = value2.productElement(i); - } - - fields[position] = sumFunction.add(fields[position], value1.productElement(position)); - - return serializer.createInstance(fields); - } - } - - public class ProductComparableAggregator extends ComparableAggregator { - - private static final long serialVersionUID = 1L; - - public ProductComparableAggregator(AggregationFunction.AggregationType aggregationType, - boolean first) { - super(ScalaStreamingAggregator.this.position, aggregationType, first); - } - - @SuppressWarnings("unchecked") - @Override - public IN reduce(IN value1, IN value2) throws Exception { - Object v1 = value1.productElement(position); - Object v2 = value2.productElement(position); - - int c = comparator.isExtremal((Comparable) v1, v2); - - if (byAggregate) { - if (c == 1) { - return value1; - } - if (first) { - if (c == 0) { - return value1; - } - } - - return value2; - } else { - for (int i = 0; i < length; i++) { - fields[i] = value2.productElement(i); - } - - if (c == 1) { - fields[position] = v1; - } - - return serializer.createInstance(fields); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala deleted file mode 100644 index 985e512..0000000 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.streaming - -import java.util - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment._ -import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaCStream } -import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction } -import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable } -import org.apache.flink.util.Collector -import org.apache.flink.api.scala.streaming.StreamingConversions._ - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { - - /** - * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps - * the output to a common type. The transformation calls a - * @param fun1 for each element of the first input and - * @param fun2 for each element of the second input. Each - * CoMapFunction call returns exactly one element. - * - * The CoMapFunction used to jointly transform the two input - * DataStreams - * @return The transformed { @link DataStream} - */ - def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): - DataStream[R] = { - if (fun1 == null || fun2 == null) { - throw new NullPointerException("Map function must not be null.") - } - val comapper = new CoMapFunction[IN1, IN2, R] { - def map1(in1: IN1): R = clean(fun1)(in1) - def map2(in2: IN2): R = clean(fun2)(in2) - } - - new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]], - new CoMapInvokable[IN1, IN2, R](comapper))) - } - - /** - * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps - * the output to a common type. The transformation calls a - * {@link CoMapFunction#map1} for each element of the first input and - * {@link CoMapFunction#map2} for each element of the second input. Each - * CoMapFunction call returns exactly one element. The user can also extend - * {@link RichCoMapFunction} to gain access to other features provided by - * the {@link RichFuntion} interface. - * - * @param coMapper - * The CoMapFunction used to jointly transform the two input - * DataStreams - * @return The transformed { @link DataStream} - */ - def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): - DataStream[R] = { - if (coMapper == null) { - throw new NullPointerException("Map function must not be null.") - } - - new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]], - new CoMapInvokable[IN1, IN2, R](coMapper))) - } - - /** - * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and - * maps the output to a common type. The transformation calls a - * {@link CoFlatMapFunction#flatMap1} for each element of the first input - * and {@link CoFlatMapFunction#flatMap2} for each element of the second - * input. Each CoFlatMapFunction call returns any number of elements - * including none. 
The user can also extend {@link RichFlatMapFunction} to - * gain access to other features provided by the {@link RichFuntion} - * interface. - * - * @param coFlatMapper - * The CoFlatMapFunction used to jointly transform the two input - * DataStreams - * @return The transformed { @link DataStream} - */ - def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): - DataStream[R] = { - if (coFlatMapper == null) { - throw new NullPointerException("FlatMap function must not be null.") - } - new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]], - new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper))) - } - - /** - * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and - * maps the output to a common type. The transformation calls a - * @param fun1 for each element of the first input - * and @param fun2 for each element of the second - * input. Each CoFlatMapFunction call returns any number of elements - * including none. - * - * @return The transformed { @link DataStream} - */ - def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, - fun2: (IN2, Collector[R]) => Unit): DataStream[R] = { - if (fun1 == null || fun2 == null) { - throw new NullPointerException("FlatMap functions must not be null.") - } - val flatMapper = new CoFlatMapFunction[IN1, IN2, R] { - def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, out) - def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, out) - } - flatMap(flatMapper) - } - - /** - * GroupBy operation for connected data stream. Groups the elements of - * input1 and input2 according to keyPosition1 and keyPosition2. Used for - * applying function on grouped data streams for example - * {@link ConnectedDataStream#reduce} - * - * @param keyPosition1 - * The field used to compute the hashcode of the elements in the - * first input stream. - * @param keyPosition2 - * The field used to compute the hashcode of the elements in the - * second input stream. - * @return @return The transformed { @link ConnectedDataStream} - */ - def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = { - javaStream.groupBy(keyPosition1, keyPosition2) - } - - /** - * GroupBy operation for connected data stream. Groups the elements of - * input1 and input2 according to keyPositions1 and keyPositions2. Used for - * applying function on grouped data streams for example - * {@link ConnectedDataStream#reduce} - * - * @param keyPositions1 - * The fields used to group the first input stream. - * @param keyPositions2 - * The fields used to group the second input stream. - * @return @return The transformed { @link ConnectedDataStream} - */ - def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): - ConnectedDataStream[IN1, IN2] = { - javaStream.groupBy(keyPositions1, keyPositions2) - } - - /** - * GroupBy operation for connected data stream using key expressions. Groups - * the elements of input1 and input2 according to field1 and field2. A field - * expression is either the name of a public field or a getter method with - * parentheses of the {@link DataStream}S underlying type. A dot can be used - * to drill down into objects, as in {@code "field1.getInnerField2()" }. 
- * - * @param field1 - * The grouping expression for the first input - * @param field2 - * The grouping expression for the second input - * @return The grouped { @link ConnectedDataStream} - */ - def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = { - javaStream.groupBy(field1, field2) - } - - /** - * GroupBy operation for connected data stream using key expressions. Groups - * the elements of input1 and input2 according to fields1 and fields2. A - * field expression is either the name of a public field or a getter method - * with parentheses of the {@link DataStream}S underlying type. A dot can be - * used to drill down into objects, as in {@code "field1.getInnerField2()" } - * . - * - * @param fields1 - * The grouping expressions for the first input - * @param fields2 - * The grouping expressions for the second input - * @return The grouped { @link ConnectedDataStream} - */ - def groupBy(fields1: Array[String], fields2: Array[String]): - ConnectedDataStream[IN1, IN2] = { - javaStream.groupBy(fields1, fields2) - } - - /** - * GroupBy operation for connected data stream. Groups the elements of - * input1 and input2 using fun1 and fun2. Used for applying - * function on grouped data streams for example - * {@link ConnectedDataStream#reduce} - * - * @param fun1 - * The function used for grouping the first input - * @param fun2 - * The function used for grouping the second input - * @return @return The transformed { @link ConnectedDataStream} - */ - def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _): - ConnectedDataStream[IN1, IN2] = { - - val keyExtractor1 = new KeySelector[IN1, Any] { - def getKey(in: IN1) = clean(fun1)(in) - } - val keyExtractor2 = new KeySelector[IN2, Any] { - def getKey(in: IN2) = clean(fun2)(in) - } - - javaStream.groupBy(keyExtractor1, keyExtractor2) - } - - /** - * Applies a reduce transformation on a {@link ConnectedDataStream} and maps - * the outputs to a common type. If the {@link ConnectedDataStream} is - * batched or windowed then the reduce transformation is applied on every - * sliding batch/window of the data stream. If the connected data stream is - * grouped then the reducer is applied on every group of elements sharing - * the same key. This type of reduce is much faster than reduceGroup since - * the reduce function can be applied incrementally. - * - * @param coReducer - * The { @link CoReduceFunction} that will be called for every - * element of the inputs. - * @return The transformed { @link DataStream}. - */ - def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]): - DataStream[R] = { - if (coReducer == null) { - throw new NullPointerException("Reduce function must not be null.") - } - - new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]], - new CoReduceInvokable[IN1, IN2, R](coReducer))) - } - - /** - * Applies a reduce transformation on a {@link ConnectedDataStream} and maps - * the outputs to a common type. If the {@link ConnectedDataStream} is - * batched or windowed then the reduce transformation is applied on every - * sliding batch/window of the data stream. If the connected data stream is - * grouped then the reducer is applied on every group of elements sharing - * the same key. This type of reduce is much faster than reduceGroup since - * the reduce function can be applied incrementally. - * - * @param reducer1 - * @param reducer2 - * @param mapper1 - * @param mapper2 - * - * @return The transformed { @link DataStream}. 
- */ - def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1, - reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = { - if (mapper1 == null || mapper2 == null) { - throw new NullPointerException("Map functions must not be null.") - } - if (reducer1 == null || reducer2 == null) { - throw new NullPointerException("Reduce functions must not be null.") - } - - val reducer = new CoReduceFunction[IN1, IN2, R] { - def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, value2) - def map2(value: IN2): R = clean(mapper2)(value) - def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, value2) - def map1(value: IN1): R = clean(mapper1)(value) - } - reduce(reducer) - } - - /** - * Applies a CoWindow transformation on the connected DataStreams. The - * transformation calls the {@link CoWindowFunction#coWindow} method for for - * time aligned windows of the two data streams. System time is used as - * default to compute windows. - * - * @param coWindowFunction - * The { @link CoWindowFunction} that will be applied for the time - * windows. - * @param windowSize - * Size of the windows that will be aligned for both streams in - * milliseconds. - * @param slideInterval - * After every function call the windows will be slid by this - * interval. - * - * @return The transformed { @link DataStream}. - */ - def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: - CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = { - if (coWindowFunction == null) { - throw new NullPointerException("CoWindow function must no be null") - } - - javaStream.windowReduce(coWindowFunction, windowSize, slideInterval) - } - - /** - * Applies a CoWindow transformation on the connected DataStreams. The - * transformation calls the {@link CoWindowFunction#coWindow} method for for - * time aligned windows of the two data streams. System time is used as - * default to compute windows. - * - * @param coWindower - * The coWindowing function to be applied for the time windows. - * @param windowSize - * Size of the windows that will be aligned for both streams in - * milliseconds. - * @param slideInterval - * After every function call the windows will be slid by this - * interval. - * - * @return The transformed { @link DataStream}. - */ - def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], - Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = { - if (coWindower == null) { - throw new NullPointerException("CoWindow function must no be null") - } - - val coWindowFun = new CoWindowFunction[IN1, IN2, R] { - def coWindow(first: util.List[IN1], second: util.List[IN2], - out: Collector[R]): Unit = clean(coWindower)(first, second, out) - } - - javaStream.windowReduce(coWindowFun, windowSize, slideInterval) - } - - /** - * Returns the first {@link DataStream}. - * - * @return The first DataStream. - */ - def getFirst(): DataStream[IN1] = { - javaStream.getFirst - } - - /** - * Returns the second {@link DataStream}. - * - * @return The second DataStream. 
- */ - def getSecond(): DataStream[IN2] = { - javaStream.getSecond - } - - /** - * Gets the type of the first input - * - * @return The type of the first input - */ - def getInputType1(): TypeInformation[IN1] = { - javaStream.getInputType1 - } - - /** - * Gets the type of the second input - * - * @return The type of the second input - */ - def getInputType2(): TypeInformation[IN2] = { - javaStream.getInputType2 - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala deleted file mode 100644 index ccfd176..0000000 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ /dev/null @@ -1,558 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.streaming -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, - SingleOutputStreamOperator, GroupedDataStream} -import org.apache.flink.api.common.typeinfo.TypeInformation -import scala.reflect.ClassTag -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.streaming.api.invokable.operator.MapInvokable -import org.apache.flink.util.Collector -import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.streaming.api.invokable.StreamInvokable -import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable -import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.common.functions.FilterFunction -import org.apache.flink.streaming.api.function.sink.SinkFunction -import org.apache.flink.streaming.api.windowing.helper.WindowingHelper -import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy -import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy -import org.apache.flink.streaming.api.collector.OutputSelector -import scala.collection.JavaConversions._ -import java.util.HashMap -import org.apache.flink.streaming.api.function.aggregation.SumFunction -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase -import org.apache.flink.streaming.api.function.aggregation.AggregationFunction -import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.scala.streaming.StreamingConversions._ - -class DataStream[T](javaStream: JavaStream[T]) { - - /** - * Gets the underlying java DataStream object. - */ - def getJavaStream: JavaStream[T] = javaStream - - /** - * Sets the degree of parallelism of this operation. This must be greater than 1. - */ - def setParallelism(dop: Int): DataStream[T] = { - javaStream match { - case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop) - case _ => - throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot " + - "have " + - "parallelism.") - } - this - } - - /** - * Returns the degree of parallelism of this operation. - */ - def getParallelism: Int = javaStream match { - case op: SingleOutputStreamOperator[_, _] => op.getParallelism - case _ => - throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have" + - " " + - "parallelism.") - } - - /** - * Creates a new DataStream by merging DataStream outputs of - * the same type with each other. The DataStreams merged using this operator - * will be transformed simultaneously. - * - */ - def merge(dataStreams: DataStream[T]*): DataStream[T] = - javaStream.merge(dataStreams.map(_.getJavaStream): _*) - - /** - * Creates a new ConnectedDataStream by connecting - * DataStream outputs of different type with each other. The - * DataStreams connected using this operators can be used with CoFunctions. 
- * - */ - def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = - javaStream.connect(dataStream.getJavaStream) - - /** - * Groups the elements of a DataStream by the given key positions (for tuple/array types) to - * be used with grouped operators like grouped reduce or grouped aggregations - * - */ - def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*) - - /** - * Groups the elements of a DataStream by the given field expressions to - * be used with grouped operators like grouped reduce or grouped aggregations - * - */ - def groupBy(firstField: String, otherFields: String*): DataStream[T] = - javaStream.groupBy(firstField +: otherFields.toArray: _*) - - /** - * Groups the elements of a DataStream by the given K key to - * be used with grouped operators like grouped reduce or grouped aggregations - * - */ - def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = { - - val keyExtractor = new KeySelector[T, K] { - val cleanFun = clean(fun) - def getKey(in: T) = cleanFun(in) - } - javaStream.groupBy(keyExtractor) - } - - /** - * Sets the partitioning of the DataStream so that the output is - * partitioned by the selected fields. This setting only effects the how the outputs will be - * distributed between the parallel instances of the next processing operator. - * - */ - def partitionBy(fields: Int*): DataStream[T] = - javaStream.partitionBy(fields: _*) - - /** - * Sets the partitioning of the DataStream so that the output is - * partitioned by the selected fields. This setting only effects the how the outputs will be - * distributed between the parallel instances of the next processing operator. - * - */ - def partitionBy(firstField: String, otherFields: String*): DataStream[T] = - javaStream.partitionBy(firstField +: otherFields.toArray: _*) - - /** - * Sets the partitioning of the DataStream so that the output is - * partitioned by the given Key. This setting only effects the how the outputs will be - * distributed between the parallel instances of the next processing operator. - * - */ - def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = { - - val keyExtractor = new KeySelector[T, K] { - val cleanFun = clean(fun) - def getKey(in: T) = cleanFun(in) - } - javaStream.partitionBy(keyExtractor) - } - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are broadcasted to every parallel instance of the next component. This - * setting only effects the how the outputs will be distributed between the - * parallel instances of the next processing operator. - * - */ - def broadcast: DataStream[T] = javaStream.broadcast() - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are shuffled to the next component. This setting only effects the how the - * outputs will be distributed between the parallel instances of the next - * processing operator. - * - */ - def shuffle: DataStream[T] = javaStream.shuffle() - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are forwarded to the local subtask of the next component (whenever - * possible). This is the default partitioner setting. This setting only - * effects the how the outputs will be distributed between the parallel - * instances of the next processing operator. 
- * - */ - def forward: DataStream[T] = javaStream.forward() - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are distributed evenly to the next component.This setting only effects - * the how the outputs will be distributed between the parallel instances of - * the next processing operator. - * - */ - def distribute: DataStream[T] = javaStream.distribute() - - /** - * Initiates an iterative part of the program that creates a loop by feeding - * back data streams. To create a streaming iteration the user needs to define - * a transformation that creates two DataStreams.The first one one is the output - * that will be fed back to the start of the iteration and the second is the output - * stream of the iterative part. - *

- * stepfunction: initialStream => (feedback, output) - *

- * A common pattern is to use output splitting to create feedback and output DataStream. - * Please refer to the .split(...) method of the DataStream - *

- * By default a DataStream with iteration will never terminate, but the user - * can use the maxWaitTime parameter to set a max waiting time for the iteration head. - * If no data received in the set time the stream terminates. - * - * - */ - def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: - Long = 0): DataStream[T] = { - val iterativeStream = javaStream.iterate(maxWaitTimeMillis) - - val (feedback, output) = stepFunction(new DataStream[T](iterativeStream)) - iterativeStream.closeWith(feedback.getJavaStream) - output - } - - /** - * Applies an aggregation that that gives the current maximum of the data stream at - * the given position. - * - */ - def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position) - - /** - * Applies an aggregation that that gives the current minimum of the data stream at - * the given position. - * - */ - def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position) - - /** - * Applies an aggregation that sums the data stream at the given position. - * - */ - def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position) - - /** - * Applies an aggregation that that gives the current minimum element of the data stream by - * the given position. When equality, the user can set to get the first or last element with - * the minimal value. - * - */ - def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType - .MINBY, position, first) - - /** - * Applies an aggregation that that gives the current maximum element of the data stream by - * the given position. When equality, the user can set to get the first or last element with - * the maximal value. - * - */ - def maxBy(position: Int, first: Boolean = true): DataStream[T] = - aggregate(AggregationType.MAXBY, position, first) - - private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): - DataStream[T] = { - - val jStream = javaStream.asInstanceOf[JavaStream[Product]] - val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] - - val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) - - val reducer = aggregationType match { - case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position). - getTypeClass())); - case _ => new agg.ProductComparableAggregator(aggregationType, first) - } - - val invokable = jStream match { - case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer, - groupedStream.getKeySelector()) - case _ => new StreamReduceInvokable(reducer) - } - new DataStream[Product](jStream.transform("aggregation", jStream.getType(), - invokable)).asInstanceOf[DataStream[T]] - } - - /** - * Creates a new DataStream containing the current number (count) of - * received records. - * - */ - def count: DataStream[Long] = new DataStream[java.lang.Long]( - javaStream.count()).asInstanceOf[DataStream[Long]] - - /** - * Creates a new DataStream by applying the given function to every element of this DataStream. 
- */ - def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = { - if (fun == null) { - throw new NullPointerException("Map function must not be null.") - } - val mapper = new MapFunction[T, R] { - val cleanFun = clean(fun) - def map(in: T): R = cleanFun(in) - } - - javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)) - } - - /** - * Creates a new DataStream by applying the given function to every element of this DataStream. - */ - def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = { - if (mapper == null) { - throw new NullPointerException("Map function must not be null.") - } - - javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)) - } - - /** - * Creates a new DataStream by applying the given function to every element and flattening - * the results. - */ - def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = { - if (flatMapper == null) { - throw new NullPointerException("FlatMap function must not be null.") - } - javaStream.transform("flatMap", implicitly[TypeInformation[R]], - new FlatMapInvokable[T, R](flatMapper)) - } - - /** - * Creates a new DataStream by applying the given function to every element and flattening - * the results. - */ - def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = { - if (fun == null) { - throw new NullPointerException("FlatMap function must not be null.") - } - val flatMapper = new FlatMapFunction[T, R] { - val cleanFun = clean(fun) - def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) } - } - flatMap(flatMapper) - } - - /** - * Creates a new DataStream by applying the given function to every element and flattening - * the results. - */ - def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = { - if (fun == null) { - throw new NullPointerException("FlatMap function must not be null.") - } - val flatMapper = new FlatMapFunction[T, R] { - val cleanFun = clean(fun) - def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect } - } - flatMap(flatMapper) - } - - /** - * Creates a new [[DataStream]] by reducing the elements of this DataStream - * using an associative reduce function. - */ - def reduce(reducer: ReduceFunction[T]): DataStream[T] = { - if (reducer == null) { - throw new NullPointerException("Reduce function must not be null.") - } - javaStream match { - case ds: GroupedDataStream[_] => javaStream.transform("reduce", - javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())) - case _ => javaStream.transform("reduce", javaStream.getType(), - new StreamReduceInvokable[T](reducer)) - } - } - - /** - * Creates a new [[DataStream]] by reducing the elements of this DataStream - * using an associative reduce function. - */ - def reduce(fun: (T, T) => T): DataStream[T] = { - if (fun == null) { - throw new NullPointerException("Reduce function must not be null.") - } - val reducer = new ReduceFunction[T] { - val cleanFun = clean(fun) - def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } - } - reduce(reducer) - } - - /** - * Creates a new DataStream that contains only the elements satisfying the given filter predicate. 
- */ - def filter(filter: FilterFunction[T]): DataStream[T] = { - if (filter == null) { - throw new NullPointerException("Filter function must not be null.") - } - javaStream.filter(filter) - } - - /** - * Creates a new DataStream that contains only the elements satisfying the given filter predicate. - */ - def filter(fun: T => Boolean): DataStream[T] = { - if (fun == null) { - throw new NullPointerException("Filter function must not be null.") - } - val filter = new FilterFunction[T] { - val cleanFun = clean(fun) - def filter(in: T) = cleanFun(in) - } - this.filter(filter) - } - - /** - * Create a WindowedDataStream that can be used to apply - * transformation like .reduce(...) or aggregations on - * preset chunks(windows) of the data stream. To define the windows one or - * more WindowingHelper-s such as Time, Count and - * Delta can be used.

When applied to a grouped data - * stream, the windows (evictions) and slide sizes (triggers) will be - * computed on a per group basis.

For more advanced control over - * the trigger and eviction policies please use to - * window(List(triggers), List(evicters)) - */ - def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = - javaStream.window(windowingHelper: _*) - - /** - * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s. - * Windowing can be used to apply transformation like .reduce(...) or aggregations on - * preset chunks(windows) of the data stream.

For most common - * use-cases please refer to window(WindowingHelper[_]*) - * - */ - def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): - WindowedDataStream[T] = javaStream.window(triggers, evicters) - - /** - * - * Operator used for directing tuples to specific named outputs using an - * OutputSelector. Calling this method on an operator creates a new - * SplitDataStream. - */ - def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match { - case op: SingleOutputStreamOperator[_, _] => op.split(selector) - case _ => - throw new UnsupportedOperationException("Operator " + javaStream.toString + " can not be " + - "split.") - } - - /** - * Creates a new SplitDataStream that contains only the elements satisfying the - * given output selector predicate. - */ - def split(fun: T => String): SplitDataStream[T] = { - if (fun == null) { - throw new NullPointerException("OutputSelector must not be null.") - } - val selector = new OutputSelector[T] { - val cleanFun = clean(fun) - def select(in: T): java.lang.Iterable[String] = { - List(cleanFun(in)) - } - } - split(selector) - } - - /** - * Initiates a temporal Join transformation that joins the elements of two - * data streams on key equality over a specified time window. - * - * This method returns a StreamJoinOperator on which the - * .onWindow(..) should be called to define the - * window, and then the .where(..) and .equalTo(..) methods can be used to defin - * the join keys.

The user can also use the apply method of the returned JoinedStream - * to use custom join function. - * - */ - def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = - new StreamJoinOperator[T, R](javaStream, stream.getJavaStream) - - /** - * Initiates a temporal cross transformation that builds all pair - * combinations of elements of both DataStreams, i.e., it builds a Cartesian - * product. - * - * This method returns a StreamJoinOperator on which the - * .onWindow(..) should be called to define the - * window, and then the .where(..) and .equalTo(..) methods can be used to defin - * the join keys.

The user can also use the apply method of the returned JoinedStream - * to use custom join function. - * - */ - def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] = - new StreamCrossOperator[T, R](javaStream, stream.getJavaStream) - - /** - * Writes a DataStream to the standard output stream (stdout). For each - * element of the DataStream the result of .toString is - * written. - * - */ - def print(): DataStream[T] = javaStream.print() - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, in every millis milliseconds. For - * every element of the DataStream the result of .toString - * is written. - * - */ - def writeAsText(path: String, millis: Long = 0): DataStream[T] = - javaStream.writeAsText(path, millis) - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, in every millis milliseconds. For - * every element of the DataStream the result of .toString - * is written. - * - */ - def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = - javaStream.writeAsCsv(path, millis) - - /** - * Adds the given sink to this DataStream. Only streams with sinks added - * will be executed once the StreamExecutionEnvironment.execute(...) - * method is called. - * - */ - def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = - javaStream.addSink(sinkFuntion) - - /** - * Adds the given sink to this DataStream. Only streams with sinks added - * will be executed once the StreamExecutionEnvironment.execute(...) - * method is called. - * - */ - def addSink(fun: T => Unit): DataStream[T] = { - if (fun == null) { - throw new NullPointerException("Sink function must not be null.") - } - val sinkFunction = new SinkFunction[T] { - val cleanFun = clean(fun) - def invoke(in: T) = cleanFun(in) - } - this.addSink(sinkFunction) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala deleted file mode 100644 index f61e34b..0000000 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.streaming - -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } -import org.apache.flink.api.scala.streaming.StreamingConversions._ - -/** - * The SplitDataStream represents an operator that has been split using an - * {@link OutputSelector}. Named outputs can be selected using the - * {@link #select} function. - * - * @param - * The type of the output. - */ -class SplitDataStream[T](javaStream: SplitJavaStream[T]) { - - /** - * Gets the underlying java DataStream object. - */ - private[flink] def getJavaStream: SplitJavaStream[T] = javaStream - - /** - * Sets the output names for which the next operator will receive values. - */ - def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*) - - /** - * Selects all output names from a split data stream. - */ - def selectAll(): DataStream[T] = javaStream.selectAll() - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala deleted file mode 100644 index cac2927..0000000 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.streaming - -import org.apache.flink.api.common.functions.JoinFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.scala.ClosureCleaner -import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } -import org.apache.flink.streaming.api.datastream.TemporalOperator -import scala.reflect.ClassTag -import org.apache.commons.lang.Validate -import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable -import org.apache.flink.streaming.api.function.co.CrossWindowFunction -import org.apache.flink.api.common.functions.CrossFunction -import org.apache.flink.api.scala.typeutils.CaseClassSerializer -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean -import org.apache.flink.api.scala.streaming.StreamingConversions._ - -class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends - TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { - - override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = { - - val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, - (l: I1, r: I2) => (l, r)) - - val returnType = new CaseClassTypeInfo[(I1, I2)]( - - classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) { - - override def createSerializer: TypeSerializer[(I1, I2)] = { - val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) - for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer - } - - new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) { - override def createInstance(fields: Array[AnyRef]) = { - (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2]) - } - } - } - } - - val javaStream = input1.connect(input2).addGeneralWindowCombine( - crossWindowFunction, - returnType, windowSize, - slideInterval, timeStamp1, timeStamp2); - - new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream) - } -} -object StreamCrossOperator { - - private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], - javaStream: JavaStream[(I1, I2)]) extends - DataStream[(I1, I2)](javaStream) { - - /** - * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf - * call will be emitted. - * - */ - def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { - - val invokable = new CoWindowInvokable[I1, I2, R]( - clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, - op.timeStamp2) - - javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), - invokable) - - javaStream.setType(implicitly[TypeInformation[R]]) - } - } - - private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], - crossFunction: (I1, I2) => R): - CrossWindowFunction[I1, I2, R] = { - Validate.notNull(crossFunction, "Join function must not be null.") - - val crossFun = new CrossFunction[I1, I2, R] { - val cleanFun = op.input1.clean(crossFunction) - - override def cross(first: I1, second: I2): R = { - cleanFun(first, second) - } - } - - new CrossWindowFunction[I1, I2, R](crossFun) - } - -}