flink-commits mailing list archives

From se...@apache.org
Subject [6/6] flink git commit: [FLINK-3413] [streaming] Make implicit conversions from Java DataStream to Scala DataStream explicit
Date Wed, 17 Feb 2016 14:18:06 GMT
[FLINK-3413] [streaming] Make implicit conversions from Java DataStream to Scala DataStream explicit

This also cleans up a lot of JavaDocs in various Scala DataStream API classes.
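
For review convenience, the change boils down to one pattern, shown here with the
select() method from the SplitStream.scala hunk below: the conversion that was
previously applied implicitly (javaToScalaStream) is renamed to asScalaStream and
invoked explicitly at every call site.

    // before: the Java DataStream returned by the underlying call was
    // turned into a Scala DataStream by an implicit conversion
    def select(outputNames: String*): DataStream[T] =
      javaStream.select(outputNames: _*)

    // after: the (renamed) conversion helper is applied explicitly
    def select(outputNames: String*): DataStream[T] =
      asScalaStream(javaStream.select(outputNames: _*))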


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dda5316
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dda5316
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dda5316

Branch: refs/heads/master
Commit: 6dda5316f6ad60cae877f15b24bc056638de7fc8
Parents: bd137ae
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Feb 16 17:36:40 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 17 15:09:13 2016 +0100

----------------------------------------------------------------------
 .../scala/table/ScalaStreamingTranslator.scala  |   4 +-
 .../api/datastream/CoGroupedStreams.java        |   4 +-
 .../api/datastream/ConnectedStreams.java        |  37 ++-
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../api/functions/co/CoFlatMapFunction.java     |  40 ++-
 .../api/functions/co/CoMapFunction.java         |  33 +-
 .../streaming/api/scala/AllWindowedStream.scala |  28 +-
 .../streaming/api/scala/CoGroupedStreams.scala  |   4 +-
 .../streaming/api/scala/ConnectedStreams.scala  | 333 +++++++++----------
 .../flink/streaming/api/scala/DataStream.scala  |  90 ++---
 .../streaming/api/scala/JoinedStreams.scala     |   8 +-
 .../flink/streaming/api/scala/KeyedStream.scala |  10 +-
 .../flink/streaming/api/scala/SplitStream.scala |   4 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  35 +-
 .../streaming/api/scala/WindowedStream.scala    |  28 +-
 .../flink/streaming/api/scala/package.scala     |  39 ++-
 16 files changed, 391 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
index 86b9044..091d893 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.table.JavaStreamingTranslator
 import org.apache.flink.api.table.Table
 import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
+import org.apache.flink.streaming.api.scala.{DataStream, asScalaStream}
 
 /**
  * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
@@ -41,7 +41,7 @@ class ScalaStreamingTranslator extends PlanTranslator {
 
   override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
     // fake it till you make it ...
-    javaToScalaStream(javaTranslator.translate(op))
+    asScalaStream(javaTranslator.translate(op))
   }
 
   override def createTable[A](

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 9e2bc5d..e921940 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -321,8 +321,8 @@ public class CoGroupedStreams<T1, T2> {
 	private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
 		private static final long serialVersionUID = 1L;
 
-		TypeInformation<T1> oneType;
-		TypeInformation<T2> twoType;
+		private final TypeInformation<T1> oneType;
+		private final TypeInformation<T2> twoType;
 
 		public UnionTypeInfo(TypeInformation<T1> oneType,
 				TypeInformation<T2> twoType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index b340e6e..9da9e34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -31,10 +31,21 @@ import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
+import static java.util.Objects.requireNonNull;
+
 /**
- * {@code ConnectedStreams} represents two connected streams of (possible) different data types. It
- * can be used to apply transformations such as {@link CoMapFunction} on two
- * {@link DataStream DataStreams}
+ * ConnectedStreams represents two connected streams of (possibly) different data types.
+ * Connected streams are useful for cases where operations on one stream directly
+ * affect the operations on the other stream, usually via shared state between the streams.
+ *
+ * <p>An example for the use of connected streams would be to apply rules that change over time
+ * onto another stream. One of the connected streams has the rules, the other stream the
+ * elements to apply the rules to. The operation on the connected stream maintains the 
+ * current set of rules in the state. It may receive either a rule update and update the state
+ * or a data element and apply the rules in the state to the element.
+ *
+ * <p>The connected stream can be conceptually viewed as a union stream of an Either type that
+ * holds either the first stream's type or the second stream's type.
  * 
 * @param <IN1> Type of the first input data stream.
  * @param <IN2> Type of the second input data stream.
@@ -42,20 +53,14 @@ import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 @Public
 public class ConnectedStreams<IN1, IN2> {
 
-	protected StreamExecutionEnvironment environment;
-	protected DataStream<IN1> inputStream1;
-	protected DataStream<IN2> inputStream2;
+	protected final StreamExecutionEnvironment environment;
+	protected final DataStream<IN1> inputStream1;
+	protected final DataStream<IN2> inputStream2;
 
-	protected ConnectedStreams(StreamExecutionEnvironment env,
-			DataStream<IN1> input1,
-			DataStream<IN2> input2) {
-		this.environment = env;
-		if (input1 != null) {
-			this.inputStream1 = input1;
-		}
-		if (input2 != null) {
-			this.inputStream2 = input2;
-		}
+	protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
+		this.environment = requireNonNull(env);
+		this.inputStream1 = requireNonNull(input1);
+		this.inputStream2 = requireNonNull(input2);
 	}
 
 	public StreamExecutionEnvironment getExecutionEnvironment() {
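
To make the rules example from the new class javadoc concrete, here is a minimal
Scala sketch (all names are hypothetical, and a real job would keep the rules in
Flink's managed state rather than in a plain field):

    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.util.Collector

    case class Rule(pattern: String)

    class ApplyRules extends CoFlatMapFunction[Rule, String, String] {
      // simplified stand-in for operator state, shared by both inputs
      private var rules: List[Rule] = Nil

      // first input: a rule update only modifies the state, emits nothing
      override def flatMap1(rule: Rule, out: Collector[String]): Unit =
        rules = rule :: rules

      // second input: a data element is checked against the current rules
      override def flatMap2(value: String, out: Collector[String]): Unit =
        if (rules.exists(r => value.contains(r.pattern))) out.collect(value)
    }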

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index a44b650..07a91e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -355,7 +355,7 @@ public class DataStream<T> {
 	 * Note: This method works only on single field keys.
 	 *
 	 * @param partitioner The partitioner to assign partitions to keys.
-	 * @param field The field index on which the DataStream is to partitioned.
+	 * @param field The field expression on which the DataStream is to be partitioned.
 	 * @return The partitioned DataStream.
 	 */
 	public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
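
A hedged sketch of what the corrected javadoc describes, partitioning by a field
expression of a hypothetical case class:

    import org.apache.flink.api.common.functions.Partitioner
    import org.apache.flink.streaming.api.scala._

    object PartitionSketch {
      case class Order(country: String, amount: Double)

      // custom partitioner over the extracted key
      val byCountry = new Partitioner[String] {
        override def partition(key: String, numPartitions: Int): Int =
          math.abs(key.hashCode) % numPartitions
      }

      // "country" is a field expression into the Order type
      def partitionOrders(orders: DataStream[Order]): DataStream[Order] =
        orders.partitionCustom(byCountry, "country")
    }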

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
index 03ce2f5..3f996f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
@@ -25,20 +25,44 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 
 /**
- * A CoFlatMapFunction represents a FlatMap transformation with two different
- * input types.
+ * A CoFlatMapFunction implements a flat-map transformation over two
+ * connected streams.
+ * 
+ * <p>The same instance of the transformation function is used to transform
+ * both of the connected streams. That way, the stream transformations can
+ * share state.
+ * 
+ * <p>An example for the use of connected streams would be to apply rules that change over time
+ * onto elements of a stream. One of the connected streams has the rules, the other stream the
+ * elements to apply the rules to. The operation on the connected stream maintains the 
+ * current set of rules in the state. It may receive either a rule update (from the first stream)
+ * and update the state, or a data element (from the second stream) and apply the rules in the
+ * state to the element. The result of applying the rules would be emitted.
  *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
  */
 @Public
 public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
 
+	/**
+	 * This method is called for each element in the first of the connected streams.
+	 * 
+	 * @param value The stream element
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
 	void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
 
+	/**
+	 * This method is called for each element in the second of the connected streams.
+	 * 
+	 * @param value The stream element
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
 	void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
 }
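
A hedged usage sketch, wiring this interface to two connected streams (Rule and
ApplyRules are the hypothetical types sketched after the ConnectedStreams.java
section above):

    import org.apache.flink.streaming.api.scala._

    // flatMap1 receives the rule updates, flatMap2 the data elements
    def matching(rules: DataStream[Rule], data: DataStream[String]): DataStream[String] =
      rules.connect(data).flatMap(new ApplyRules)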

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
index 6a3b4e0..71fcbc8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
@@ -24,20 +24,37 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.Function;
 
 /**
- * A CoMapFunction represents a Map transformation with two different input
- * types.
+ * A CoMapFunction implements a map() transformation over two
+ * connected streams.
  *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
+ * <p>The same instance of the transformation function is used to transform
+ * both of the connected streams. That way, the stream transformations can
+ * share state.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
  */
 @Public
 public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
 
+	/**
+	 * This method is called for each element in the first of the connected streams.
+	 *
+	 * @param value The stream element
+	 * @return The resulting element
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
 	OUT map1(IN1 value) throws Exception;
 
+	/**
+	 * This method is called for each element in the second of the connected streams.
+	 *
+	 * @param value The stream element
+	 * @return The resulting element
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
 	OUT map2(IN2 value) throws Exception;
 }
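
For illustration, a minimal runnable sketch (names hypothetical) that unifies two
differently typed streams into one output type; the two lambdas of the Scala
shortcut correspond to map1 and map2 of this interface:

    import org.apache.flink.streaming.api.scala._

    object CoMapSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val ints: DataStream[Int]     = env.fromElements(1, 2, 3)
        val words: DataStream[String] = env.fromElements("a", "b")

        // one function instance transforms both inputs to a common type
        val unified: DataStream[String] =
          ints.connect(words).map(i => s"int: $i", s => s"str: $s")

        unified.print()
        env.execute("co-map sketch")
      }
    }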

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index e8d3e05..a4df980 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -99,7 +99,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @return The data stream that is the result of applying the reduce function to the window.
    */
   def reduce(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduce(clean(function))
+    asScalaStream(javaStream.reduce(clean(function)))
   }
 
   /**
@@ -139,13 +139,13 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   def fold[R: TypeInformation: ClassTag](
       initialValue: R,
       function: FoldFunction[T,R]): DataStream[R] = {
+    
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
 
     val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
-    javaStream.fold(initialValue, function, resultType)
+    asScalaStream(javaStream.fold(initialValue, function, resultType))
   }
 
   /**
@@ -182,13 +182,14 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    */
   def apply[R: TypeInformation: ClassTag](
       function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = {
+    
     val cleanedFunction = clean(function)
     val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
     }
-    javaStream.apply(javaFunction, implicitly[TypeInformation[R]])
+    asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -204,13 +205,14 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    */
   def apply[R: TypeInformation: ClassTag](
       function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    
     val cleanedFunction = clean(function)
     val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
     }
-    javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
+    asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -227,7 +229,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   def apply[R: TypeInformation: ClassTag](
       preAggregator: ReduceFunction[T],
       function: AllWindowFunction[T, R, W]): DataStream[R] = {
-    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.apply(clean(preAggregator), clean(function), returnType))
   }
 
   /**
@@ -262,7 +266,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
         cleanApply(window, input, out)
       }
     }
-    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+    
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.apply(reducer, applyFunction, returnType))
   }
 
   /**
@@ -281,11 +287,12 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       initialValue: R,
       preAggregator: FoldFunction[T, R],
       function: AllWindowFunction[R, R, W]): DataStream[R] = {
-    javaStream.apply(
+    
+    asScalaStream(javaStream.apply(
       initialValue,
       clean(preAggregator),
       clean(function),
-      implicitly[TypeInformation[R]])
+      implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -322,7 +329,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
         cleanApply(window, input, out)
       }
     }
-    javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]])
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType))
   }
 
   // ------------------------------------------------------------------------
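
As a hedged sketch of the call sites this hunk rewraps, a non-keyed time window
with a reduce (types hypothetical; timeWindowAll is assumed available as in the
Scala DataStream API of this era):

    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    // the DataStream[Long] returned by reduce() is now produced via
    // asScalaStream instead of an implicit conversion
    def sumAll(nums: DataStream[Long]): DataStream[Long] =
      nums.timeWindowAll(Time.seconds(5)).reduce(new ReduceFunction[Long] {
        override def reduce(a: Long, b: Long): Long = a + b
      })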

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index ce96e4f..7572885 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -278,13 +278,13 @@ object CoGroupedStreams {
 
       val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, input2.javaStream)
 
-      coGroup
+      asScalaStream(coGroup
         .where(keySelector1)
         .equalTo(keySelector2)
         .window(windowAssigner)
         .trigger(trigger)
         .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]])
+        .apply(clean(function), implicitly[TypeInformation[T]]))
     }
 
     /**
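
A hedged sketch of the coGroup call whose apply() result is now wrapped
explicitly (types hypothetical; the window assigner name assumes the Flink 1.0
API):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    case class Click(userId: Long)
    case class Purchase(userId: Long)

    def clicksVsPurchases(clicks: DataStream[Click],
                          buys: DataStream[Purchase]): DataStream[String] =
      clicks.coGroup(buys)
        .where(_.userId).equalTo(_.userId)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply((cs, ps) => s"clicks=${cs.size} purchases=${ps.size}")

The JoinedStreams.scala hunk further down applies the identical wrapping to the
join API.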

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index c3ea144..278090d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -29,27 +29,40 @@ import org.apache.flink.util.Collector
 import scala.reflect.ClassTag
 
 /**
- * [[ConnectedStreams]] represents two connected streams of (possible) different data types. It
- * can be used to apply transformations such as [[CoMapFunction]] on two
- * [[DataStream]]s.
+ * [[ConnectedStreams]] represents two connected streams of (possibly) different data types.
+ * Connected streams are useful for cases where operations on one stream directly
+ * affect the operations on the other stream, usually via shared state between the streams.
+ * 
+ * An example for the use of connected streams would be to apply rules that change over time
+ * onto another stream. One of the connected streams has the rules, the other stream the
+ * elements to apply the rules to. The operation on the connected stream maintains the 
+ * current set of rules in the state. It may receive either a rule update and update the state
+ * or a data element and apply the rules in the state to the element.
+ * 
+ * The connected stream can be conceptually viewed as a union stream of an Either type that
+ * holds either the first stream's type or the second stream's type.
  */
 @Public
 class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
 
+  // ------------------------------------------------------
+  //  Transformations
+  // ------------------------------------------------------
+  
   /**
-   * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
-   * the output to a common type. The transformation calls a
- *
-   * @param fun1 for each element of the first input and
-   * @param fun2 for each element of the second input. Each
-   * CoMapFunction call returns exactly one element.
-   *
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
+   * Applies a CoMap transformation on the connected streams.
+   * 
+   * The transformation consists of two separate functions, where
+   * the first one is called for each element of the first connected stream,
+   * and the second one is called for each element of the second connected stream.
+   * 
+   * @param fun1 Function called per element of the first input.
+   * @param fun2 Function called per element of the second input.
+   * @return The resulting data stream.
    */
   def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
-  DataStream[R] = {
+      DataStream[R] = {
+    
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("Map function must not be null.")
     }
@@ -64,66 +77,72 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
-   * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
-   * the output to a common type. The transformation calls a
-   * {@link CoMapFunction#map1} for each element of the first input and
-   * {@link CoMapFunction#map2} for each element of the second input. Each
-   * CoMapFunction call returns exactly one element. The user can also extend
-   * {@link RichCoMapFunction} to gain access to other features provided by
-   * the {@link RichFuntion} interface.
+   * Applies a CoMap transformation on these connected streams.
+   * 
+   * The transformation calls [[CoMapFunction#map1]] for each element
+   * in the first stream and [[CoMapFunction#map2]] for each element
+   * of the second stream.
+   * 
+   * One can pass a subclass of [[org.apache.flink.streaming.api.functions.co.RichCoMapFunction]]
+   * to gain access to the [[org.apache.flink.api.common.functions.RuntimeContext]]
+   * and to additional life cycle methods.
    *
    * @param coMapper
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
+   *         The CoMapFunction used to transform the two connected streams
+   * @return
+   *        The resulting data stream
    */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R] = {
     if (coMapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
-   * maps the output to a common type. The transformation calls a
-   * {@link CoFlatMapFunction#flatMap1} for each element of the first input
-   * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none. The user can also extend {@link RichFlatMapFunction} to
-   * gain access to other features provided by the {@link RichFuntion}
-   * interface.
+   * Applies a CoFlatMap transformation on these connected streams.
+   *
+   * The transformation calls [[CoFlatMapFunction#flatMap1]] for each element
+   * in the first stream and [[CoFlatMapFunction#flatMap2]] for each element
+   * of the second stream.
+   *
+   * One can pass a subclass of [[org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction]]
+   * to gain access to the [[org.apache.flink.api.common.functions.RuntimeContext]]
+   * and to additional life cycle methods.
    *
    * @param coFlatMapper
-   * The CoFlatMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
+   *         The CoFlatMapFunction used to transform the two connected streams
+   * @return
+   *        The resulting data stream.
    */
   def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
+          DataStream[R] = {
+    
     if (coFlatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
     
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
-   * maps the output to a common type. The transformation calls a
- *
-   * @param fun1 for each element of the first input
-   * and @param fun2 for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none.
-   * @return The transformed { @link DataStream}
+   * Applies a CoFlatMap transformation on the connected streams.
+   *
+   * The transformation consists of two separate functions, where
+   * the first one is called for each element of the first connected stream,
+   * and the second one is called for each element of the second connected stream.
+   *
+   * @param fun1 Function called per element of the first input.
+   * @param fun2 Function called per element of the second input.
+   * @return The resulting data stream.
    */
-  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
+  def flatMap[R: TypeInformation: ClassTag](
+      fun1: (IN1, Collector[R]) => Unit, 
       fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+    
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
     }
@@ -137,114 +156,101 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
-   * maps the output to a common type. The transformation calls a
- *
-   * @param fun1 for each element of the first input
-   * and @param fun2 for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none.
-   * @return The transformed { @link DataStream}
+   * Applies a CoFlatMap transformation on the connected streams.
+   *
+   * The transformation consists of two separate functions, where
+   * the first one is called for each element of the first connected stream,
+   * and the second one is called for each element of the second connected stream.
+   *
+   * @param fun1 Function called per element of the first input.
+   * @param fun2 Function called per element of the second input.
+   * @return The resulting data stream.
    */
-  def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
+  def flatMap[R: TypeInformation: ClassTag](
+      fun1: IN1 => TraversableOnce[R],
       fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
+    
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
     }
     val cleanFun1 = clean(fun1)
     val cleanFun2 = clean(fun2)
+    
     val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
       def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
       def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
     }
+    
     flatMap(flatMapper)
   }
 
+  // ------------------------------------------------------
+  //  grouping and partitioning
+  // ------------------------------------------------------
+  
   /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedStreams#reduce}
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param keyPosition1
-   * The field used to compute the hashcode of the elements in the
-   * first input stream.
-   * @param keyPosition2
-   * The field used to compute the hashcode of the elements in the
-   * second input stream.
-   * @return @return The transformed { @link ConnectedStreams}
+   * @param keyPosition1 The first stream's key field
+   * @param keyPosition2 The second stream's key field
+   * @return The key-grouped connected streams
    */
   def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(keyPosition1, keyPosition2)
+    asScalaStream(javaStream.keyBy(keyPosition1, keyPosition2))
   }
 
   /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedStreams#reduce}
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param keyPositions1
-   * The fields used to group the first input stream.
-   * @param keyPositions2
-   * The fields used to group the second input stream.
-   * @return @return The transformed { @link ConnectedStreams}
+   * @param keyPositions1 The first stream's key fields
+   * @param keyPositions2 The second stream's key fields
+   * @return The key-grouped connected streams
    */
-  def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
-  ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(keyPositions1, keyPositions2)
+  def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): ConnectedStreams[IN1, IN2] = {
+    asScalaStream(javaStream.keyBy(keyPositions1, keyPositions2))
   }
 
   /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the {@link DataStream}S underlying type. A dot can be used
-   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param field1
-   * The grouping expression for the first input
-   * @param field2
-   * The grouping expression for the second input
-   * @return The grouped { @link ConnectedStreams}
+   * @param field1 The first stream's key expression
+   * @param field2 The second stream's key expression
+   * @return The key-grouped connected streams
    */
   def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(field1, field2)
+    asScalaStream(javaStream.keyBy(field1, field2))
   }
 
   /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to fields1 and fields2. A
-   * field expression is either the name of a public field or a getter method
-   * with parentheses of the {@link DataStream}S underlying type. A dot can be
-   * used to drill down into objects, as in {@code "field1.getInnerField2()" }
-   * .
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param fields1
-   * The grouping expressions for the first input
-   * @param fields2
-   * The grouping expressions for the second input
-   * @return The grouped { @link ConnectedStreams}
+   * @param fields1 The first stream's key expressions
+   * @param fields2 The second stream's key expressions
+   * @return The key-grouped connected streams
    */
-  def keyBy(fields1: Array[String], fields2: Array[String]):
-  ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(fields1, fields2)
+  def keyBy(fields1: Array[String], fields2: Array[String]): ConnectedStreams[IN1, IN2] = {
+    asScalaStream(javaStream.keyBy(fields1, fields2))
   }
 
   /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 using fun1 and fun2. Used for applying
-   * function on grouped data streams for example
-   * {@link ConnectedStreams#reduce}
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param fun1
-   * The function used for grouping the first input
-   * @param fun2
-   * The function used for grouping the second input
-   * @return The grouped { @link ConnectedStreams}
+   * @param fun1 The first stream's key function
+   * @param fun2 The second stream's key function
+   * @return The key-grouped connected streams
    */
   def keyBy[K1: TypeInformation, K2: TypeInformation](fun1: IN1 => K1, fun2: IN2 => K2):
-  ConnectedStreams[IN1, IN2] = {
+      ConnectedStreams[IN1, IN2] = {
 
     val keyType1 = implicitly[TypeInformation[K1]]
     val keyType2 = implicitly[TypeInformation[K2]]
@@ -255,84 +261,74 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     val keyExtractor1 = new KeySelectorWithType[IN1, K1](cleanFun1, keyType1)
     val keyExtractor2 = new KeySelectorWithType[IN2, K2](cleanFun2, keyType2)
     
-    javaStream.keyBy(keyExtractor1, keyExtractor2)
+    asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2))
   }
 
   /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param keyPosition1
-   * The field used to compute the hashcode of the elements in the
-   * first input stream.
-   * @param keyPosition2
-   * The field used to compute the hashcode of the elements in the
-   * second input stream.
-   * @return The transformed { @link ConnectedStreams}
+   * @param keyPosition1 The first stream's partition key field
+   * @param keyPosition2 The second stream's partition key field
+   * @return The co-partitioned connected streams
    */
   def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(keyPosition1, keyPosition2)
+    asScalaStream(javaStream.partitionByHash(keyPosition1, keyPosition2))
   }
 
   /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param keyPositions1
-   * The fields used to partition the first input stream.
-   * @param keyPositions2
-   * The fields used to partition the second input stream.
-   * @return The transformed { @link ConnectedStreams}
+   * @param keyPositions1 The first stream's partition key fields
+   * @param keyPositions2 The second stream's partition key fields
+   * @return The co-partitioned connected streams
    */
   def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
-  ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(keyPositions1, keyPositions2)
+      ConnectedStreams[IN1, IN2] = {
+    asScalaStream(javaStream.partitionByHash(keyPositions1, keyPositions2))
   }
 
   /**
-   * PartitionBy operation for connected data stream using key expressions. Partitions
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the {@link DataStream}S underlying type. A dot can be used
-   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param field1
-   * The partitioning expression for the first input
-   * @param field2
-   * The partitioning expression for the second input
-   * @return The grouped { @link ConnectedStreams}
+   * @param field1 The first stream's partition key expression
+   * @param field2 The second stream's partition key expression
+   * @return The co-partitioned connected streams
    */
   def partitionByHash(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(field1, field2)
+    asScalaStream(javaStream.partitionByHash(field1, field2))
   }
 
   /**
-   * PartitionBy operation for connected data stream using key expressions. Partitions
-   * the elements of input1 and input2 according to fields1 and fields2.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param fields1
-   * The partitioning expressions for the first input
-   * @param fields2
-   * The partitioning expressions for the second input
-   * @return The partitioned { @link ConnectedStreams}
+   * @param fields1 The first stream's partition key field expressions
+   * @param fields2 The second stream's partition key field expressions
+   * @return The co-partitioned connected streams
    */
-  def partitionByHash(fields1: Array[String], fields2: Array[String]):
-  ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(fields1, fields2)
+  def partitionByHash(fields1: Array[String], fields2: Array[String]): 
+      ConnectedStreams[IN1, IN2] = {
+    asScalaStream(javaStream.partitionByHash(fields1, fields2))
   }
 
   /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 using fun1 and fun2.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
    *
-   * @param fun1
-   * The function used for partitioning the first input
-   * @param fun2
-   * The function used for partitioning the second input
-   * @return The partitioned { @link ConnectedStreams}
+   * @param fun1 The first stream's partition key function
+   * @param fun2 The second stream's partition key function
+   * @return The co-partitioned connected streams
    */
   def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
-  ConnectedStreams[IN1, IN2] = {
+      ConnectedStreams[IN1, IN2] = {
 
     val cleanFun1 = clean(fun1)
     val cleanFun2 = clean(fun2)
@@ -344,17 +340,16 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       def getKey(in: IN2) = cleanFun2(in)
     }
 
-    javaStream.partitionByHash(keyExtractor1, keyExtractor2)
+    asScalaStream(javaStream.partitionByHash(keyExtractor1, keyExtractor2))
   }
 
   /**
    * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
    */
   private[flink] def clean[F <: AnyRef](f: F): F = {
     new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
   }
-
 }
 
 @Internal
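
A hedged sketch of the keyBy variants this hunk rewraps (types hypothetical):
after keying, rule updates and data elements with the same userId are routed to
the same parallel subtask of the co-function:

    import org.apache.flink.streaming.api.scala._

    case class RuleUpdate(userId: Long, rule: String)
    case class Event(userId: Long, payload: String)

    def keyTogether(rules: DataStream[RuleUpdate],
                    events: DataStream[Event]): ConnectedStreams[RuleUpdate, Event] =
      rules.connect(events)
        .keyBy((r: RuleUpdate) => r.userId, (e: Event) => e.userId)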

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0f6ec7d..808f4ea 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -49,10 +49,10 @@ class DataStream[T](stream: JavaStream[T]) {
   def javaStream: JavaStream[T] = stream
 
   /**
-    * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
- *
-    * @return associated execution environment
-    */
+   * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
+   *
+   * @return associated execution environment
+   */
   def getExecutionEnvironment: StreamExecutionEnvironment =
     new StreamExecutionEnvironment(stream.getExecutionEnvironment)
 
@@ -112,7 +112,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * @return The named operator
    */
   def name(name: String) : DataStream[T] = stream match {
-    case stream : SingleOutputStreamOperator[T,_] => stream.name(name)
+    case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.name(name))
     case _ => throw new UnsupportedOperationException("Only supported for operators.")
     this
   }
@@ -131,7 +131,7 @@ class DataStream[T](stream: JavaStream[T]) {
     */
   @PublicEvolving
   def uid(uid: String) : DataStream[T] = javaStream match {
-    case stream : SingleOutputStreamOperator[T,_] => stream.uid(uid)
+    case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.uid(uid))
     case _ => throw new UnsupportedOperationException("Only supported for operators.")
     this
   }
@@ -231,7 +231,7 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    */
   def union(dataStreams: DataStream[T]*): DataStream[T] =
-    stream.union(dataStreams.map(_.javaStream): _*)
+    asScalaStream(stream.union(dataStreams.map(_.javaStream): _*))
 
   /**
    * Creates a new ConnectedStreams by connecting
@@ -239,20 +239,20 @@ class DataStream[T](stream: JavaStream[T]) {
    * DataStreams connected using this operators can be used with CoFunctions.
    */
   def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
-    stream.connect(dataStream.javaStream)
+    asScalaStream(stream.connect(dataStream.javaStream))
 
   /**
    * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = stream.keyBy(fields: _*)
+  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = asScalaStream(stream.keyBy(fields: _*))
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
   def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
-   stream.keyBy(firstField +: otherFields.toArray: _*)
+    asScalaStream(stream.keyBy(firstField +: otherFields.toArray: _*))
 
   /**
    * Groups the elements of a DataStream by the given K key to
@@ -267,21 +267,22 @@ class DataStream[T](stream: JavaStream[T]) {
       def getKey(in: T) = cleanFun(in)
       override def getProducedType: TypeInformation[K] = keyType
     }
-    new JavaKeyedStream(stream, keyExtractor, keyType)
+    asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
   }
 
   /**
    * Partitions the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def partitionByHash(fields: Int*): DataStream[T] = stream.partitionByHash(fields: _*)
+  def partitionByHash(fields: Int*): DataStream[T] = 
+    asScalaStream(stream.partitionByHash(fields: _*))
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
   def partitionByHash(firstField: String, otherFields: String*): DataStream[T] =
-    stream.partitionByHash(firstField +: otherFields.toArray: _*)
+    asScalaStream(stream.partitionByHash(firstField +: otherFields.toArray: _*))
 
   /**
    * Groups the elements of a DataStream by the given K key to
@@ -294,7 +295,8 @@ class DataStream[T](stream: JavaStream[T]) {
       def getKey(in: T) = cleanFun(in)
       override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
     }
-    stream.partitionByHash(keyExtractor)
+
+    asScalaStream(stream.partitionByHash(keyExtractor))
   }
 
   /**
@@ -305,7 +307,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * Note: This method works only on single field keys.
    */
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
-    stream.partitionCustom(partitioner, field)
+    asScalaStream(stream.partitionCustom(partitioner, field))
 
   /**
    * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
@@ -315,7 +317,8 @@ class DataStream[T](stream: JavaStream[T]) {
    * Note: This method works only on single field keys.
    */
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String)
-  : DataStream[T] = stream.partitionCustom(partitioner, field)
+        : DataStream[T] =
+    asScalaStream(stream.partitionCustom(partitioner, field))
 
   /**
    * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
@@ -326,20 +329,24 @@ class DataStream[T](stream: JavaStream[T]) {
    * of fields.
    */
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K)
-  : DataStream[T] = {
+      : DataStream[T] = {
+    
+    val keyType = implicitly[TypeInformation[K]]
     val cleanFun = clean(fun)
+    
     val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
       def getKey(in: T) = cleanFun(in)
-      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
+      override def getProducedType(): TypeInformation[K] = keyType
     }
-    stream.partitionCustom(partitioner, keyExtractor)
+
+    asScalaStream(stream.partitionCustom(partitioner, keyExtractor))
   }
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are broad casted to every parallel instance of the next component.
    */
-  def broadcast: DataStream[T] = stream.broadcast()
+  def broadcast: DataStream[T] = asScalaStream(stream.broadcast())
 
   /**
    * Sets the partitioning of the DataStream so that the output values all go to
@@ -347,27 +354,27 @@ class DataStream[T](stream: JavaStream[T]) {
    * since it might cause a serious performance bottleneck in the application.
    */
   @PublicEvolving
-  def global: DataStream[T] = stream.global()
+  def global: DataStream[T] = asScalaStream(stream.global())
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are shuffled to the next component.
    */
   @PublicEvolving
-  def shuffle: DataStream[T] = stream.shuffle()
+  def shuffle: DataStream[T] = asScalaStream(stream.shuffle())
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are forwarded to the local subtask of the next component (whenever
    * possible).
    */
-  def forward: DataStream[T] = stream.forward()
+  def forward: DataStream[T] = asScalaStream(stream.forward())
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are distributed evenly to the next component.
    */
-  def rebalance: DataStream[T] = stream.rebalance()
+  def rebalance: DataStream[T] = asScalaStream(stream.rebalance())
 
   /**
    * Sets the partitioning of the [[DataStream]] so that the output tuples
@@ -387,7 +394,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * downstream operations will have a differing number of inputs from upstream operations.
    */
   @PublicEvolving
-  def rescale: DataStream[T] = stream.rescale()
+  def rescale: DataStream[T] = asScalaStream(stream.rescale())
 
   /**
    * Initiates an iterative part of the program that creates a loop by feeding
@@ -440,13 +447,16 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    */
   @PublicEvolving
-  def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] =>
-    (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
+  def iterate[R, F: TypeInformation: ClassTag](
+        stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
+        maxWaitTimeMillis:Long): DataStream[R] = {
+    
     val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
+    
     val connectedIterativeStream = stream.iterate(maxWaitTimeMillis).
                                    withFeedbackType(feedbackType)
 
-    val (feedback, output) = stepFunction(connectedIterativeStream)
+    val (feedback, output) = stepFunction(asScalaStream(connectedIterativeStream))
     connectedIterativeStream.closeWith(feedback.javaStream)
     output
   }
@@ -475,7 +485,7 @@ class DataStream[T](stream: JavaStream[T]) {
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
@@ -488,7 +498,7 @@ class DataStream[T](stream: JavaStream[T]) {
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
@@ -528,7 +538,7 @@ class DataStream[T](stream: JavaStream[T]) {
     if (filter == null) {
       throw new NullPointerException("Filter function must not be null.")
     }
-    stream.filter(filter)
+    asScalaStream(stream.filter(filter))
   }
 
   /**
@@ -539,10 +549,10 @@ class DataStream[T](stream: JavaStream[T]) {
       throw new NullPointerException("Filter function must not be null.")
     }
     val cleanFun = clean(fun)
-    val filter = new FilterFunction[T] {
+    val filterFun = new FilterFunction[T] {
       def filter(in: T) = cleanFun(in)
     }
-    this.filter(filter)
+    filter(filterFun)
   }
 
   /**
@@ -643,7 +653,7 @@ class DataStream[T](stream: JavaStream[T]) {
    */
   @deprecated
   def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
-    stream.assignTimestamps(clean(extractor))
+    asScalaStream(stream.assignTimestamps(clean(extractor)))
   }
 
   /**
@@ -671,8 +681,8 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]) 
       : DataStream[T] = {
-    
-    stream.assignTimestampsAndWatermarks(assigner)
+
+    asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
   }
 
   /**
@@ -701,8 +711,8 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T])
       : DataStream[T] = {
-    
-    stream.assignTimestampsAndWatermarks(assigner)
+
+    asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
   }
 
   /**
@@ -726,7 +736,7 @@ class DataStream[T](stream: JavaStream[T]) {
         cleanExtractor(element)
       }
     }
-    stream.assignTimestampsAndWatermarks(extractorFunction)
+    asScalaStream(stream.assignTimestampsAndWatermarks(extractorFunction))
   }
 
   /**
@@ -735,7 +745,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * OutputSelector. Calling this method on an operator creates a new
    * [[SplitStream]].
    */
-  def split(selector: OutputSelector[T]): SplitStream[T] = stream.split(selector)
+  def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))
 
   /**
    * Creates a new [[SplitStream]] that contains only the elements satisfying the
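
A hedged sketch of the two-stream iterate variant reformatted above: the step
function receives the loop as ConnectedStreams[T, F] and returns the feedback
and output streams (all values hypothetical):

    import org.apache.flink.streaming.api.scala._

    def countDown(env: StreamExecutionEnvironment): DataStream[String] = {
      val source: DataStream[Long] = env.fromElements(8L, 3L, 5L)
      source.iterate[String, Long]((it: ConnectedStreams[Long, Long]) => {
        val step: DataStream[Long] = it.map(x => x - 1, x => x - 1)
        val feedback = step.filter(_ > 0)               // re-enters the loop
        val output   = step.filter(_ <= 0).map(_.toString)
        (feedback, output)
      }, 5000L)
    }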

http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 21c5d84..b111d9a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -269,13 +269,13 @@ object JoinedStreams {
 
       val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
 
-      join
+      asScalaStream(join
         .where(keySelector1)
         .equalTo(keySelector2)
         .window(windowAssigner)
         .trigger(trigger)
         .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]])
+        .apply(clean(function), implicitly[TypeInformation[T]]))
     }
 
     /**
@@ -286,13 +286,13 @@ object JoinedStreams {
 
       val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
 
-      join
+      asScalaStream(join
         .where(keySelector1)
         .equalTo(keySelector2)
         .window(windowAssigner)
         .trigger(trigger)
         .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]])
+        .apply(clean(function), implicitly[TypeInformation[T]]))
     }
 
     /**

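The JoinedStreams change wraps the entire where/equalTo/window/apply chain in a single asScalaStream call. An illustrative sketch of such a chain from the caller's side (the streams, window assigner, and join logic are assumptions for this example):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val first  = env.fromElements(("a", 1), ("b", 2))
    val second = env.fromElements(("a", 10), ("b", 20))

    // apply(...) produces a Java DataStream internally; the Scala API now
    // converts it explicitly before returning it to the caller.
    val joined: DataStream[Int] = first.join(second)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply((l, r) => l._2 + r._2)
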
http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 8419f89..b492bf9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -128,7 +128,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
       throw new NullPointerException("Reduce function must not be null.")
     }
  
-    javaStream.reduce(reducer)
+    asScalaStream(javaStream.reduce(reducer))
   }
 
   /**
@@ -141,7 +141,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     }
     val cleanFun = clean(fun)
     val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+      def reduce(v1: T, v2: T) : T = { cleanFun(v1, v2) }
     }
     reduce(reducer)
   }
@@ -152,15 +152,15 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * aggregate is kept per key.
    */
   def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
-  DataStream[R] = {
+      DataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
     
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
     
-    javaStream.fold(initialValue, folder).
-      returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(javaStream.fold(initialValue, folder).
+      returns(outType).asInstanceOf[JavaStream[R]])
   }
 
   /**

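Besides the asScalaStream wrapping, the KeyedStream.scala hunk gives the anonymous ReduceFunction an explicit result type. A short usage sketch of the closure-based reduce (sample data assumed):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // reduce(...) builds a ReduceFunction from the closure and re-wraps the
    // resulting Java stream with asScalaStream; one aggregate is kept per key.
    val sums: DataStream[(String, Int)] = env
      .fromElements(("a", 1), ("a", 2), ("b", 3))
      .keyBy(_._1)
      .reduce((l, r) => (l._1, l._2 + r._2))
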
http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
index ac83514..0b9ac69 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
@@ -34,6 +34,6 @@ class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaS
   /**
    *  Sets the output names for which the next operator will receive values.
    */
-  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
-  
+  def select(outputNames: String*): DataStream[T] = 
+    asScalaStream(javaStream.select(outputNames: _*))
 }

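SplitStream.select(...) now returns the explicitly converted Scala stream as well. A usage sketch, using the closure-based split variant as an assumed companion to the OutputSelector overload changed earlier:

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // split(...) names the side outputs for each element; select(...) then
    // picks one of those named outputs from the resulting SplitStream.
    val parts: SplitStream[Int] = env
      .fromElements(1, 2, 3, 4)
      .split(n => if (n % 2 == 0) Seq("even") else Seq("odd"))
    val evens: DataStream[Int] = parts.select("even")
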
http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index a7152e2..4107e4d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.AbstractStateBackend
-import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -404,7 +403,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     require(data != null, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
 
-    javaEnv.fromCollection(scala.collection.JavaConversions.asJavaCollection(data), typeInfo)
+    val collection = scala.collection.JavaConversions.asJavaCollection(data)
+    asScalaStream(javaEnv.fromCollection(collection, typeInfo))
   }
 
   /**
@@ -415,33 +415,32 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
     val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.fromCollection(data.asJava, typeInfo)
+    asScalaStream(javaEnv.fromCollection(data.asJava, typeInfo))
   }
 
   /**
    * Creates a DataStream from the given [[SplittableIterator]].
    */
   def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
-  DataStream[T] = {
+      DataStream[T] = {
     val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.fromParallelCollection(data, typeInfo)
+    asScalaStream(javaEnv.fromParallelCollection(data, typeInfo))
   }
 
   /**
    * Creates a DataStream that represents the Strings produced by reading the
    * given file line wise. The file will be read with the system's default
    * character set.
-   *
    */
   def readTextFile(filePath: String): DataStream[String] =
-    javaEnv.readTextFile(filePath)
+    asScalaStream(javaEnv.readTextFile(filePath))
 
   /**
    * Creates a data stream that represents the Strings produced by reading the given file
    * line wise. The character set with the given name will be used to read the files.
    */
   def readTextFile(filePath: String, charsetName: String): DataStream[String] =
-    javaEnv.readTextFile(filePath, charsetName)
+    asScalaStream(javaEnv.readTextFile(filePath, charsetName))
 
 
   /**
@@ -449,8 +448,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
    */
   def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
-    DataStream[T] =
-    javaEnv.readFile(inputFormat, filePath)
+        DataStream[T] =
+    asScalaStream(javaEnv.readFile(inputFormat, filePath))
 
 
   /**
@@ -461,9 +460,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * every 100 milliseconds.
    *
    */
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType = 
-    WatchType.ONLY_NEW_FILES): DataStream[String] =
-    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
+  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
+                     watchType: WatchType = WatchType.ONLY_NEW_FILES): DataStream[String] =
+    asScalaStream(javaEnv.readFileStream(StreamPath, intervalMillis, watchType))
 
   /**
    * Creates a new DataStream that contains the strings received infinitely
@@ -473,8 +472,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   @PublicEvolving
   def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
-    DataStream[String] =
-    javaEnv.socketTextStream(hostname, port)
+        DataStream[String] =
+    asScalaStream(javaEnv.socketTextStream(hostname, port, delimiter, maxRetry))
 
   /**
    * Generic method to create an input data stream with a specific input format.
@@ -484,7 +483,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   @PublicEvolving
   def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
-    javaEnv.createInput(inputFormat)
+    asScalaStream(javaEnv.createInput(inputFormat))
 
   /**
    * Create a DataStream using a user defined source function for arbitrary
@@ -497,15 +496,15 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
     require(function != null, "Function must not be null.")
+    
     val cleanFun = scalaClean(function)
     val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.addSource(cleanFun).returns(typeInfo)
+    asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo))
   }
 
   /**
    * Create a DataStream using a user defined source function for arbitrary
    * source functionality.
-   *
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
     require(function != null, "Function must not be null.")

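All source-creation methods on the Scala StreamExecutionEnvironment now follow the same pattern: build the source through javaEnv, then convert once with asScalaStream. A usage sketch (the file path is a placeholder):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Both sources are created on the wrapped Java environment and handed
    // back as Scala DataStreams by the explicit conversion.
    val nums:  DataStream[Int]    = env.fromCollection(Seq(1, 2, 3))
    val lines: DataStream[String] = env.readTextFile("/path/to/input.txt")
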
http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 22d24fa..d34dd1b 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -102,7 +102,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @return The data stream that is the result of applying the reduce function to the window.
    */
   def reduce(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduce(clean(function))
+    asScalaStream(javaStream.reduce(clean(function)))
   }
 
   /**
@@ -148,7 +148,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
 
     val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
 
-    javaStream.fold(initialValue, function, resultType)
+    asScalaStream(javaStream.fold(initialValue, function, resultType))
   }
 
   /**
@@ -185,13 +185,15 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    */
   def apply[R: TypeInformation: ClassTag](
       function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = {
+    
     val cleanFunction = clean(function)
     val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] {
       def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = {
         cleanFunction.apply(key, window, input.asScala, out)
       }
     }
-    javaStream.apply(javaFunction, implicitly[TypeInformation[R]])
+
+    asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -217,7 +219,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
         cleanedFunction(key, window, elements.asScala, out)
       }
     }
-    javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
+    asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -234,7 +236,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   def apply[R: TypeInformation: ClassTag](
       preAggregator: ReduceFunction[T],
       function: WindowFunction[T, R, K, W]): DataStream[R] = {
-    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
+
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.apply(clean(preAggregator), clean(function), resultType))
   }
 
   /**
@@ -251,6 +255,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   def apply[R: TypeInformation: ClassTag](
       preAggregator: (T, T) => T,
       function: (K, W, T, Collector[R]) => Unit): DataStream[R] = {
+    
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
@@ -269,7 +274,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
         cleanApply(key, window, input, out)
       }
     }
-    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+    
+    asScalaStream(javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -288,11 +294,12 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       initialValue: R,
       foldFunction: FoldFunction[T, R],
       function: WindowFunction[R, R, K, W]): DataStream[R] = {
-    javaStream.apply(
+    
+    asScalaStream(javaStream.apply(
       initialValue,
       clean(foldFunction),
       clean(function),
-      implicitly[TypeInformation[R]])
+      implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -310,6 +317,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       initialValue: R,
       foldFunction: (R, T) => R,
       function: (K, W, R, Collector[R]) => Unit): DataStream[R] = {
+    
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
@@ -328,10 +336,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
         cleanApply(key, window, input, out)
       }
     }
-    javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]])
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.apply(initialValue, folder, applyFunction, resultType))
   }
 
-
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

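The WindowedStream methods are converted the same way: every reduce/fold/apply variant now returns asScalaStream(...) around the Java result. A keyed, windowed reduce as a usage sketch (sample data and window size assumed):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The window reduce executes on the Java stream; its result comes back
    // as a Scala DataStream through the explicit conversion.
    val counts: DataStream[(String, Int)] = env
      .fromElements(("a", 1), ("a", 2), ("b", 3))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .reduce((l, r) => (l._1, l._2 + r._2))
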
http://git-wip-us.apache.org/repos/asf/flink/blob/6dda5316/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 26b0265..90f255c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.api
 
-import _root_.scala.reflect.ClassTag
-import language.experimental.macros
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
 import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
@@ -27,25 +25,44 @@ import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
 import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
 import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
+
 import language.implicitConversions
+import language.experimental.macros
 
 package object scala {
+  
   // We have this here so that we always have generated TypeInformationS when
   // using the Scala API
   implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
 
-  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
-    new DataStream[R](javaStream)
-    
-  implicit def javaToScalaGroupedStream[R, K](javaStream: KeyedJavaStream[R, K]):
-  KeyedStream[R, K] = new KeyedStream[R, K](javaStream)
+  /**
+   * Converts an [[org.apache.flink.streaming.api.datastream.DataStream]] to a
+   * [[org.apache.flink.streaming.api.scala.DataStream]].
+   */
+  private[flink] def asScalaStream[R](stream: JavaStream[R]): DataStream[R] =
+    new DataStream[R](stream)
 
-  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitStream[R] =
-    new SplitStream[R](javaStream)
+  /**
+   * Converts an [[org.apache.flink.streaming.api.datastream.KeyedStream]] to a
+   * [[org.apache.flink.streaming.api.scala.KeyedStream]].
+   */
+  private[flink] def asScalaStream[R, K](stream: KeyedJavaStream[R, K]): KeyedStream[R, K] =
+    new KeyedStream[R, K](stream)
 
-  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]):
-  ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
+  /**
+   * Converts an [[org.apache.flink.streaming.api.datastream.SplitStream]] to a
+   * [[org.apache.flink.streaming.api.scala.SplitStream]].
+   */
+  private[flink] def asScalaStream[R](stream: SplitJavaStream[R]): SplitStream[R] =
+    new SplitStream[R](stream)
+  /**
+   * Converts an [[org.apache.flink.streaming.api.datastream.ConnectedStreams]] to a
+   * [[org.apache.flink.streaming.api.scala.ConnectedStreams]].
+   */
+  private[flink] def asScalaStream[IN1, IN2](stream: ConnectedJavaStreams[IN1, IN2])
+      : ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](stream)
 
+  
   private[flink] def fieldNames2Indices(
       typeInfo: TypeInformation[_],
       fields: Array[String]): Array[Int] = {


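The package object is the core of the change: the public implicit conversions are replaced by private[flink] asScalaStream overloads that resolve statically on the Java stream type. A simplified sketch of how the internals now pick the right wrapper (the Java-side stream values are assumed for illustration):

    // Inside the org.apache.flink namespace, each call resolves to the
    // matching overload added above; no implicit conversion is involved.
    val ds: DataStream[String]          = asScalaStream(javaStream)          // JavaStream[String]
    val ks: KeyedStream[String, Int]    = asScalaStream(keyedJavaStream)     // KeyedJavaStream[String, Int]
    val ss: SplitStream[String]         = asScalaStream(splitJavaStream)     // SplitJavaStream[String]
    val cs: ConnectedStreams[Int, Long] = asScalaStream(connectedJavaStreams)

Since the helpers are private[flink], user code interacts only with the Scala API's declared return types, which is exactly what the explicit wrapping in the files above guarantees.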