flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-7660] Support sideOutput in ProcessAllWindowFunction
Date Thu, 12 Oct 2017 09:13:52 GMT
Repository: flink
Updated Branches:
  refs/heads/master 89de78c72 -> 4f8d01fba


[FLINK-7660] Support sideOutput in ProcessAllWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39682c45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39682c45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39682c45

Branch: refs/heads/master
Commit: 39682c456a211a773014474e696babff898a76fe
Parents: 89de78c
Author: Bowen Li <bowenli86@gmail.com>
Authored: Wed Sep 27 23:09:20 2017 -0700
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Oct 12 11:12:33 2017 +0200

----------------------------------------------------------------------
 .../FoldApplyProcessAllWindowFunction.java      |  7 +--
 .../InternalProcessApplyAllWindowContext.java   | 13 ++++--
 .../windowing/ProcessAllWindowFunction.java     |  9 ++++
 .../ReduceApplyProcessAllWindowFunction.java    |  6 +--
 .../operators/windowing/WindowOperator.java     |  1 +
 .../InternalProcessAllWindowContext.java        |  6 +++
 .../function/ProcessAllWindowFunction.scala     |  6 +++
 .../ScalaProcessWindowFunctionWrapper.scala     |  4 ++
 .../streaming/api/scala/SideOutputITCase.scala  | 45 ++++++++++++++++++++
 .../streaming/runtime/SideOutputITCase.java     | 37 ++++++++++++++++
 10 files changed, 121 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 362956d..591e2af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -106,8 +106,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 		}
 
 		this.ctx.window = context.window();
-		this.ctx.windowState = context.windowState();
-		this.ctx.globalState = context.globalState();
+		this.ctx.context = context;
 
 		windowFunction.process(ctx, Collections.singletonList(result), out);
 	}
@@ -115,8 +114,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 	@Override
 	public void clear(final Context context) throws Exception {
 		this.ctx.window = context.window();
-		this.ctx.windowState = context.windowState();
-		this.ctx.globalState = context.globalState();
+		this.ctx.context = context;
 		windowFunction.clear(ctx);
 	}
 
@@ -136,5 +134,4 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 
 		serializedInitialValue = baos.toByteArray();
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
index a27d71b..98557ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal reusable context wrapper.
@@ -34,8 +35,7 @@ public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window>
 	extends ProcessAllWindowFunction<IN, OUT, W>.Context {
 
 	W window;
-	KeyedStateStore windowState;
-	KeyedStateStore globalState;
+	ProcessAllWindowFunction.Context context;
 
 	InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
 		function.super();
@@ -48,11 +48,16 @@ public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window>
 
 	@Override
 	public KeyedStateStore windowState() {
-		return windowState;
+		return context.windowState();
 	}
 
 	@Override
 	public KeyedStateStore globalState() {
-		return globalState;
+		return context.globalState();
+	}
+
+	@Override
+	public <X> void output(OutputTag<X> outputTag, X value) {
+		context.output(outputTag, value);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 34a37bf..f27f3c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Base abstract class for functions that are evaluated over non-keyed windows using a context
@@ -77,5 +78,13 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extend
 		 * State accessor for per-key global state.
 		 */
 		public abstract KeyedStateStore globalState();
+
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 *
+		 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+		 * @param value The record to emit.
+		 */
+		public abstract <X> void output(OutputTag<X> outputTag, X value);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index 108ba9e..ee8328a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -60,8 +60,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> extends
 		}
 
 		this.ctx.window = context.window();
-		this.ctx.windowState = context.windowState();
-		this.ctx.globalState = context.globalState();
+		this.ctx.context = context;
 
 		windowFunction.process(ctx, Collections.singletonList(curr), out);
 	}
@@ -69,8 +68,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> extends
 	@Override
 	public void clear(final Context context) throws Exception {
 		this.ctx.window = context.window();
-		this.ctx.windowState = context.windowState();
-		this.ctx.globalState = context.globalState();
+		this.ctx.context = context;
 
 		windowFunction.clear(ctx);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index fd90e65..4e75345 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -775,6 +775,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			return WindowOperator.this.getKeyedStateStore();
 		}
 
+		@Override
 		public <X> void output(OutputTag<X> outputTag, X value) {
 			if (outputTag == null) {
 				throw new IllegalArgumentException("OutputTag must not be null.");

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
index 66ec656..f1146b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal reusable context wrapper.
@@ -55,4 +56,9 @@ public class InternalProcessAllWindowContext<IN, OUT, W extends Window>
 	public KeyedStateStore globalState() {
 		return internalContext.globalState();
 	}
+
+	@Override
+	public <X> void output(OutputTag<X> outputTag, X value) {
+		internalContext.output(outputTag, value);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 49911e4..b91b2a0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala.function
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.common.state.KeyedStateStore
+import org.apache.flink.streaming.api.scala.OutputTag
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -73,6 +74,11 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window]
       * State accessor for per-key global state.
       */
     def globalState: KeyedStateStore
+
+    /**
+      * Emits a record to the side output identified by the [[OutputTag]].
+      */
+    def output[X](outputTag: OutputTag[X], value: X)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 98b050c..9a6156d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -127,6 +127,8 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
       override def windowState = context.windowState()
 
       override def globalState = context.globalState()
+
+      override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value)
     }
     func.process(ctx, elements.asScala, out)
   }
@@ -138,6 +140,8 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
       override def windowState = context.windowState()
 
       override def globalState = context.globalState()
+
+      override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value)
     }
     func.clear(ctx)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
index f09323c..8e66171 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -280,6 +280,51 @@ class SideOutputITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
                   sideOutputResultSink.getResult)
   }
+
+  /**
+    * Test ProcessAllWindowFunction side output.
+    */
+  @Test
+  def testProcessAllWindowFunctionSideOutput() {
+    val resultSink = new TestListResultSink[String]
+    val sideOutputResultSink = new TestListResultSink[String]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))
+
+
+    val sideOutputTag = OutputTag[String]("side")
+
+    val windowOperator = dataStream
+      .assignTimestampsAndWatermarks(new TestAssigner)
+      .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
+      .process(new ProcessAllWindowFunction[(String, Int), String, TimeWindow] {
+        override def process(
+                              context: Context,
+                              elements: Iterable[(String, Int)],
+                              out: Collector[String]): Unit = {
+          for (in <- elements) {
+            out.collect(in._1)
+            context.output(sideOutputTag, "sideout-" + in._1)
+          }
+        }
+      })
+
+    windowOperator
+      .getSideOutput(sideOutputTag)
+      .addSink(sideOutputResultSink)
+
+    windowOperator.addSink(resultSink)
+
+    env.execute()
+
+    assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult)
+    assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
+      sideOutputResultSink.getResult)
+  }
 }
 
 class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] {

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index f74f8ff..7f3fe8b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -582,4 +583,40 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
 		assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
 	}
+
+	@Test
+	public void testProcessAllWindowFunctionSideOutput() throws Exception {
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+		see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Integer> dataStream = see.fromCollection(elements);
+
+		OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+
+		SingleOutputStreamOperator<Integer> windowOperator = dataStream
+				.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
+				.timeWindowAll(Time.milliseconds(1), Time.milliseconds(1))
+				.process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
+						for (Integer e : elements) {
+							out.collect(e);
+							context.output(sideOutputTag, "sideout-" + String.valueOf(e));
+						}
+					}
+				});
+
+		windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+		windowOperator.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
+	}
 }


Mime
View raw message