flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [07/11] flink git commit: [FLINK-4460] Add proper side output API for Scala API
Date Sat, 18 Mar 2017 07:13:46 GMT
[FLINK-4460] Add proper side output API for Scala API

The Scala side output API uses context bounds to get a TypeInformation
for an OutputTag. This also adds a SideOutputITCase for the Scala API.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/639dee3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/639dee3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/639dee3b

Branch: refs/heads/master
Commit: 639dee3b56c037684410bdb67390d16369fbb52c
Parents: f828657
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Mar 7 11:06:31 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/scala/DataStream.scala  |   2 +-
 .../flink/streaming/api/scala/OutputTag.scala   |  42 ++++
 .../streaming/api/scala/SideOutputITCase.scala  | 247 +++++++++++++++++++
 3 files changed, 290 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/639dee3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index c4a38c0..cccd377 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -240,7 +240,7 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   @PublicEvolving
-  def getSideOutput[X: OutputTag](tag: OutputTag[X]): DataStream[X] = javaStream match {
+  def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = javaStream match {
     case stream : SingleOutputStreamOperator[X] =>
       asScalaStream(stream.getSideOutput(tag: OutputTag[X]))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/639dee3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/OutputTag.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/OutputTag.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/OutputTag.scala
new file mode 100644
index 0000000..416c890
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/OutputTag.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.{OutputTag => JOutputTag}
+
+
+/**
+ * An [[OutputTag]] is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * Example:
+ * {{{
+ *   val outputTag = OutputTag[String]("late-data")
+ * }}}
+ *
+ * @tparam T the type of elements in the side-output stream.
+ */
+@PublicEvolving
+class OutputTag[T: TypeInformation](
+    id: String) extends JOutputTag[T](id, implicitly[TypeInformation[T]])
+
+object OutputTag {
+  def apply[T: TypeInformation](id: String): OutputTag[T] = new OutputTag(id)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/639dee3b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
new file mode 100644
index 0000000..29bcbcf
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, ProcessFunction}
+import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction, ProcessWindowFunction}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink
+import org.apache.flink.util.Collector
+import org.junit.Assert._
+import org.junit.Test
+
+/**
+ * Integration test for streaming programs using side outputs.
+ */
+class SideOutputITCase extends StreamingMultipleProgramsTestBase {
+
+  /**
+    * Test ProcessFunction side output.
+    */
+  @Test
+  def testProcessFunctionSideOutput() {
+    val sideOutputResultSink = new TestListResultSink[String]
+    val resultSink = new TestListResultSink[Int]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(3)
+
+    val dataStream = env.fromElements(1, 2, 5, 3, 4)
+
+    val outputTag = OutputTag[String]("side")
+
+    val passThroughtStream = dataStream
+      .process(new ProcessFunction[Int, Int] {
+        override def processElement(
+            value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
+          out.collect(value)
+          ctx.output(outputTag, "sideout-" + String.valueOf(value))
+        }
+      })
+
+    passThroughtStream.getSideOutput(outputTag).addSink(sideOutputResultSink)
+    passThroughtStream.addSink(resultSink)
+
+    env.execute()
+    
+    assertEquals(
+      util.Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
+      sideOutputResultSink.getSortedResult)
+    
+    assertEquals(util.Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult)
+  }
+
+  /**
+   * Test keyed ProcessFunction side output.
+   */
+  @Test
+  def testKeyedProcessFunctionSideOutput() {
+    val sideOutputResultSink = new TestListResultSink[String]
+    val resultSink = new TestListResultSink[Int]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(3)
+
+    val dataStream = env.fromElements(1, 2, 5, 3, 4)
+
+    val outputTag = OutputTag[String]("side")
+
+    val passThroughtStream = dataStream
+      .keyBy(x => x)
+      .process(new ProcessFunction[Int, Int] {
+        override def processElement(
+            value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
+          out.collect(value)
+          ctx.output(outputTag, "sideout-" + String.valueOf(value))
+        }
+      })
+
+    passThroughtStream.getSideOutput(outputTag).addSink(sideOutputResultSink)
+    passThroughtStream.addSink(resultSink)
+
+    env.execute()
+
+    assertEquals(
+      util.Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
+      sideOutputResultSink.getSortedResult)
+
+    assertEquals(util.Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult)
+  }
+
+  /**
+   * Test ProcessFunction side outputs with wrong [[OutputTag]].
+   */
+  @Test
+  def testProcessFunctionSideOutputWithWrongTag() {
+    val sideOutputResultSink = new TestListResultSink[String]
+    val resultSink = new TestListResultSink[Int]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(3)
+
+    val dataStream = env.fromElements(1, 2, 5, 3, 4)
+
+    val outputTag = OutputTag[String]("side")
+    val otherOutputTag = OutputTag[String]("other-side")
+
+    val passThroughtStream = dataStream
+      .process(new ProcessFunction[Int, Int] {
+        override def processElement(
+            value: Int,
+            ctx: ProcessFunction[Int, Int]#Context,
+            out: Collector[Int]): Unit = {
+          ctx.output(otherOutputTag, "sideout-" + String.valueOf(value))
+        }
+      })
+
+    passThroughtStream.getSideOutput(outputTag).addSink(sideOutputResultSink)
+
+    env.execute()
+
+    assertTrue(sideOutputResultSink.getSortedResult.isEmpty)
+  }
+
+  /**
+   * Test window late arriving events stream
+   */
+  @Test
+  def testAllWindowLateArrivingEvents() {
+    val resultSink = new TestListResultSink[String]
+    val lateResultSink = new TestListResultSink[(String, Int)]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))
+
+
+    val lateDataTag = OutputTag[(String, Int)]("late")
+
+    val windowOperator = dataStream
+      .assignTimestampsAndWatermarks(new TestAssigner)
+      .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
+      .sideOutputLateData(lateDataTag)
+      .process(new ProcessAllWindowFunction[(String, Int), String, TimeWindow] {
+        override def process(
+            context: Context,
+            elements: Iterable[(String, Int)],
+            out: Collector[String]): Unit = {
+          for (in <- elements) {
+            out.collect(in._1)
+          }
+        }
+      })
+
+    windowOperator
+      .getSideOutput(lateDataTag)
+      .addSink(lateResultSink)
+
+    windowOperator.addSink(resultSink)
+
+    env.execute()
+    
+    assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult)
+    assertEquals(util.Arrays.asList(("3", 3), ("4", 4)), lateResultSink.getResult)
+  }
+
+  /**
+   * Test window late arriving events stream
+   */
+  @Test
+  def testKeyedWindowLateArrivingEvents() {
+    val resultSink = new TestListResultSink[String]
+    val lateResultSink = new TestListResultSink[(String, Int)]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))
+
+
+    val lateDataTag = OutputTag[(String, Int)]("late")
+
+    val windowOperator = dataStream
+      .assignTimestampsAndWatermarks(new TestAssigner)
+      .keyBy(i => i._1)
+      .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
+      .sideOutputLateData(lateDataTag)
+      .process(new ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
+        override def process(
+            key:String,
+            context: Context,
+            elements: Iterable[(String, Int)],
+            out: Collector[String]): Unit = {
+          for (in <- elements) {
+            out.collect(in._1)
+          }
+        }
+      })
+
+    windowOperator
+      .getSideOutput(lateDataTag)
+      .addSink(lateResultSink)
+
+    windowOperator.addSink(resultSink)
+
+    env.execute()
+
+    assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult)
+    assertEquals(util.Arrays.asList(("3", 3), ("4", 4)), lateResultSink.getResult)
+  }
+
+}
+
+class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] {
+  override def checkAndGetNextWatermark(
+  lastElement: (String, Int),
+  extractedTimestamp: Long): Watermark = new Watermark(extractedTimestamp)
+
+  override def extractTimestamp(
+  element: (String, Int),
+  previousElementTimestamp: Long): Long = element._2.toLong
+}


Mime
View raw message