flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/2] flink git commit: [FLINK-2191] Fix inconsistent use of closure cleaner in Scala Streaming
Date Wed, 10 Jun 2015 15:21:11 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.9 1343f26f8 -> f3a96de1e


[FLINK-2191] Fix inconsistent use of closure cleaner in Scala Streaming

Known limitation: the closure cleaner still cannot be disabled for the Timestamp extractor
in Time or for the delta function in Delta (the windowing helpers).

Closes #813


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/255c5545
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/255c5545
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/255c5545

Branch: refs/heads/release-0.9
Commit: 255c5545c672af13313b5e3a8cd4ad98ad182a60
Parents: 1343f26
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Jun 9 16:17:48 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Jun 10 17:20:36 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  9 +--
 .../environment/StreamExecutionEnvironment.java | 14 ++++-
 .../api/scala/ConnectedDataStream.scala         | 61 +++++++++++++-------
 .../flink/streaming/api/scala/DataStream.scala  | 33 ++++++-----
 .../api/scala/StreamCrossOperator.scala         | 10 ++--
 .../api/scala/StreamExecutionEnvironment.scala  | 25 +++++---
 .../api/scala/StreamJoinOperator.scala          | 16 ++---
 .../streaming/api/scala/TemporalOperator.scala  | 12 +++-
 .../api/scala/WindowedDataStream.scala          | 24 +++++---
 .../streaming/api/scala/windowing/Delta.scala   |  6 +-
 .../streaming/api/scala/windowing/Time.scala    |  6 +-
 11 files changed, 141 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5cd3bb5..09f8155 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.CsvOutputFormat;
@@ -238,12 +237,8 @@ public class DataStream<OUT> {
 		this.typeInfo = typeInfo;
 	}
 
-	public <F> F clean(F f) {
-		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
-			ClosureCleaner.clean(f, true);
-		}
-		ClosureCleaner.ensureSerializable(f);
-		return f;
+	protected <F> F clean(F f) {
+		return getExecutionEnvironment().clean(f);
 	}
 
 	public StreamExecutionEnvironment getExecutionEnvironment() {

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index a715f05..4e29eee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -988,7 +988,7 @@ public abstract class StreamExecutionEnvironment {
 
 		boolean isParallel = function instanceof ParallelSourceFunction;
 
-		ClosureCleaner.clean(function, true);
+		clean(function);
 		StreamOperator<OUT> sourceOperator = new StreamSource<OUT>(function);
 
 		return new DataStreamSource<OUT>(this, sourceName, typeInfo, sourceOperator,
@@ -1182,4 +1182,16 @@ public abstract class StreamExecutionEnvironment {
 		}
 	}
 
+	/**
+	 * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+	 * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+	 */
+	public <F> F clean(F f) {
+		if (getConfig().isClosureCleanerEnabled()) {
+			ClosureCleaner.clean(f, true);
+		}
+		ClosureCleaner.ensureSerializable(f);
+		return f;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index 6fbc73f..a1e9f74 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -21,15 +21,12 @@ package org.apache.flink.streaming.api.scala
 import java.util
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream,
DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction,
CoWindowFunction}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.util.Collector
-import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
-import org.apache.flink.streaming.api.operators.co.CoStreamMap
-import org.apache.flink.streaming.api.operators.co.CoStreamReduce
 
 class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
 
@@ -49,9 +46,11 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("Map function must not be null.")
     }
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
     val comapper = new CoMapFunction[IN1, IN2, R] {
-      def map1(in1: IN1): R = clean(fun1)(in1)
-      def map2(in2: IN2): R = clean(fun2)(in2)
+      def map1(in1: IN1): R = cleanFun1(in1)
+      def map2(in2: IN2): R = cleanFun2(in2)
     }
 
     map(comapper)
@@ -121,9 +120,11 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
     }
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
     val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, out)
-      def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, out)
+      def flatMap1(value: IN1, out: Collector[R]): Unit = cleanFun1(value, out)
+      def flatMap2(value: IN2, out: Collector[R]): Unit = cleanFun2(value, out)
     }
     flatMap(flatMapper)
   }
@@ -143,9 +144,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
     }
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
     val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      val cleanFun1 = clean(fun1)
-      val cleanFun2 = clean(fun2)
       def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect
}
       def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect
}
     }
