flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] flink git commit: [FLINK-3521] Make Iterable part of method signature for WindowFunction
Date Fri, 26 Feb 2016 23:09:47 GMT
Repository: flink
Updated Branches:
  refs/heads/master 64519e1c1 -> 27b5c49e7


http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
new file mode 100644
index 0000000..4e77d83
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base interface for functions that are evaluated over non-keyed windows.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  */
+@Public
+trait AllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable {
+
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param window The window that is being evaluated.
+    * @param input  The elements in the window being evaluated.
+    * @param out    A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  def apply(window: W, input: Iterable[IN], out: Collector[OUT])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala
new file mode 100644
index 0000000..67236b7
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base interface for functions that are evaluated over keyed (grouped) windows.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam KEY The type of the key.
+  */
+@Public
+trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key    The key for which this window is evaluated.
+    * @param window The window that is being evaluated.
+    * @param input  The elements in the window being evaluated.
+    * @param out    A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index dcdfa91..f73307c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.state.ReducingStateDescriptor
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
@@ -76,7 +76,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
-      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() {
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
         def apply(
             window: TimeWindow,
             values: Iterable[(String, Int)],
@@ -122,7 +122,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val window2 = source
       .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() {
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
       def apply(
                     window: TimeWindow,
                     values: Iterable[(String, Int)],
@@ -173,7 +173,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
-      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() {
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
       def apply(
                     window: TimeWindow,
                     values: Iterable[(String, Int)],
@@ -211,7 +211,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: (String, Int),
+                   values: Iterable[(String, Int)],
                    out: Collector[(String, Int)]) { }
       })
 
@@ -236,7 +236,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: (String, Int),
+                   values: Iterable[(String, Int)],
                    out: Collector[(String, Int)]) { }
       })
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 673d7b3..48ff640 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor}
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction
+import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
@@ -65,7 +65,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val window2 = source
       .keyBy(0)
       .timeWindow(Time.minutes(1))
-      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() {
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
         def apply(
             key: Tuple,
             window: TimeWindow,
@@ -114,7 +114,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .keyBy(0)
       .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() {
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
       def apply(
                     tuple: Tuple,
                     window: TimeWindow,
@@ -168,7 +168,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
-      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() {
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
       def apply(
                     tuple: Tuple,
                     window: TimeWindow,
@@ -207,7 +207,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: (String, Int),
+                   values: Iterable[(String, Int)],
                    out: Collector[(String, Int)]) { }
       })
 
@@ -232,7 +232,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: (String, Int),
+                   values: Iterable[(String, Int)],
                    out: Collector[(String, Int)]) { }
       })
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 436dd0d..d18a45e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -105,7 +105,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							NUM_ELEMENTS_PER_KEY / 3))
 					.rebalance()
 					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
-					.apply(new RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+					.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
 
 						private boolean open = false;
 
@@ -167,7 +167,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
 					.rebalance()
 					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-					.apply(new RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+					.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
 
 						private boolean open = false;
 
@@ -254,13 +254,18 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 						@Override
 						public void apply(
 								TimeWindow window,
-								Tuple2<Long, IntType> input,
+								Iterable<Tuple2<Long, IntType>> input,
 								Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
 							// validate that the function has been opened properly
 							assertTrue(open);
 
-							out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
+							for (Tuple2<Long, IntType> in: input) {
+								out.collect(new Tuple4<>(in.f0,
+										window.getStart(),
+										window.getEnd(),
+										in.f1));
+							}
 						}
 					})
 					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
@@ -323,13 +328,18 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 						@Override
 						public void apply(
 								TimeWindow window,
-								Tuple2<Long, IntType> input,
+								Iterable<Tuple2<Long, IntType>> input,
 								Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
 							// validate that the function has been opened properly
 							assertTrue(open);
 
-							out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
+							for (Tuple2<Long, IntType> in: input) {
+								out.collect(new Tuple4<>(in.f0,
+										window.getStart(),
+										window.getEnd(),
+										in.f1));
+							}
 						}
 					})
 					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 6f178d8..ce705e1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -152,7 +152,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
-					.apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
 
 						private boolean open = false;
 
@@ -216,7 +216,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
-					.apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
 
 						private boolean open = false;
 
@@ -285,7 +285,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-					.apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
 
 						private boolean open = false;
 
@@ -373,13 +373,18 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 						public void apply(
 								Tuple tuple,
 								TimeWindow window,
-								Tuple2<Long, IntType> input,
+								Iterable<Tuple2<Long, IntType>> input,
 								Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
 							// validate that the function has been opened properly
 							assertTrue(open);
 
-							out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
+							for (Tuple2<Long, IntType> in: input) {
+								out.collect(new Tuple4<>(in.f0,
+										window.getStart(),
+										window.getEnd(),
+										in.f1));
+							}
 						}
 					})
 					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
@@ -443,13 +448,18 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 						public void apply(
 								Tuple tuple,
 								TimeWindow window,
-								Tuple2<Long, IntType> input,
+								Iterable<Tuple2<Long, IntType>> input,
 								Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
 							// validate that the function has been opened properly
 							assertTrue(open);
 
-							out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
+							for (Tuple2<Long, IntType> in: input) {
+								out.collect(new Tuple4<>(in.f0,
+										window.getStart(),
+										window.getEnd(),
+										in.f1));
+							}
 						}
 					})
 					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index eb5ef5a..aa5ff3b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -117,7 +117,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(100, MILLISECONDS))
-					.apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {
 
 						private boolean open = false;
 
@@ -175,7 +175,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
-					.apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {
 
 						private boolean open = false;
 


Mime
View raw message