flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [03/27] incubator-flink git commit: [scala] [streaming] Temporal cross operator added
Date Sun, 04 Jan 2015 20:50:53 GMT
[scala] [streaming] Temporal cross operator added


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e37ec793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e37ec793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e37ec793

Branch: refs/heads/master
Commit: e37ec793928442ca7ebe08f3edc318999745f226
Parents: 555837c
Author: Gyula Fora <gyfora@apache.org>
Authored: Sat Dec 20 19:34:55 2014 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  17 ++-
 .../api/datastream/StreamCrossOperator.java     |  59 ++++-------
 .../flink/api/scala/streaming/DataStream.scala  |  25 +++++
 .../api/scala/streaming/FieldsKeySelector.scala |  18 ++++
 .../scala/streaming/StreamCrossOperator.scala   | 103 +++++++++++++++++++
 .../scala/streaming/StreamJoinOperator.scala    |  16 ++-
 6 files changed, 183 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e37ec793/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 2a0b673..c4e3368 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -547,19 +547,16 @@ public class DataStream<OUT> {
 	 * <p>
 	 * This method returns a {@link StreamCrossOperator} on which the
 	 * {@link StreamCrossOperator#onWindow} should be called to define the
-	 * window, and then call
-	 * {@link StreamCrossOperator.CrossWindow#with(org.apache.flink.api.common.functions.CrossFunction)}
-	 * to define a {@link org.apache.flink.api.common.functions.CrossFunction}
-	 * which is called for each pair of crossed elements. The CrossFunction
-	 * returns a exactly one element for each pair of input elements.
+	 * window.
+	 * <p>
+	 * Call {@link StreamCrossOperator.CrossWindow#with(CrossFunction)} to
+	 * define a custom cross function.
 	 * 
 	 * @param dataStreamToCross
 	 *            The other DataStream with which this DataStream is crossed.
 	 * @return A {@link StreamCrossOperator} to continue the definition of the
-	 *         Join transformation.
+	 *         cross transformation.
 	 * 
-	 * @see org.apache.flink.api.common.functions.CrossFunction
-	 * @see DataStream
 	 */
 	public <IN2> StreamCrossOperator<OUT, IN2> cross(DataStream<IN2> dataStreamToCross)
{
 		return new StreamCrossOperator<OUT, IN2>(this, dataStreamToCross);
@@ -574,7 +571,9 @@ public class DataStream<OUT> {
 	 * {@link StreamJoinOperator#onWindow} should be called to define the
 	 * window, and then the {@link StreamJoinOperator.JoinWindow#where} and
 	 * {@link StreamJoinOperator.JoinPredicate#equalTo} can be used to define
-	 * the join keys.
+	 * the join keys. <p> The user can also use the
+	 * {@link StreamJoinOperator.JoinedStream#with(JoinFunction)} to apply a
+	 * custom join function.
 	 * 
 	 * @param other
 	 *            The other DataStream with which this DataStream is joined.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e37ec793/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
index c6cba63..2dd7bc0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
@@ -20,12 +20,12 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.CrossOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 
 public class StreamCrossOperator<I1, I2> extends
 		TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
@@ -36,23 +36,24 @@ public class StreamCrossOperator<I1, I2> extends
 
 	@Override
 	protected CrossWindow<I1, I2> createNextWindowOperator() {
-		return new CrossWindow<I1, I2>(this);
+
+		CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1,
I2, Tuple2<I1, I2>>(
+				input1.clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
+
+		return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine(
+				crossWindowFunction,
+				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize,
+				slideInterval, timeStamp1, timeStamp2));
 	}
 
-	public static class CrossWindow<I1, I2> {
+	public static class CrossWindow<I1, I2> extends
+			SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> {
 
 		private StreamCrossOperator<I1, I2> op;
 
-		public CrossWindow(StreamCrossOperator<I1, I2> operator) {
-			this.op = operator;
-		}
-
-		public <F> F clean(F f) {
-			if (op.input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
-				ClosureCleaner.clean(f, true);
-			}
-			ClosureCleaner.ensureSerializable(f);
-			return f;
+		public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>>
ds) {
+			super(ds);
+			this.op = op;
 		}
 
 		/**
@@ -63,36 +64,20 @@ public class StreamCrossOperator<I1, I2> extends
 		 * @param function
 		 *            The CrossFunction that is called for each pair of crossed
 		 *            elements.
-		 * @return A CrossOperator that represents the crossed result DataStream
+		 * @return The crossed data streams
 		 * 
-		 * @see CrossFunction
-		 * @see DataSet
 		 */
 		public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R>
function) {
-			return createCrossOperator(function);
-		}
-
-		/**
-		 * Finalizes a temporal Cross transformation by emitting all pairs in a
-		 * new Tuple2.
-		 * 
-		 * @return A CrossOperator that represents the crossed result DataStream
-		 */
-		public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> withDefault() {
-			return createCrossOperator(new CrossOperator.DefaultCrossFunction<I1, I2>());
-		}
-
-		protected <R> SingleOutputStreamOperator<R, ?> createCrossOperator(
-				CrossFunction<I1, I2, R> function) {
-
 			TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
 					op.input1.getType(), op.input2.getType());
 
-			CrossWindowFunction<I1, I2, R> crossWindowFunction = new CrossWindowFunction<I1,
I2, R>(
-					clean(function));
+			CoWindowInvokable<I1, I2, R> invokable = new CoWindowInvokable<I1, I2, R>(
+					new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize,
+					op.slideInterval, op.timeStamp1, op.timeStamp2);
+
+			jobGraphBuilder.setInvokable(id, invokable);
 
-			return op.input1.connect(op.input2).addGeneralWindowCombine(crossWindowFunction,
-					outTypeInfo, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2);
+			return setType(outTypeInfo);
 
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e37ec793/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 871fede..42ec709 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -458,9 +458,34 @@ class DataStream[T](javaStream: JavaStream[T]) {
     split(selector)
   }
 
+  /**
+   * Initiates a temporal Join transformation that joins the elements of two
+   * data streams on key equality over a specified time window.
+   *
+   * This method returns a StreamJoinOperator on which .onWindow(..) should be called
+   * to define the window, and then the .where(..) and .equalTo(..) methods can be used
+   * to define the join keys. The user can also use the apply method of the returned
+   * JoinedStream to apply a custom join function.
+   *
+   * @param stream The other data stream with which this data stream is joined.
+   */
   def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
 
   /**
+   * Initiates a temporal cross transformation that builds all pair
+   * combinations of elements of both DataStreams, i.e., it builds a Cartesian
+   * product, over a specified time window.
+   *
+   * This method returns a StreamCrossOperator on which .onWindow(..) should be
+   * called to define the window. By default each crossed pair is emitted as a
+   * tuple (first, second); the user can also use the apply method of the stream
+   * returned by .onWindow(..) to apply a custom cross function.
+   *
+   * @param stream The other data stream with which this data stream is crossed.
+   */
+  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] = new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
    * Writes a DataStream to the standard output stream (stdout). For each
    * element of the DataStream the result of .toString is
    * written.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e37ec793/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
index 4223512..d7c9f96 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.api.scala.streaming
 
 import org.apache.flink.streaming.util.keys.{ FieldsKeySelector => JavaSelector }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e37ec793/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
new file mode 100644
index 0000000..5dfbc3b
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction
+import org.apache.flink.streaming.util.keys.PojoKeySelector
+import scala.reflect.ClassTag
+import org.apache.commons.lang.Validate
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.function.co.CrossWindowFunction
+import org.apache.flink.api.common.functions.CrossFunction
+
+/**
+ * Scala API for the temporal cross of two data streams. Once .onWindow(..) has defined the window,
+ * every element of the first stream is paired with every element of the second stream in it.
+ */
+class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
+
+  /** Connects both inputs into a windowed combine whose default output is the tuple (l, r). */
+  override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
+    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, (l: I1, r: I2) => (l, r))
+    // Tuple type info built from the two input types; its serializer creates tuples from the fields.
+    val returnType = new CaseClassTypeInfo[(I1, I2)](
+      classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) {
+      override def createSerializer: TypeSerializer[(I1, I2)] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer
+        }
+        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+          }
+        }
+      }
+    }
+
+    val javaStream = input1.connect(input2).addGeneralWindowCombine(
+      crossWindowFunction,
+      returnType, windowSize,
+      slideInterval, timeStamp1, timeStamp2)
+    new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
+  }
+}
+object StreamCrossOperator {
+  /** Result of a windowed cross: a stream of (left, right) pairs that apply(..) can re-map. */
+  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
+
+    /**
+     * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf call will be emitted.
+     *
+     * @param fun function applied to every crossed (left, right) pair
+     * @return the crossed stream, now typed with the implicit TypeInformation of R
+     */
+    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+      // Sets an invokable applying `fun` for this stream's id (presumably replacing the tuple-emitting one).
+      val invokable = new CoWindowInvokable[I1, I2, R](
+        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable)
+      new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
+    }
+  }
+
+  /**
+   * Wraps a Scala function into a CrossWindowFunction whose CrossFunction calls the function after
+   * closure-cleaning it via op.input1. Fails (commons-lang Validate) if crossFunction is null.
+   */
+  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], crossFunction: (I1, I2) => R): CrossWindowFunction[I1, I2, R] = {
+    Validate.notNull(crossFunction, "Cross function must not be null.")
+    val crossFun = new CrossFunction[I1, I2, R] {
+      val cleanFun = op.input1.clean(crossFunction)
+      override def cross(first: I1, second: I2): R = {
+        cleanFun(first, second)
+      }
+    }
+    new CrossWindowFunction[I1, I2, R](crossFun)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e37ec793/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
index 93950a2..fff5e86 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -42,11 +42,6 @@ class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2])
extends
 
 object StreamJoinOperator {
 
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F =
{
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
-
   class JoinWindow[I1, I2](op: StreamJoinOperator[I1, I2]) {
 
     /**
@@ -94,7 +89,7 @@ object StreamJoinOperator {
      * Creates a temporal join transformation by defining the second join key.
      * The returned transformation wrapes each joined element pair in a tuple2:
      * (first, second)
-     * To define a custom wrapping, use JoinedStream.with(...)
+     * To define a custom wrapping, use JoinedStream.apply(...)
      */
     def equalTo(fields: Int*): JoinedStream[I1, I2] = {
       finish(new FieldsKeySelector[I2](fields: _*))
@@ -104,7 +99,7 @@ object StreamJoinOperator {
      * Creates a temporal join transformation by defining the second join key.
      * The returned transformation wrapes each joined element pair in a tuple2:
      * (first, second)
-     * To define a custom wrapping, use JoinedStream.with(...)
+     * To define a custom wrapping, use JoinedStream.apply(...)
      */
     def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = {
       finish(new PojoKeySelector[I2](op.input2.getType(), (firstField +: otherFields): _*))
@@ -114,7 +109,7 @@ object StreamJoinOperator {
      * Creates a temporal join transformation by defining the second join key.
      * The returned transformation wrapes each joined element pair in a tuple2:
      * (first, second)
-     * To define a custom wrapping, use JoinedStream.with(...)
+     * To define a custom wrapping, use JoinedStream.apply(...)
      */
     def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
       val keyType = implicitly[TypeInformation[K]]
@@ -159,6 +154,9 @@ object StreamJoinOperator {
 
     private val op = jp.op
 
+    /**
+     * Sets a wrapper for the joined elements. For each joined pair, the result of the udf
call will be emitted.
+     */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
 
       val invokable = new CoWindowInvokable[I1, I2, R](
@@ -175,7 +173,7 @@ object StreamJoinOperator {
 
     val joinFun = new JoinFunction[I1, I2, R] {
 
-      val cleanFun = clean(joinFunction)
+      val cleanFun = jp.op.input1.clean(joinFunction)
 
       override def join(first: I1, second: I2): R = {
         cleanFun(first, second)


Mime
View raw message