@@ -238,11 +239,13 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 =>
L):
   ConnectedDataStream[IN1, IN2] = {
 
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
     val keyExtractor1 = new KeySelector[IN1, K] {
-      def getKey(in: IN1) = clean(fun1)(in)
+      def getKey(in: IN1) = cleanFun1(in)
     }
     val keyExtractor2 = new KeySelector[IN2, L] {
-      def getKey(in: IN2) = clean(fun2)(in)
+      def getKey(in: IN2) = cleanFun2(in)
     }
 
     javaStream.groupBy(keyExtractor1, keyExtractor2)
@@ -324,11 +327,14 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2
=> L):
   ConnectedDataStream[IN1, IN2] = {
 
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
+
     val keyExtractor1 = new KeySelector[IN1, K] {
-      def getKey(in: IN1) = clean(fun1)(in)
+      def getKey(in: IN1) = cleanFun1(in)
     }
     val keyExtractor2 = new KeySelector[IN2, L] {
-      def getKey(in: IN2) = clean(fun2)(in)
+      def getKey(in: IN2) = cleanFun2(in)
     }
 
     javaStream.partitionByHash(keyExtractor1, keyExtractor2)
@@ -378,11 +384,16 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       throw new NullPointerException("Reduce functions must not be null.")
     }
 
+    val cleanReducer1 = clean(reducer1)
+    val cleanReducer2 = clean(reducer2)
+    val cleanMapper1 = clean(mapper1)
+    val cleanMapper2 = clean(mapper2)
+
     val reducer = new CoReduceFunction[IN1, IN2, R] {
-      def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, value2)
-      def map2(value: IN2): R = clean(mapper2)(value)
-      def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, value2)
-      def map1(value: IN1): R = clean(mapper1)(value)
+      def reduce1(value1: IN1, value2: IN1): IN1 = cleanReducer1(value1, value2)
+      def reduce2(value1: IN2, value2: IN2): IN2 = cleanReducer2(value1, value2)
+      def map1(value: IN1): R = cleanMapper1(value)
+      def map2(value: IN2): R = cleanMapper2(value)
     }
     reduce(reducer)
   }
@@ -442,9 +453,11 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       throw new NullPointerException("CoWindow function must no be null")
     }
 
+    val cleanCoWindower = clean(coWindower)
+
     val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
       def coWindow(first: util.List[IN1], second: util.List[IN2], 
-          out: Collector[R]): Unit = clean(coWindower)(first, second, out)
+          out: Collector[R]): Unit = cleanCoWindower(first.asScala, second.asScala, out)
     }
 
     windowReduce(coWindowFun, windowSize, slideInterval)
@@ -486,4 +499,12 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     javaStream.getType2
   }
 
+  /**
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   */
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 5fae85f..0ed028f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.common.io.OutputFormat
+import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 
@@ -34,9 +35,8 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink,
GroupedDataStream, SingleOutputStreamOperator}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.SumFunction
-import org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, SinkFunction}
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
@@ -225,8 +225,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    */
   def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
 
+    val cleanFun = clean(fun)
     val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
       def getKey(in: T) = cleanFun(in)
     }
     javaStream.groupBy(keyExtractor)
@@ -251,8 +251,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    */
   def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {
 
+    val cleanFun = clean(fun)
     val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
       def getKey(in: T) = cleanFun(in)
     }
     javaStream.partitionByHash(keyExtractor)
@@ -472,8 +472,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (fun == null) {
       throw new NullPointerException("Map function must not be null.")
     }
+    val cleanFun = clean(fun)
     val mapper = new MapFunction[T, R] {
-      val cleanFun = clean(fun)
       def map(in: T): R = cleanFun(in)
     }
     
