From: aljoscha@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Mon, 05 Oct 2015 14:42:37 -0000
In-Reply-To: <36465f6d44d147ffa2258efd2f342f51@git.apache.org>
Subject: [03/13] flink git commit: Add Scala API for new Windowing

Add Scala API for new Windowing

This adds window/timeWindow to KeyedStream along with windowAll/timeWindowAll
on DataStream. The added API classes are AllWindowedStream and WindowedStream.

This also adds translation tests similar to those for the Java API:
 - AllWindowTranslationTest.scala
 - WindowTranslationTest.scala

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0c9e78f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0c9e78f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0c9e78f

Branch: refs/heads/master
Commit: d0c9e78fdbcfe60db4bcfc9e6c2d4ba70fa00935
Parents: 9baadfe
Author: Aljoscha Krettek
Authored: Thu Oct 1 21:23:56 2015 +0200
Committer: Aljoscha Krettek
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java        |   4 +
 .../api/datastream/WindowedStream.java           |   5 +
 .../streaming/api/scala/AllWindowedStream.scala  | 123 ++++++++++++
 .../flink/streaming/api/scala/DataStream.scala   |  84 +++++++-
 .../flink/streaming/api/scala/KeyedStream.scala  |  91 ++++++++-
 .../streaming/api/scala/WindowedStream.scala     | 126 ++++++++++++
 .../api/scala/AllWindowTranslationTest.scala     | 192 +++++++++++++++++++
 .../StreamingScalaAPICompletenessTest.scala      |  24 +++
 .../api/scala/WindowTranslationTest.scala        | 185 ++++++++++++++++++
 9 files changed, 830 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index e5c7c18..134029f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -228,4 +229,7 @@ public class AllWindowedStream { return null; } + public StreamExecutionEnvironment getExecutionEnvironment() { + return input.getExecutionEnvironment(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 16898dd..41adab5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -299,4 +300,8 @@ public class WindowedStream { return null; } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return input.getExecutionEnvironment(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala new file mode 100644 index 0000000..4f36722 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream} +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction +import org.apache.flink.streaming.api.windowing.evictors.Evictor +import org.apache.flink.streaming.api.windowing.triggers.Trigger +import org.apache.flink.streaming.api.windowing.windows.Window + +import scala.reflect.ClassTag + +/** + * An [[AllWindowedStream]] represents a data stream where the stream of + * elements is split into windows based on a + * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission + * is triggered based on a [[Trigger]]. + * + * If an [[Evictor]] is specified, it will be + * used to evict elements from the window after + * evaluation was triggered by the [[Trigger]] but before the actual evaluation of the window. + * When using an evictor, window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + * + * Note that the [[AllWindowedStream]] is purely an API construct; during runtime + * the [[AllWindowedStream]] will be collapsed together with the + * operation over the window into one single operation. + * + * @tparam T The type of elements in the stream. + * @tparam W The type of [[Window]] that the + * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]] + * assigns the elements to. + */ +class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { + + /** + * Sets the [[Trigger]] that should be used to trigger window emission. + */ + def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W] = { + javaStream.trigger(trigger) + this + } + + /** + * Sets the [[Evictor]] that should be used to evict elements from a window before emission. + * + * Note: When using an evictor, window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + */ + def evictor(evictor: Evictor[_ >: T, _ >: W]): AllWindowedStream[T, W] = { + javaStream.evictor(evictor) + this + } + + // ------------------------------------------------------------------------ + // Operations on the keyed windows + // ------------------------------------------------------------------------ + /** + * Applies a reduce function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the reduce function is interpreted + * as a regular non-windowed stream. + * + * This window will try to pre-aggregate data as much as the window policies permit.
For example, + * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per + * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide + * interval, so a few elements are stored per key (one per slide interval). + * Custom windows may not be able to pre-aggregate, or may need to store extra values in an + * aggregation tree. + * + * @param function The reduce function. + * @return The data stream that is the result of applying the reduce function to the window. + */ + def reduceWindow(function: ReduceFunction[T]): DataStream[T] = { + javaStream.reduceWindow(clean(function)) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of pre-aggregation. + * + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def apply[R: TypeInformation: ClassTag](function: AllWindowFunction[T, R, W]): DataStream[R] = { + javaStream.apply(clean(function), implicitly[TypeInformation[R]]) + } + + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 07828db..7dfaeef 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -19,6 +19,10 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime} +import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow} +import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream} import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -31,7 +35,7 @@ import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat import org.apache.flink.core.fs.{FileSystem, Path} import org.apache.flink.streaming.api.collector.selector.OutputSelector -import
org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator, KeyedDataStream} +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator} import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.windowing.helper.WindowingHelper import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy} @@ -610,6 +614,82 @@ class DataStream[T](javaStream: JavaStream[T]) { javaStream.every(windowingHelper) /** + * Windows this DataStream into tumbling time windows. + * + * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or + * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic + * set using + * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]]. + * + * @param size The size of the window. + */ + def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = { + val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment) + val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) + + actualSize match { + case t: EventTime => + val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds) + .asInstanceOf[WindowAssigner[T, TimeWindow]] + windowAll(assigner) + case t: ProcessingTime => + val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds) + .asInstanceOf[WindowAssigner[T, TimeWindow]] + windowAll(assigner) + case _ => throw new RuntimeException("Invalid time: " + actualSize) + } + } + + /** + * Windows this DataStream into sliding time windows. + * + * This is a shortcut for either `.window(SlidingTimeWindows.of(size, slide))` or + * `.window(SlidingProcessingTimeWindows.of(size, slide))` depending on the time characteristic + * set using + * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]]. + * + * @param size The size of the window. + */ + def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = { + val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment) + val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) + val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) + + actualSize match { + case t: EventTime => + val assigner = SlidingTimeWindows.of( + actualSize.toMilliseconds, + actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]] + windowAll(assigner) + case t: ProcessingTime => + val assigner = SlidingProcessingTimeWindows.of( + actualSize.toMilliseconds, + actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]] + windowAll(assigner) + case _ => throw new RuntimeException("Invalid time: " + actualSize) + } + } + + /** + * Windows this data stream to an [[AllWindowedStream]], which evaluates windows + * over a non-keyed stream. Elements are put into windows by a [[WindowAssigner]]. The grouping + * of elements is done by window. + * + * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify + * when windows are evaluated. However, each `WindowAssigner` has a default `Trigger` + * that is used if a `Trigger` is not specified. + * + * Note: This operation can be inherently non-parallel since all elements have to pass through + * the same operator instance.
(Only for special cases, such as aligned time windows, is + * it possible to perform this operation in parallel.) + * + * @param assigner The `WindowAssigner` that assigns elements to windows. + * @return The windowed data stream. + */ + def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = { + new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](javaStream, assigner)) + } + /** * Extracts a timestamp from an element and assigns it as the internal timestamp of that element. * The internal timestamps are, for example, used for event-time window operations. * @@ -780,7 +860,7 @@ class DataStream[T](javaStream: JavaStream[T]) { /** * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning - * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig} + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. */ private[flink] def clean[F <: AnyRef](f: F): F = { new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f) http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 25244cd..232e4bb 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -18,11 +18,14 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream, DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.{KeyedStream => KeyedJavaStream, DataStream => JavaStream, WindowedStream => WindowedJavaStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.SumAggregator import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator import org.apache.flink.streaming.api.operators.StreamGroupedReduce +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime} +import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow} import scala.reflect.ClassTag import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.functions.FoldFunction @@ -30,7 +33,91 @@ import org.apache.flink.api.common.functions.ReduceFunction class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) { - + + // ------------------------------------------------------------------------ + // Windowing + // ------------------------------------------------------------------------ + + /** + * Windows this [[KeyedStream]] into tumbling time windows. + * + *

+ * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or + * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic + * set using + * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]] + * + * @param size The size of the window. + */ + def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = { + val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment) + val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) + + actualSize match { + case t: EventTime => + val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds) + .asInstanceOf[WindowAssigner[T, TimeWindow]] + window(assigner) + case t: ProcessingTime => + val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds) + .asInstanceOf[WindowAssigner[T, TimeWindow]] + window(assigner) + case _ => throw new RuntimeException("Invalid time: " + actualSize) + } + } + + /** + * Windows this [[KeyedStream]] into sliding time windows. + * + *

+ * This is a shortcut for either `.window(SlidingTimeWindows.of(size, slide))` or + * `.window(SlidingProcessingTimeWindows.of(size, slide))` depending on the time characteristic + * set using + * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]] + * + * @param size The size of the window. + */ + def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow] = { + val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment) + val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) + val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) + + actualSize match { + case t: EventTime => + val assigner = SlidingTimeWindows.of( + actualSize.toMilliseconds, + actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]] + window(assigner) + case t: ProcessingTime => + val assigner = SlidingProcessingTimeWindows.of( + actualSize.toMilliseconds, + actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]] + window(assigner) + case _ => throw new RuntimeException("Invalid time: " + actualSize) + } + } + + /** + * Windows this data stream to a [[WindowedStream]], which evaluates windows + * over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The + * grouping of elements is done both by key and by window. + * + *

+ * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify + * when windows are evaluated. However, each `WindowAssigner` has a default `Trigger` + * that is used if a `Trigger` is not specified. + * + * @param assigner The `WindowAssigner` that assigns elements to windows. + * @return The windowed data stream. + */ + def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = { + new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner)) + } + + // ------------------------------------------------------------------------ + // Non-Windowed aggregation operations + // ------------------------------------------------------------------------ + /** * Creates a new [[DataStream]] by reducing the elements of this DataStream * using an associative reduce function. An independent aggregate is kept per key. http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala new file mode 100644 index 0000000..a688846 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} +import org.apache.flink.streaming.api.functions.windowing.WindowFunction +import org.apache.flink.streaming.api.windowing.evictors.Evictor +import org.apache.flink.streaming.api.windowing.triggers.Trigger +import org.apache.flink.streaming.api.windowing.windows.Window + +import scala.reflect.ClassTag + +/** + * A [[WindowedStream]] represents a data stream where elements are grouped by + * key, and for each key, the stream of elements is split into windows based on a + * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission + * is triggered based on a [[Trigger]]. + * + * The windows are conceptually evaluated for each key individually, meaning windows can trigger at + * different points for each key.
+ * + * If an [[org.apache.flink.streaming.api.windowing.evictors.Evictor]] is specified, it will + * be used to evict elements from the window after evaluation was triggered by the [[Trigger]] + * but before the actual evaluation of the window. When using an evictor, window performance will + * degrade significantly, since pre-aggregation of window results cannot be used. + * + * Note that the [[WindowedStream]] is purely an API construct; during runtime + * the [[WindowedStream]] will be collapsed together with the + * [[KeyedStream]] and the operation over the window into one single operation. + * + * @tparam T The type of elements in the stream. + * @tparam K The type of the key by which elements are grouped. + * @tparam W The type of [[Window]] that the + * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]] + * assigns the elements to. + */ +class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { + + /** + * Sets the [[Trigger]] that should be used to trigger window emission. + */ + def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W] = { + javaStream.trigger(trigger) + this + } + + /** + * Sets the [[Evictor]] that should be used to evict elements from a window before emission. + * + * Note: When using an evictor, window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + */ + def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W] = { + javaStream.evictor(evictor) + this + } + + // ------------------------------------------------------------------------ + // Operations on the keyed windows + // ------------------------------------------------------------------------ + /** + * Applies a reduce function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the reduce function is interpreted + * as a regular non-windowed stream. + * + * This window will try to pre-aggregate data as much as the window policies permit. For example, + * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per + * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide + * interval, so a few elements are stored per key (one per slide interval). + * Custom windows may not be able to pre-aggregate, or may need to store extra values in an + * aggregation tree. + * + * @param function The reduce function. + * @return The data stream that is the result of applying the reduce function to the window. + */ + def reduceWindow(function: ReduceFunction[T]): DataStream[T] = { + javaStream.reduceWindow(clean(function)) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + + * + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of pre-aggregation. + * + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window.
+ */ + def apply[R: TypeInformation: ClassTag](function: WindowFunction[T, R, K, W]): DataStream[R] = { + javaStream.apply(clean(function), implicitly[TypeInformation[R]]) + } + + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala new file mode 100644 index 0000000..35c7fcc --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + + +import org.apache.flink.api.common.functions.RichReduceFunction +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.transformations.OneInputTransformation +import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows} +import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} +import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, WatermarkTrigger, CountTrigger} +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer} +import org.apache.flink.streaming.runtime.operators.windowing._ +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.util.Collector + +import org.junit.Assert._ +import org.junit.{Ignore, Test} + +class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { + + /** + * These tests ensure that the fast aligned time windows operator is used if the + * conditions are right. 
+ * + * TODO: update once we have optimized aligned time windows operator for all-windows + */ + @Ignore + @Test + def testFastTimeWindows(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + + val window1 = source + .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .reduceWindow(reducer) + + val transform1 = window1.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]]) + + val window2 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + def apply( + tuple: Tuple, + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]]) + } + + @Test + def testNonEvicting(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + + val window1 = source + .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .trigger(CountTrigger.of(100)) + .reduceWindow(reducer) + + val transform1 = window1.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) + val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]] + assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue( + winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + + + val window2 = source + .windowAll(TumblingProcessingTimeWindows.of(1000)) + .trigger(CountTrigger.of(100)) + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { + def apply( + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) + val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]] + assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + } + + @Test + def testEvicting(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + + val window1 = source + .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .evictor(TimeEvictor.of(1000)) + .reduceWindow(reducer) + + val transform1 = window1.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), 
(String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]) + val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]] + assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + + + val window2 = source + .windowAll(TumblingProcessingTimeWindows.of(1000)) + .trigger(CountTrigger.of(100)) + .evictor(CountEvictor.of(1000)) + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { + def apply( + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]) + val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]] + assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + } +} + +// ------------------------------------------------------------------------ +// UDFs +// ------------------------------------------------------------------------ + +class DummyReducer extends RichReduceFunction[(String, Int)] { + def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = { + value1 + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index 66fe197..6ecdb85 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -44,6 +44,9 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.DataStream.getTransformation", "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy", "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment", + "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment", + "org.apache.flink.streaming.api.datastream.ConnectedStreams.getFirstInput", + "org.apache.flink.streaming.api.datastream.ConnectedStreams.getSecondInput", "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1", "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2", 
"org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine", @@ -51,6 +54,12 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType", "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig", + "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment", + "org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment", + + "org.apache.flink.streaming.api.datastream.KeyedStream.transform", + "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector", + "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled", "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." + "getStateHandleProvider", @@ -114,6 +123,21 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { classOf[SplitDataStream[_]]) checkMethods( + "WindowedStream", "WindowedStream", + classOf[org.apache.flink.streaming.api.datastream.WindowedStream[_, _, _]], + classOf[WindowedStream[_, _, _]]) + + checkMethods( + "AllWindowedStream", "AllWindowedStream", + classOf[org.apache.flink.streaming.api.datastream.AllWindowedStream[_, _]], + classOf[AllWindowedStream[_, _]]) + + checkMethods( + "KeyedStream", "KeyedStream", + classOf[org.apache.flink.streaming.api.datastream.KeyedStream[_, _]], + classOf[KeyedStream[_, _]]) + + checkMethods( "StreamJoinOperator", "StreamJoinOperator", classOf[org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator[_,_]], classOf[StreamJoinOperator[_,_]]) http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala new file mode 100644 index 0000000..49d0a1a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.functions.windowing.WindowFunction +import org.apache.flink.streaming.api.transformations.OneInputTransformation +import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows} +import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} +import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer} +import org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, WindowOperator, AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator} +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.util.Collector + +import org.junit.Assert._ +import org.junit.Test + +class WindowTranslationTest extends StreamingMultipleProgramsTestBase { + + /** + * These tests ensure that the fast aligned time windows operator is used if the + * conditions are right. + */ + @Test + def testFastTimeWindows(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + + val window1 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .reduceWindow(reducer) + + val transform1 = window1.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]]) + + val window2 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + def apply( + key: Tuple, + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]]) + } + + @Test + def testNonEvicting(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + + val window1 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .trigger(CountTrigger.of(100)) + .reduceWindow(reducer) + + val transform1 = window1.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]]) + val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue( + winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + + + val window2 = source + .keyBy(0) + .window(TumblingProcessingTimeWindows.of(1000)) + .trigger(CountTrigger.of(100)) + .apply(new WindowFunction[(String, 
Int), (String, Int), Tuple, TimeWindow]() { + def apply( + tuple: Tuple, + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]]) + val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + } + + @Test + def testEvicting(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + + val window1 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .evictor(TimeEvictor.of(1000)) + .reduceWindow(reducer) + + val transform1 = window1.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]]) + val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]] + assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + + + val window2 = source + .keyBy(0) + .window(TumblingProcessingTimeWindows.of(1000)) + .trigger(CountTrigger.of(100)) + .evictor(CountEvictor.of(1000)) + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + def apply( + tuple: Tuple, + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]]) + val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]] + assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + } +}
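----------------------------------------------------------------------

For readers who want to see how the new pieces fit together, below is a minimal, hypothetical usage sketch; it is not part of the commit. It only chains calls that appear in the diff and in the translation tests above (keyBy/window/windowAll, reduceWindow, apply, trigger, the processing-time assigners, and CountTrigger); the object name, the sample elements, and the summing logic are invented for illustration.

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Illustrative object name; not part of the commit.
object WindowApiSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromElements(("hello", 1), ("hello", 2), ("world", 3))

    // Keyed windows: sliding processing-time windows (1000 ms size, 100 ms slide),
    // pre-aggregated incrementally with a ReduceFunction.
    val perKeySums = source
      .keyBy(0)
      .window(SlidingProcessingTimeWindows.of(1000, 100))
      .reduceWindow(new ReduceFunction[(String, Int)] {
        def reduce(a: (String, Int), b: (String, Int)): (String, Int) = (a._1, a._2 + b._2)
      })

    // Non-keyed (all-window) variant: tumbling processing-time windows of 1000 ms,
    // fired by a count trigger and evaluated with a full window function.
    val globalSums = source
      .windowAll(TumblingProcessingTimeWindows.of(1000))
      .trigger(CountTrigger.of(100))
      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
        def apply(
            window: TimeWindow,
            values: java.lang.Iterable[(String, Int)],
            out: Collector[(String, Int)]): Unit = {
          // Sum the second tuple field over all buffered elements of the window.
          var sum = 0
          val it = values.iterator()
          while (it.hasNext) {
            sum += it.next()._2
          }
          out.collect(("all", sum))
        }
      })

    perKeySums.print()
    globalSums.print()
    env.execute("Scala windowing API sketch")
  }
}

As the docs added in this commit note, the windowAll variant funnels all elements through a single operator instance, while the keyed variant can run in parallel per key and can use pre-aggregation when no evictor is set.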