flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/6] flink git commit: [FLINK-4957] Provide API for TimelyCoFlatMapFunction
Date Mon, 07 Nov 2016 15:26:29 GMT
Repository: flink
Updated Branches:
  refs/heads/master 718f6e4e3 -> 891950eab


[FLINK-4957] Provide API for TimelyCoFlatMapFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891950ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891950ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891950ea

Branch: refs/heads/master
Commit: 891950eabaaed1fdfc1c0c88806f1125b085c4b6
Parents: 0b873ac
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Oct 28 14:36:06 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedStreams.java        | 63 ++++++++++++++++++++
 .../streaming/api/scala/ConnectedStreams.scala  | 30 +++++++++-
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 50ef95b..dc763cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -26,9 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
 import static java.util.Objects.requireNonNull;
@@ -230,6 +233,66 @@ public class ConnectedStreams<IN1, IN2> {
 		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
 	}
 
+	/**
+	 * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+	 * thereby creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the streams and can produce
+	 * zero or more output elements. The function can also query the time and set timers. When
+	 * reacting to the firing of set timers the function can emit yet more elements.
+	 *
+	 * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+	 * can be used to gain access to features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+	 *                      in the stream.
+	 *
+	 * @param <R> The type of elements emitted by the {@code TimelyCoFlatMapFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R> flatMap(
+			TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
+
+		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+				TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(),
+				Utils.getCallLocationName(), true);
+
+		return flatMap(coFlatMapper, outTypeInfo);
+	}
+
+	/**
+	 * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+	 * thereby creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the streams and can produce
+	 * zero or more output elements. The function can also query the time and set timers. When
+	 * reacting to the firing of set timers the function can emit yet more elements.
+	 *
+	 * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+	 * can be used to gain access to features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+	 *                      in the stream.
+	 *
+	 * @param <R> The type of elements emitted by the {@code TimelyCoFlatMapFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> flatMap(
+			TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper,
+			TypeInformation<R> outputType) {
+
+		CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new CoStreamTimelyFlatMap<>(
+				inputStream1.clean(coFlatMapper));
+
+		return transform("Co-Flat Map", outputType, operator);
+	}
+
+
 	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> transform(String functionName,
 			TypeInformation<R> outTypeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 141625e..50526b5 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, TimelyCoFlatMapFunction}
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.util.Collector
 
@@ -101,6 +101,34 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
+   * Applies the given [[TimelyCoFlatMapFunction]] on the connected input streams,
+   * thereby creating a transformed output stream.
+   *
+   * The function will be called for every element in the streams and can produce
+   * zero or more output elements. The function can also query the time and set timers. When
+   * reacting to the firing of set timers the function can emit yet more elements.
+   *
+   * A [[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]]
+   * can be used to gain access to features provided by the
+   * [[org.apache.flink.api.common.functions.RichFunction]] interface.
+   *
+   * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for each element
+   *                     in the stream.
+   *
+   * @return The transformed [[DataStream]].
+   */
+  def flatMap[R: TypeInformation](
+      coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = {
+
+    if (coFlatMapper == null) throw new NullPointerException("FlatMap function must not be null.")
+
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.flatMap(coFlatMapper, outType))
+  }
+
+
+  /**
    * Applies a CoFlatMap transformation on these connected streams.
    *
    * The transformation calls [[CoFlatMapFunction#flatMap1]] for each element


Mime
View raw message