@@ -513,8 +513,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (fun == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
+    val cleanFun = clean(fun)
     val flatMapper = new FlatMapFunction[T, R] {
-      val cleanFun = clean(fun)
       def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
     }
     flatMap(flatMapper)
@@ -528,8 +528,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (fun == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
+    val cleanFun = clean(fun)
     val flatMapper = new FlatMapFunction[T, R] {
-      val cleanFun = clean(fun)
       def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
     }
     flatMap(flatMapper)
@@ -555,8 +555,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (fun == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
+    val cleanFun = clean(fun)
     val reducer = new ReduceFunction[T] {
-      val cleanFun = clean(fun)
       def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
     }
     reduce(reducer)
@@ -584,9 +584,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
+    val cleanFun = clean(fun)
     val folder = new FoldFunction[T,R] {
-      val cleanFun = clean(fun)
-
       def fold(acc: R, v: T) = {
         cleanFun(acc, v)
       }
@@ -611,8 +610,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (fun == null) {
       throw new NullPointerException("Filter function must not be null.")
     }
+    val cleanFun = clean(fun)
     val filter = new FilterFunction[T] {
-      val cleanFun = clean(fun)
       def filter(in: T) = cleanFun(in)
     }
     this.filter(filter)
@@ -665,8 +664,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (fun == null) {
       throw new NullPointerException("OutputSelector must not be null.")
     }
+    val cleanFun = clean(fun)
     val selector = new OutputSelector[T] {
-      val cleanFun = clean(fun)
       def select(in: T): java.lang.Iterable[String] = {
         cleanFun(in).toIterable.asJava
       }
@@ -786,11 +785,19 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (fun == null) {
       throw new NullPointerException("Sink function must not be null.")
     }
+    val cleanFun = clean(fun)
     val sinkFunction = new SinkFunction[T] {
-      val cleanFun = clean(fun)
       def invoke(in: T) = cleanFun(in)
     }
     this.addSink(sinkFunction)
   }
 
+  /**
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   */
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index 8d98e23..7033218 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
 import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
 import org.apache.flink.streaming.api.operators.co.CoStreamWindow
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 
 import scala.reflect.ClassTag
 
@@ -82,8 +81,12 @@ object StreamCrossOperator {
      */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
 
+      val cleanCrossWindowFunction = clean(getCrossWindowFunction(op, fun))
       val operator = new CoStreamWindow[I1, I2, R](
-        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        cleanCrossWindowFunction,
+        op.windowSize,
+        op.slideInterval,
+        op.timeStamp1,
         op.timeStamp2)
 
       javaStream.getExecutionEnvironment().getStreamGraph().setOperator(javaStream.getId(),
@@ -110,9 +113,8 @@ object StreamCrossOperator {
   CrossWindowFunction[I1, I2, R] = {
     require(crossFunction != null, "Join function must not be null.")
 
+    val cleanFun = op.input1.clean(crossFunction)
     val crossFun = new CrossFunction[I1, I2, R] {
-      val cleanFun = op.input1.clean(crossFunction)
-
       override def cross(first: I1, second: I2): R = {
         cleanFun(first, second)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index d9e3850..cbdcc7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -29,11 +29,13 @@ import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.Wa
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
 import org.apache.flink.types.StringValue
-import org.apache.flink.util.{Collector, SplittableIterator}
+import org.apache.flink.util.SplittableIterator
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
+import _root_.scala.language.implicitConversions
+
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
@@ -415,7 +417,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T]
= {
     require(function != null, "Function must not be null.")
-    val cleanFun = StreamExecutionEnvironment.clean(function)
+    val cleanFun = scalaClean(function)
     val typeInfo = implicitly[TypeInformation[T]]
     javaEnv.addSource(cleanFun).returns(typeInfo)
   }
@@ -428,7 +430,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T]
= {
     require(function != null, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
-      val cleanFun = StreamExecutionEnvironment.clean(function)
+      val cleanFun = scalaClean(function)
       override def run(ctx: SourceContext[T]) {
         cleanFun(ctx)
       }
@@ -474,14 +476,21 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def getStreamGraph = javaEnv.getStreamGraph
 
+  /**
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   */
+  private[flink] def scalaClean[F <: AnyRef](f: F): F = {
+    if (getConfig.isClosureCleanerEnabled) {
+      ClosureCleaner.clean(f, true)
+    } else {
+      ClosureCleaner.ensureSerializable(f)
+    }
+    f
+  }
 }
 
 object StreamExecutionEnvironment {
-  
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F =
{
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
 
   /**
    * Sets the default parallelism that will be used for the local execution

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index def5679..09329ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
 import org.apache.flink.streaming.api.functions.co.JoinWindowFunction
 import org.apache.flink.streaming.api.operators.co.CoStreamWindow
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 
 import scala.Array.canBuildFrom
@@ -86,8 +85,8 @@ object StreamJoinOperator {
      */
     def where[K: TypeInformation](fun: (I1) => K) = {
       val keyType = implicitly[TypeInformation[K]]
+      val cleanFun = op.input1.clean(fun)
       val keyExtractor = new KeySelector[I1, K] {
-        val cleanFun = op.input1.clean(fun)
         def getKey(in: I1) = cleanFun(in)
       }
       new JoinPredicate[I1, I2](op, keyExtractor)
@@ -142,8 +141,8 @@ object StreamJoinOperator {
      */
     def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
       val keyType = implicitly[TypeInformation[K]]
+      val cleanFun = op.input1.clean(fun)
       val keyExtractor = new KeySelector[I2, K] {
-        val cleanFun = op.input1.clean(fun)
         def getKey(in: I2) = cleanFun(in)
       }
       finish(keyExtractor)
@@ -194,8 +193,12 @@ object StreamJoinOperator {
      */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
 
+      val cleanFun = clean(getJoinWindowFunction(jp, fun))
       val operator = new CoStreamWindow[I1, I2, R](
-        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        cleanFun,
+        op.windowSize,
+        op.slideInterval,
+        op.timeStamp1,
         op.timeStamp2)
 
       javaStream.getExecutionEnvironment().getStreamGraph().setOperator(javaStream.getId(),
@@ -210,10 +213,9 @@ object StreamJoinOperator {
     joinFunction: (I1, I2) => R) = {
     require(joinFunction != null, "Join function must not be null.")
 
-    val joinFun = new JoinFunction[I1, I2, R] {
-
-      val cleanFun = jp.op.input1.clean(joinFunction)
+    val cleanFun = jp.op.input1.clean(joinFunction)
 
+    val joinFun = new JoinFunction[I1, I2, R] {
       override def join(first: I1, second: I2): R = {
         cleanFun(first, second)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
index 8e6dc36..8357c4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
@@ -18,11 +18,11 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.streaming.api.datastream.temporal.{ TemporalOperator => JTempOp
}
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
 import org.apache.flink.streaming.api.windowing.helper.Timestamp
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._
 
 abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]](
   i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) {
@@ -34,10 +34,18 @@ abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]](
   }
 
   def getTS[R](ts: R => Long): Timestamp[R] = {
+    val cleanFun = clean(ts)
     new Timestamp[R] {
-      val cleanFun = clean(ts, true)
       def getTimestamp(in: R) = cleanFun(in)
     }
   }
 
+  /**
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   */
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(i1.getExecutionEnvironment).scalaClean(f)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 3173b8d..e1fdb96 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -18,8 +18,10 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.scala.ClosureCleaner
+
 import scala.Array.canBuildFrom
-import scala.collection.JavaConversions.iterableAsScalaIterable
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
@@ -31,7 +33,6 @@ import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStr
 import org.apache.flink.streaming.api.functions.WindowMapFunction
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.SumFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.StreamWindow
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.util.Collector
@@ -105,8 +106,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    */
   def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
 
+    val cleanFun = clean(fun)
     val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
       def getKey(in: T) = cleanFun(in)
     }
     javaStream.groupBy(keyExtractor)
@@ -151,8 +152,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     if (fun == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
+    val cleanFun = clean(fun)
     val reducer = new ReduceFunction[T] {
-      val cleanFun = clean(fun)
       def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
     }
     reduceWindow(reducer)
@@ -181,8 +182,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
+    val cleanFun = clean(fun)
     val folder = new FoldFunction[T,R] {
-      val cleanFun = clean(fun)
       def fold(acc: R, v: T) = { cleanFun(acc, v) }
     }
     foldWindow(initialValue, folder)
@@ -217,9 +218,9 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     if (fun == null) {
       throw new NullPointerException("GroupReduce function must not be null.")
     }
+    val cleanFun = clean(fun)
     val reducer = new WindowMapFunction[T, R] {
-      val cleanFun = clean(fun)
-      def mapWindow(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
+      def mapWindow(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in.asScala, out) }
     }
     mapWindow(reducer)
   }
@@ -329,4 +330,13 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    */
   def getType(): TypeInformation[T] = javaStream.getType
 
+  /**
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   */
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(
+      javaStream.getDiscretizedStream.getExecutionEnvironment).scalaClean(f)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
index 84286c0..f490726 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.api.scala.windowing
 
+import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
 
 object Delta {
@@ -37,8 +37,8 @@ object Delta {
  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
     require(deltaFunction != null, "Delta function must not be null")
     val df = new DeltaFunction[T] {
-      val cleanFun = clean(deltaFunction)
-      override def getDelta(first: T, second: T) = cleanFun(first, second)
+      ClosureCleaner.clean(deltaFunction, true)
+      override def getDelta(first: T, second: T) = deltaFunction(first, second)
     }
     JavaDelta.of(threshold, df, initVal)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/255c5545/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
index a935440..5cea95b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.api.scala.windowing
 
 import java.util.concurrent.TimeUnit
+import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
 
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.Timestamp
 
 object Time {
@@ -44,8 +44,8 @@ object Time {
  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
     require(timestamp != null, "Timestamp must not be null.")
     val ts = new Timestamp[R] {
-      val fun = clean(timestamp, true)
-      override def getTimestamp(in: R) = fun(in)
+      ClosureCleaner.clean(timestamp, true)
+      override def getTimestamp(in: R) = timestamp(in)
     }
     JavaTime.of(windowSize, ts, startTime)
   }


Mime
View raw message