Return-Path:
X-Original-To: apmail-flink-commits-archive@minotaur.apache.org
Delivered-To: apmail-flink-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id 905F210BE7
for ;
Sun, 4 Jan 2015 20:52:08 +0000 (UTC)
Received: (qmail 39025 invoked by uid 500); 4 Jan 2015 20:52:09 -0000
Delivered-To: apmail-flink-commits-archive@flink.apache.org
Received: (qmail 38993 invoked by uid 500); 4 Jan 2015 20:52:09 -0000
Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@flink.incubator.apache.org
Delivered-To: mailing list commits@flink.incubator.apache.org
Received: (qmail 38932 invoked by uid 99); 4 Jan 2015 20:52:09 -0000
Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jan 2015 20:52:09 +0000
X-ASF-Spam-Status: No, hits=-2000.0 required=5.0
tests=ALL_TRUSTED,T_RP_MATCHES_RCVD
X-Spam-Check-By: apache.org
Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3)
by apache.org (qpsmtpd/0.29) with SMTP; Sun, 04 Jan 2015 20:50:56 +0000
Received: (qmail 37398 invoked by uid 99); 4 Jan 2015 20:50:52 -0000
Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org)
(140.211.11.114)
by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jan 2015 20:50:52 +0000
Received: by tyr.zones.apache.org (Postfix, from userid 65534)
id 10F94A3E8E5; Sun, 4 Jan 2015 20:50:52 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: mbalassi@apache.org
To: commits@flink.incubator.apache.org
Date: Sun, 04 Jan 2015 20:51:15 -0000
Message-Id:
In-Reply-To:
References:
X-Mailer: ASF-Git Admin Mailer
Subject: [25/27] incubator-flink git commit: [streaming] [scala] Restructured
streaming scala project and examples
X-Virus-Checked: Checked by ClamAV on apache.org
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
new file mode 100644
index 0000000..d587d56
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.Array.canBuildFrom
+import scala.reflect.ClassTag
+
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.operators.Keys
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.streaming.util.keys.KeySelectorUtil
+
+/**
+ * Scala entry point for temporal (window-based) joins of two data streams.
+ * Window size / slide interval configuration is inherited from the Java
+ * TemporalOperator; createNextWindowOperator hands control to the
+ * key-definition phase (JoinWindow.where(...)).
+ */
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
+
+ // Called by the superclass once windowing is configured; returns the
+ // builder object on which the user defines the first join key.
+ override def createNextWindowOperator() = {
+ new StreamJoinOperator.JoinWindow[I1, I2](this)
+ }
+}
+
+/**
+ * Companion holding the intermediate builder classes of the join DSL:
+ * JoinWindow (first key) -> JoinPredicate (second key) -> JoinedStream (result).
+ */
+object StreamJoinOperator {
+
+ class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) {
+
+ // Type information of the first input; needed to resolve
+ // field-position and field-expression keys below.
+ private[flink] val type1 = op.input1.getType();
+
+ /**
+ * Continues a temporal Join transformation by defining
+ * the fields in the first stream to be used as keys for the join.
+ * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+ * to define the second key.
+ */
+ def where(fields: Int*) = {
+ new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys(fields.toArray,type1),type1))
+ }
+
+ /**
+ * Continues a temporal Join transformation by defining
+ * the fields in the first stream to be used as keys for the join.
+ * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+ * to define the second key.
+ */
+ def where(firstField: String, otherFields: String*) =
+ new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys(firstField +: otherFields.toArray,type1),type1))
+
+ /**
+ * Continues a temporal Join transformation by defining
+ * the keyselector function that will be used to extract keys from the first stream
+ * for the join.
+ * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+ * to define the second key.
+ */
+ def where[K: TypeInformation](fun: (I1) => K) = {
+ // NOTE(review): keyType is computed but never used — candidate for removal.
+ val keyType = implicitly[TypeInformation[K]]
+ val keyExtractor = new KeySelector[I1, K] {
+ val cleanFun = op.input1.clean(fun)
+ def getKey(in: I1) = cleanFun(in)
+ }
+ new JoinPredicate[I1, I2](op, keyExtractor)
+ }
+
+ }
+
+ class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
+ private[flink] val keys1: KeySelector[I1, _]) {
+ // Second key selector; assigned by finish() once equalTo(...) is called.
+ private[flink] var keys2: KeySelector[I2, _] = null
+ private[flink] val type2 = op.input2.getType();
+
+ /**
+ * Creates a temporal join transformation by defining the second join key.
+ * The returned transformation wraps each joined element pair in a tuple2:
+ * (first, second)
+ * To define a custom wrapping, use JoinedStream.apply(...)
+ */
+ def equalTo(fields: Int*): JoinedStream[I1, I2] = {
+ finish(KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys(fields.toArray,type2),type2))
+ }
+
+ /**
+ * Creates a temporal join transformation by defining the second join key.
+ * The returned transformation wraps each joined element pair in a tuple2:
+ * (first, second)
+ * To define a custom wrapping, use JoinedStream.apply(...)
+ */
+ def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
+ finish(KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys(firstField +: otherFields.toArray,type2),type2))
+
+ /**
+ * Creates a temporal join transformation by defining the second join key.
+ * The returned transformation wraps each joined element pair in a tuple2:
+ * (first, second)
+ * To define a custom wrapping, use JoinedStream.apply(...)
+ */
+ def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
+ // NOTE(review): keyType is computed but never used — candidate for removal.
+ val keyType = implicitly[TypeInformation[K]]
+ val keyExtractor = new KeySelector[I2, K] {
+ // NOTE(review): cleans via op.input1 although this extractor keys the
+ // second input; the sibling where[K] uses input1 for I1. Presumably both
+ // inputs delegate to the same closure cleaner — confirm, else use input2.
+ val cleanFun = op.input1.clean(fun)
+ def getKey(in: I2) = cleanFun(in)
+ }
+ finish(keyExtractor)
+ }
+
+ // Stores the second key selector and materializes the join with the
+ // default (first, second) tuple wrapping.
+ private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
+ this.keys2 = keys2
+ new JoinedStream[I1, I2](this, createJoinOperator())
+ }
+
+ // Builds the underlying Java co-windowed stream producing (I1, I2) pairs,
+ // with a hand-rolled CaseClassTypeInfo/serializer for the Scala tuple.
+ private def createJoinOperator(): JavaStream[(I1, I2)] = {
+
+ val returnType = new CaseClassTypeInfo[(I1, I2)](
+
+ classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
+
+ override def createSerializer: TypeSerializer[(I1, I2)] = {
+ val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+ for (i <- 0 until getArity) {
+ fieldSerializers(i) = types(i).createSerializer
+ }
+
+ new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+ override def createInstance(fields: Array[AnyRef]) = {
+ (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+ }
+ }
+ }
+ }
+
+ // NOTE(review): explicit `return` is non-idiomatic Scala; the expression
+ // alone would suffice here.
+ return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+ .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+ returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+ }
+ }
+
+ class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
+ DataStream[(I1, I2)](javaStream) {
+
+ private val op = jp.op
+
+ /**
+ * Sets a wrapper for the joined elements. For each joined pair, the result of the
+ * udf call will be emitted.
+ */
+ def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+ val invokable = new CoWindowInvokable[I1, I2, R](
+ clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+ op.timeStamp2)
+
+ // Replaces the invokable of the already-built join node in the job graph
+ // so the user function is applied instead of the default tuple wrapping.
+ javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+ invokable)
+
+ // Re-types the stream to R; implicitly wrapped into the Scala DataStream
+ // via StreamingConversions (imported above).
+ javaStream.setType(implicitly[TypeInformation[R]])
+ }
+ }
+
+ // Wraps a plain (I1, I2) => R function into the Java JoinWindowFunction,
+ // closure-cleaning it first. Throws IllegalArgumentException when null.
+ private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
+ joinFunction: (I1, I2) => R) = {
+ Validate.notNull(joinFunction, "Join function must not be null.")
+
+ val joinFun = new JoinFunction[I1, I2, R] {
+
+ val cleanFun = jp.op.input1.clean(joinFunction)
+
+ override def join(first: I1, second: I2): R = {
+ cleanFun(first, second)
+ }
+ }
+
+ new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
new file mode 100644
index 0000000..fb3745f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
+
+/**
+ * Implicit conversions from the Java streaming API wrappers to their Scala
+ * counterparts, so Java-returning calls can be used fluently from Scala code.
+ * Import StreamingConversions._ to bring them into scope.
+ */
+object StreamingConversions {
+
+ implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
+ new DataStream[R](javaStream)
+
+ implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
+ new WindowedDataStream[R](javaWStream)
+
+ implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
+ new SplitDataStream[R](javaStream)
+
+ implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
+ ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
new file mode 100644
index 0000000..deda3d9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.Array.canBuildFrom
+import scala.collection.JavaConversions.iterableAsScalaIterable
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream}
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.function.aggregation.SumFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
+import org.apache.flink.streaming.api.windowing.helper._
+import org.apache.flink.util.Collector
+
+/**
+ * Scala wrapper around the Java WindowedDataStream, exposing windowed
+ * grouping, reduce/groupReduce and positional aggregations.
+ */
+class WindowedDataStream[T](javaStream: JavaWStream[T]) {
+
+ /**
+ * Defines the slide size (trigger frequency) for the windowed data stream.
+ * This controls how often the user defined function will be triggered on
+ * the window.
+ */
+ def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+ javaStream.every(windowingHelper: _*)
+
+ /**
+ * Groups the elements of the WindowedDataStream using the given
+ * field positions. The window sizes (evictions) and slide sizes
+ * (triggers) will be calculated on the whole stream (in a central fashion),
+ * but the user defined functions will be applied on a per group basis.
+ * To get windows and triggers on a per group basis apply the
+ * DataStream.window(...) operator on an already grouped data stream.
+ *
+ */
+ def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
+
+ /**
+ * Groups the elements of the WindowedDataStream using the given
+ * field expressions. The window sizes (evictions) and slide sizes
+ * (triggers) will be calculated on the whole stream (in a central fashion),
+ * but the user defined functions will be applied on a per group basis.
+ * To get windows and triggers on a per group basis apply the
+ * DataStream.window(...) operator on an already grouped data stream.
+ *
+ */
+ def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
+ javaStream.groupBy(firstField +: otherFields.toArray: _*)
+
+ /**
+ * Groups the elements of the WindowedDataStream using the given
+ * KeySelector function. The window sizes (evictions) and slide sizes
+ * (triggers) will be calculated on the whole stream (in a central fashion),
+ * but the user defined functions will be applied on a per group basis.
+ * To get windows and triggers on a per group basis apply the
+ * DataStream.window(...) operator on an already grouped data stream.
+ *
+ */
+ def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
+
+ val keyExtractor = new KeySelector[T, K] {
+ val cleanFun = clean(fun)
+ def getKey(in: T) = cleanFun(in)
+ }
+ javaStream.groupBy(keyExtractor)
+ }
+
+ /**
+ * Applies a reduce transformation on the windowed data stream by reducing
+ * the current window at every trigger.
+ *
+ */
+ def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+ if (reducer == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ javaStream.reduce(reducer)
+ }
+
+ /**
+ * Applies a reduce transformation on the windowed data stream by reducing
+ * the current window at every trigger.
+ *
+ */
+ def reduce(fun: (T, T) => T): DataStream[T] = {
+ if (fun == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ val reducer = new ReduceFunction[T] {
+ val cleanFun = clean(fun)
+ def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+ }
+ reduce(reducer)
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the windowed data stream by reducing
+ * the current window at every trigger. In contrast with the simple binary reduce operator,
+ * groupReduce exposes the whole window through the Iterable interface.
+ *
+ *
+ * Whenever possible try to use reduce instead of groupReduce for increased efficiency
+ */
+ def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
+ DataStream[R] = {
+ if (reducer == null) {
+ throw new NullPointerException("GroupReduce function must not be null.")
+ }
+ javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the windowed data stream by reducing
+ * the current window at every trigger. In contrast with the simple binary reduce operator,
+ * groupReduce exposes the whole window through the Iterable interface.
+ *
+ *
+ * Whenever possible try to use reduce instead of groupReduce for increased efficiency
+ */
+ def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
+ DataStream[R] = {
+ if (fun == null) {
+ throw new NullPointerException("GroupReduce function must not be null.")
+ }
+ val reducer = new GroupReduceFunction[T, R] {
+ val cleanFun = clean(fun)
+ def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
+ }
+ reduceGroup(reducer)
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum of the elements in the window at
+ * the given position.
+ *
+ */
+ def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+ /**
+ * Applies an aggregation that gives the minimum of the elements in the window at
+ * the given position.
+ *
+ */
+ def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+ /**
+ * Applies an aggregation that sums the elements in the window at the given position.
+ *
+ */
+ def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+ /**
+ * Applies an aggregation that gives the maximum element of the window by
+ * the given position. In case of equality, the first element is returned.
+ *
+ */
+ def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY,
+ position, first)
+
+ /**
+ * Applies an aggregation that gives the minimum element of the window by
+ * the given position. In case of equality, the first element is returned.
+ *
+ */
+ def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY,
+ position, first)
+
+ /**
+ * Common implementation behind max/min/sum/maxBy/minBy: builds the matching
+ * aggregator and reduces the window with it.
+ * NOTE(review): relies on unchecked casts (T viewed as Product, type info as
+ * TupleTypeInfoBase) — presumably only called for tuple/case-class streams;
+ * a non-product T would fail at runtime.
+ */
+ def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+ DataStream[T] = {
+
+ val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
+ val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
+
+ val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
+
+ val reducer = aggregationType match {
+ case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
+ outType.getTypeAt(position).getTypeClass()));
+ case _ => new agg.ProductComparableAggregator(aggregationType, first)
+ }
+
+ new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
new file mode 100644
index 0000000..83f2293
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.windowing
+
+import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
+import org.apache.commons.lang.Validate
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
+
+object Delta {
+
+ /**
+ * Creates a delta helper representing a delta trigger or eviction policy.
+ * This policy calculates a delta between the data point which
+ * triggered last and the currently arrived data point. It triggers if the
+ * delta is higher than a specified threshold. In case it gets
+ * used for eviction, this policy starts from the first element of the
+ * buffer and removes all elements from the buffer which have a higher delta
+ * than the threshold. As soon as there is an element with a lower delta,
+ * the eviction stops.
+ */
+ def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
+ Validate.notNull(deltaFunction, "Delta function must not be null")
+ // Wrap the Scala function into the Java DeltaFunction interface,
+ // closure-cleaning it so it is serializable for distributed execution.
+ val df = new DeltaFunction[T] {
+ val cleanFun = clean(deltaFunction)
+ override def getDelta(first: T, second: T) = cleanFun(first, second)
+ }
+ JavaDelta.of(threshold, df, initVal)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
new file mode 100644
index 0000000..e1b9768
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.windowing
+
+import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
+
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.commons.net.ntp.TimeStamp
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp
+import org.apache.commons.lang.Validate
+
+object Time {
+
+ /**
+ * Creates a helper representing a time trigger which triggers every given
+ * length (slide size) or a time eviction which evicts all elements older
+ * than length (window size) using System time.
+ *
+ */
+ def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] =
+ JavaTime.of(windowSize, timeUnit)
+
+ /**
+ * Creates a helper representing a time trigger which triggers every given
+ * length (slide size) or a time eviction which evicts all elements older
+ * than length (window size) using a user defined timestamp extractor.
+ * The startTime (default 0) is forwarded to the Java helper.
+ *
+ */
+ def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
+ Validate.notNull(timestamp, "Timestamp must not be null.")
+ val ts = new Timestamp[R] {
+ // NOTE(review): clean(_, true) — second argument presumably forces
+ // checked serializability; confirm against StreamExecutionEnvironment.clean.
+ val fun = clean(timestamp, true)
+ override def getTimestamp(in: R) = fun(in)
+ }
+ JavaTime.of(windowSize, ts, startTime)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index c20554b..386ef73 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -35,6 +35,7 @@ under the License.
flink-streaming-core
+ flink-streaming-scalaflink-streaming-examplesflink-streaming-connectors
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
deleted file mode 100644
index e39fb11..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala.streaming.windowing
-
-
-import java.util.concurrent.TimeUnit._
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment
-import org.apache.flink.api.scala.streaming.windowing.Delta
-import org.apache.flink.streaming.api.windowing.helper.Time
-import org.apache.flink.util.Collector
-
-import scala.math.{max, min}
-import scala.util.Random
-
-/**
- * An example of grouped stream windowing where different eviction and
- * trigger policies can be used.A source fetches events from cars
- * every 1 sec containing their id, their current speed (kmh),
- * overall elapsed distance (m) and a timestamp. The streaming
- * example triggers the top speed of each car every x meters elapsed
- * for the last y seconds.
- */
-object TopSpeedWindowing {
-
- case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long)
-
- def main(args: Array[String]) {
- if (!parseParameters(args)) {
- return
- }
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val cars = env.addSource(carSource _).groupBy("carId")
- .window(Time.of(evictionSec, SECONDS))
- .every(Delta.of[CarSpeed](triggerMeters,
- (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
- .reduce((x, y) => if (x.speed > y.speed) x else y)
-
- cars print
-
- env.execute("TopSpeedWindowing")
-
- }
-
- def carSource(out: Collector[CarSpeed]) = {
-
- val speeds = new Array[Int](numOfCars)
- val distances = new Array[Double](numOfCars)
-
- while (true) {
- Thread sleep 1000
- for (i <- 0 until speeds.length) {
- speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5)
- distances(i) += speeds(i) / 3.6d
- out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis))
- }
- }
- }
-
- def parseParameters(args: Array[String]): Boolean = {
- if (args.length > 0) {
- if (args.length == 3) {
- numOfCars = args(0).toInt
- evictionSec = args(1).toInt
- triggerMeters = args(2).toDouble
- }
- else {
- System.err.println("Usage: TopSpeedWindowing ")
- false
- }
- }
- true
- }
-
- var numOfCars = 2
- var evictionSec = 10
- var triggerMeters = 50d
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
deleted file mode 100644
index caf5eb9..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala.streaming.windowing
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment
-import org.apache.flink.util.Collector
-import scala.util.Random
-
-object WindowJoin {
-
- case class Name(id: Long, name: String)
- case class Age(id: Long, age: Int)
- case class Person(name: String, age: Long)
-
- def main(args: Array[String]) {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- //Create streams for names and ages by mapping the inputs to the corresponding objects
- val names = env.addSource(nameStream _).map(x => Name(x._1, x._2))
- val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2))
-
- //Join the two input streams by id on the last second and create new Person objects
- //containing both name and age
- val joined =
- names.join(ages).onWindow(1000)
- .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) }
-
- joined print
-
- env.execute("WindowJoin")
- }
-
- //Stream source for generating (id, name) pairs
- def nameStream(out: Collector[(Long, String)]) = {
- val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
-
- for (i <- 1 to 10000) {
- if (i % 100 == 0) Thread.sleep(1000) else {
- out.collect((i, names(Random.nextInt(names.length))))
- }
- }
- }
-
- //Stream source for generating (id, age) pairs
- def ageStream(out: Collector[(Long, Int)]) = {
- for (i <- 1 to 10000) {
- if (i % 100 == 0) Thread.sleep(1000) else {
- out.collect((i, Random.nextInt(90)))
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 4ccbb98..b902655 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -50,12 +50,6 @@ under the License.
 			<artifactId>flink-compiler</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
 		<dependency>
 			<groupId>org.scala-lang</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java
deleted file mode 100644
index 2f587d7..0000000
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumFunction;
-
-import scala.Product;
-
-public class ScalaStreamingAggregator implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- TupleSerializerBase serializer;
- Object[] fields;
- int length;
- int position;
-
- public ScalaStreamingAggregator(TypeSerializer serializer, int pos) {
- this.serializer = (TupleSerializerBase) serializer;
- this.length = this.serializer.getArity();
- this.fields = new Object[this.length];
- this.position = pos;
- }
-
- public class Sum extends AggregationFunction {
- private static final long serialVersionUID = 1L;
- SumFunction sumFunction;
-
- public Sum(SumFunction func) {
- super(ScalaStreamingAggregator.this.position);
- this.sumFunction = func;
- }
-
- @Override
- public IN reduce(IN value1, IN value2) throws Exception {
- for (int i = 0; i < length; i++) {
- fields[i] = value2.productElement(i);
- }
-
- fields[position] = sumFunction.add(fields[position], value1.productElement(position));
-
- return serializer.createInstance(fields);
- }
- }
-
- public class ProductComparableAggregator extends ComparableAggregator {
-
- private static final long serialVersionUID = 1L;
-
- public ProductComparableAggregator(AggregationFunction.AggregationType aggregationType,
- boolean first) {
- super(ScalaStreamingAggregator.this.position, aggregationType, first);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public IN reduce(IN value1, IN value2) throws Exception {
- Object v1 = value1.productElement(position);
- Object v2 = value2.productElement(position);
-
- int c = comparator.isExtremal((Comparable
The user can also use the apply method of the returned JoinedStream
- * to use custom join function.
- *
- */
- def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
- new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
-
- /**
- * Writes a DataStream to the standard output stream (stdout). For each
- * element of the DataStream the result of .toString is
- * written.
- *
- */
- def print(): DataStream[T] = javaStream.print()
-
- /**
- * Writes a DataStream to the file specified by path in text format. The
- * writing is performed periodically, in every millis milliseconds. For
- * every element of the DataStream the result of .toString
- * is written.
- *
- */
- def writeAsText(path: String, millis: Long = 0): DataStream[T] =
- javaStream.writeAsText(path, millis)
-
- /**
- * Writes a DataStream to the file specified by path in text format. The
- * writing is performed periodically, in every millis milliseconds. For
- * every element of the DataStream the result of .toString
- * is written.
- *
- */
- def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
- javaStream.writeAsCsv(path, millis)
-
- /**
- * Adds the given sink to this DataStream. Only streams with sinks added
- * will be executed once the StreamExecutionEnvironment.execute(...)
- * method is called.
- *
- */
- def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
- javaStream.addSink(sinkFuntion)
-
- /**
- * Adds the given sink to this DataStream. Only streams with sinks added
- * will be executed once the StreamExecutionEnvironment.execute(...)
- * method is called.
- *
- */
- def addSink(fun: T => Unit): DataStream[T] = {
- if (fun == null) {
- throw new NullPointerException("Sink function must not be null.")
- }
- val sinkFunction = new SinkFunction[T] {
- val cleanFun = clean(fun)
- def invoke(in: T) = cleanFun(in)
- }
- this.addSink(sinkFunction)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
deleted file mode 100644
index f61e34b..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-/**
- * The SplitDataStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function.
- *
- * @param
- * The type of the output.
- */
-class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
-
- /**
- * Gets the underlying java DataStream object.
- */
- private[flink] def getJavaStream: SplitJavaStream[T] = javaStream
-
- /**
- * Sets the output names for which the next operator will receive values.
- */
- def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
-
- /**
- * Selects all output names from a split data stream.
- */
- def selectAll(): DataStream[T] = javaStream.selectAll()
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
deleted file mode 100644
index cac2927..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.common.functions.JoinFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.TemporalOperator
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
-import org.apache.flink.streaming.api.function.co.CrossWindowFunction
-import org.apache.flink.api.common.functions.CrossFunction
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
- TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
-
- override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
-
- val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
- (l: I1, r: I2) => (l, r))
-
- val returnType = new CaseClassTypeInfo[(I1, I2)](
-
- classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) {
-
- override def createSerializer: TypeSerializer[(I1, I2)] = {
- val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
- for (i <- 0 until getArity) {
- fieldSerializers(i) = types(i).createSerializer
- }
-
- new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
- override def createInstance(fields: Array[AnyRef]) = {
- (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
- }
- }
- }
- }
-
- val javaStream = input1.connect(input2).addGeneralWindowCombine(
- crossWindowFunction,
- returnType, windowSize,
- slideInterval, timeStamp1, timeStamp2);
-
- new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
- }
-}
-object StreamCrossOperator {
-
- private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
- javaStream: JavaStream[(I1, I2)]) extends
- DataStream[(I1, I2)](javaStream) {
-
- /**
- * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
- * call will be emitted.
- *
- */
- def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
- val invokable = new CoWindowInvokable[I1, I2, R](
- clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
- op.timeStamp2)
-
- javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
- invokable)
-
- javaStream.setType(implicitly[TypeInformation[R]])
- }
- }
-
- private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
- crossFunction: (I1, I2) => R):
- CrossWindowFunction[I1, I2, R] = {
- Validate.notNull(crossFunction, "Join function must not be null.")
-
- val crossFun = new CrossFunction[I1, I2, R] {
- val cleanFun = op.input1.clean(crossFunction)
-
- override def cross(first: I1, second: I2): R = {
- cleanFun(first, second)
- }
- }
-
- new CrossWindowFunction[I1, I2, R](crossFun)
- }
-
-}