flink-commits mailing list archives

From mbala...@apache.org
Subject flink git commit: [FLINK-1430] [streaming] Scala API completeness for streaming
Date Tue, 02 Jun 2015 10:03:14 GMT
Repository: flink
Updated Branches:
  refs/heads/master d22e9f860 -> 7571959a1


[FLINK-1430] [streaming] Scala API completeness for streaming

Closes #753


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7571959a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7571959a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7571959a

Branch: refs/heads/master
Commit: 7571959a199954f017130ee7aa793e7250384b9e
Parents: d22e9f8
Author: mbalassi <mbalassi@apache.org>
Authored: Mon Jun 1 16:56:15 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Tue Jun 2 12:02:21 2015 +0200

----------------------------------------------------------------------
 .../datastream/temporal/StreamJoinOperator.java |   2 +-
 .../environment/StreamExecutionEnvironment.java | 124 ++++-----
 .../flink-streaming-scala/pom.xml               |   8 +
 .../flink/streaming/api/scala/DataStream.scala  |  45 +++-
 .../api/scala/StreamExecutionEnvironment.scala  | 268 ++++++++++---------
 .../StreamingScalaAPICompletenessTest.scala     | 121 +++++++++
 .../api/scala/ScalaAPICompletenessTest.scala    | 176 ------------
 .../BatchScalaAPICompletenessTest.scala         | 139 ++++++++++
 .../ScalaAPICompletenessTestBase.scala          |  89 ++++++
 9 files changed, 599 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
index 43c7c85..e18e14b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
@@ -266,7 +266,7 @@ public class StreamJoinOperator<I1, I2> extends
 		}
 	}
 
-	public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
+	private static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
 			JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
 		return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index da1ba73..60b849c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -106,32 +106,6 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Gets the parallelism with which operation are executed by default.
-	 * Operations can individually override this value to use a specific
-	 * parallelism.
-	 *
-	 * @return The parallelism used by operations, unless they override that
-	 * value.
-	 * @deprecated Please use {@link #getParallelism}
-	 */
-	@Deprecated
-	public int getDegreeOfParallelism() {
-		return getParallelism();
-	}
-
-	/**
-	 * Gets the parallelism with which operation are executed by default.
-	 * Operations can individually override this value to use a specific
-	 * parallelism.
-	 *
-	 * @return The parallelism used by operations, unless they override that
-	 * value.
-	 */
-	public int getParallelism() {
-		return config.getParallelism();
-	}
-
-	/**
 	 * Sets the parallelism for operations executed through this environment.
 	 * Setting a parallelism of x here will cause all operators (such as map,
 	 * batchReduce) to run with x parallel instances. This method overrides the
@@ -151,6 +125,20 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Gets the parallelism with which operation are executed by default.
+	 * Operations can individually override this value to use a specific
+	 * parallelism.
+	 *
+	 * @return The parallelism used by operations, unless they override that
+	 * value.
+	 * @deprecated Please use {@link #getParallelism}
+	 */
+	@Deprecated
+	public int getDegreeOfParallelism() {
+		return getParallelism();
+	}
+
+	/**
 	 * Sets the parallelism for operations executed through this environment.
 	 * Setting a parallelism of x here will cause all operators (such as map,
 	 * batchReduce) to run with x parallel instances. This method overrides the
@@ -172,6 +160,18 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Gets the parallelism with which operation are executed by default.
+	 * Operations can individually override this value to use a specific
+	 * parallelism.
+	 *
+	 * @return The parallelism used by operations, unless they override that
+	 * value.
+	 */
+	public int getParallelism() {
+		return config.getParallelism();
+	}
+
+	/**
 	 * Sets the maximum time frequency (milliseconds) for the flushing of the
 	 * output buffers. By default the output buffers flush frequently to provide
 	 * low latency and to aid smooth developer experience. Setting the parameter
@@ -200,6 +200,17 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the maximum time frequency (milliseconds) for the flushing of the
+	 * output buffers. For clarification on the extremal values see
+	 * {@link #setBufferTimeout(long)}.
+	 *
+	 * @return The timeout of the buffer.
+	 */
+	public long getBufferTimeout() {
+		return this.bufferTimeout;
+	}
+
+	/**
 	 * Disables operator chaining for streaming operators. Operator chaining
 	 * allows non-shuffle operations to be co-located in the same thread fully
 	 * avoiding serialization and de-serialization.
@@ -287,17 +298,6 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Sets the maximum time frequency (milliseconds) for the flushing of the
-	 * output buffers. For clarification on the extremal values see
-	 * {@link #setBufferTimeout(long)}.
-	 *
-	 * @return The timeout of the buffer.
-	 */
-	public long getBufferTimeout() {
-		return this.bufferTimeout;
-	}
-
-	/**
 	 * Sets the default parallelism that will be used for the local execution
 	 * environment created by {@link #createLocalEnvironment()}.
 	 *
@@ -654,31 +654,6 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream that contains the contents of file created while system watches the given path. The file
-	 * will be read with the system's default character set.
-	 *
-	 * @param filePath
-	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
-	 * @param intervalMillis
-	 * 		The interval of file watching in milliseconds
-	 * @param watchType
-	 * 		The watch type of file stream. When watchType is {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES}, the system processes
-	 * 		only
-	 * 		new files. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED} means that the system re-processes all contents of
-	 * 		appended file. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED} means that the system processes only appended
-	 * 		contents
-	 * 		of files.
-	 * @return The DataStream containing the given directory.
-	 */
-	public DataStream<String> readFileStream(String filePath, long intervalMillis,
-			WatchType watchType) {
-		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
-				filePath, intervalMillis, watchType), "Read File Stream source");
-
-		return source.flatMap(new FileReadFunction());
-	}
-
-	/**
 	 * Creates a data stream that represents the strings produced by reading the given file line wise. This method is
 	 * similar to {@link #readTextFile(String)}, but it produces a data stream with mutable {@link org.apache.flink.types.StringValue}
 	 * objects,
@@ -797,6 +772,31 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Creates a data stream that contains the contents of file created while system watches the given path. The file
+	 * will be read with the system's default character set.
+	 *
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
+	 * @param intervalMillis
+	 * 		The interval of file watching in milliseconds
+	 * @param watchType
+	 * 		The watch type of file stream. When watchType is {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES}, the system processes
+	 * 		only
+	 * 		new files. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED} means that the system re-processes all contents of
+	 * 		appended file. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED} means that the system processes only appended
+	 * 		contents
+	 * 		of files.
+	 * @return The DataStream containing the given directory.
+	 */
+	public DataStream<String> readFileStream(String filePath, long intervalMillis,
+											WatchType watchType) {
+		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
+				filePath, intervalMillis, watchType), "Read File Stream source");
+
+		return source.flatMap(new FileReadFunction());
+	}
+
+	/**
 	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
 	 * decoded by the system's default character set. On the termination of the socket server connection retries can be
 	 * initiated.

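The readFileStream Javadoc in the hunks above describes a polling file source whose
behaviour is controlled by a WatchType. As a hedged sketch of how that source might be
used from the Java environment (the path, poll interval, and job name below are
placeholders, not part of this commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType

    object FileStreamSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Poll the placeholder URI every 100 ms; ONLY_NEW_FILES processes only newly created files.
        val lines = env.readFileStream("file:///tmp/watched-dir", 100L, WatchType.ONLY_NEW_FILES)
        lines.print()
        env.execute("readFileStream sketch")
      }
    }
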
http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
index 25ecb57..51bea21 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
@@ -72,6 +72,14 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3618c49..c5b101e 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -46,12 +46,19 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def getJavaStream: JavaStream[T] = javaStream
 
   /**
+   * Returns the ID of the {@link DataStream}.
+   *
+   * @return ID of the DataStream
+   */
+  def getId = javaStream.getId
+
+  /**
    * Returns the TypeInformation for the elements of this DataStream.
    */
   def getType(): TypeInformation[T] = javaStream.getType
 
   /**
-   * Sets the parallelism of this operation. This must be greater than 1.
+   * Sets the parallelism of this operation. This must be at least 1.
    */
   def setParallelism(parallelism: Int): DataStream[T] = {
     javaStream match {
@@ -67,13 +74,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Returns the parallelism of this operation.
    */
-  def getParallelism: Int = javaStream match {
-    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
-    case _ =>
-      throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have" +
-        " "  +
-        "parallelism.")
-  }
+  def getParallelism = javaStream.getParallelism
 
   /**
    * Gets the name of the current data stream. This name is
@@ -168,6 +169,23 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Sets the maximum time frequency (ms) for the flushing of the output
+   * buffer. By default the output buffers flush only when they are full.
+   *
+   * @param timeoutMillis
+   * The maximum time between two output flushes.
+   * @return The operator with buffer timeout set.
+   */
+  def setBufferTimeout(timeoutMillis: Long): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setBufferTimeout(timeoutMillis);
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for operators.")
+    }
+    this
+  }
+
+  /**
    * Creates a new DataStream by merging DataStream outputs of
    * the same type with each other. The DataStreams merged using this operator
    * will be transformed simultaneously.
@@ -660,7 +678,16 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * written.
    *
    */
-  def print(): DataStream[T] = javaStream.print()
+  def print(): DataStream[T] = javaStream.print
+
+  /**
+   * Writes a DataStream to the standard output stream (stderr).<br>
+   * For each element of the DataStream the result of
+   * {@link Object#toString()} is written.
+   *
+   * @return The closed DataStream.
+   */
+  def printToErr() = javaStream.printToErr
 
   /**
    * Writes a DataStream to the file specified by path in text format. The

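The DataStream.scala changes above expose members that previously lived only on the Java
DataStream (getId, getParallelism, setBufferTimeout, printToErr) and correct the
setParallelism Scaladoc. A minimal usage sketch, assuming the socketTextStream source and
an execute method on the Scala environment (host, port, and job name are placeholders):

    import org.apache.flink.streaming.api.scala._

    object DataStreamSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val upper = env
          .socketTextStream("localhost", 9999)  // placeholder host and port
          .map(_.toUpperCase)
          .setParallelism(2)                    // must be at least 1, per the corrected Scaladoc
          .setBufferTimeout(100L)               // flush output buffers at most every 100 ms
        upper.printToErr()                      // counterpart of print(), writing to stderr
        env.execute("DataStream sketch")
      }
    }
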
http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 865b484..62e24d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -36,6 +36,11 @@ import scala.reflect.ClassTag
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
+   * Gets the config object.
+   */
+  def getConfig = javaEnv.getConfig
+
+  /**
    * Sets the parallelism for operations executed through this environment.
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
@@ -48,6 +53,14 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Returns the default parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   * @deprecated Please use [[getParallelism]]
+   */
+  @deprecated
+  def getDegreeOfParallelism = javaEnv.getParallelism
+
+  /**
    * Sets the parallelism for operations executed through this environment.
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
@@ -60,14 +73,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Returns the default parallelism for this execution environment. Note that this
    * value can be overridden by individual operations using [[DataStream.setParallelism]]
-   * @deprecated Please use [[getParallelism]]
-   */
-  @deprecated
-  def getDegreeOfParallelism = javaEnv.getParallelism
-
-  /**
-   * Returns the default parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream.setParallelism]]
    */
   def getParallelism = javaEnv.getParallelism
 
@@ -96,7 +101,18 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Gets the default buffer timeout set for this environment
    */
-  def getBufferTimout: Long = javaEnv.getBufferTimeout()
+  def getBufferTimeout = javaEnv.getBufferTimeout
+
+  /**
+   * Disables operator chaining for streaming operators. Operator chaining
+   * allows non-shuffle operations to be co-located in the same thread fully
+   * avoiding serialization and de-serialization.
+   *
+   */
+  def disableOperatorChaining(): StreamExecutionEnvironment = {
+    javaEnv.disableOperatorChaining()
+    this
+  }
 
   /**
    * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
@@ -134,17 +150,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.setStateHandleProvider(provider)
     this
   }
- 
-  /**
-   * Disables operator chaining for streaming operators. Operator chaining
-   * allows non-shuffle operations to be co-located in the same thread fully
-   * avoiding serialization and de-serialization.
-   * 
-   */
-  def disableOperatorChaning(): StreamExecutionEnvironment = {
-    javaEnv.disableOperatorChaining()
-    this
-  }
 
   /**
    * Sets the number of times that failed tasks are re-executed. A value of zero
@@ -162,6 +167,36 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
+  // --------------------------------------------------------------------------------------------
+  // Registry for types and serializers
+  // --------------------------------------------------------------------------------------------
+  /**
+   * Adds a new Kryo default serializer to the Runtime.
+   * <p/>
+   * Note that the serializer instance must be serializable (as defined by
+   * java.io.Serializable), because it may be distributed to the worker nodes
+   * by java serialization.
+   *
+   * @param type
+   * The class of the types serialized with the given serializer.
+   * @param serializer
+   * The serializer to use.
+   */
+  def addDefaultKryoSerializer(`type`: Class[_], serializer: Serializer[_]) {
+    javaEnv.addDefaultKryoSerializer(`type`, serializer)
+  }
+
+  /**
+   * Adds a new Kryo default serializer to the Runtime.
+   *
+   * @param type
+   * The class of the types serialized with the given serializer.
+   * @param serializerClass
+   * The class of the serializer to use.
+   */
+  def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_ <: Serializer[_]]) {
+    javaEnv.addDefaultKryoSerializer(`type`, serializerClass)
+  }
 
   /**
    * Registers the given type with the serializer at the [[KryoSerializer]].
@@ -180,33 +215,87 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
   }
 
+  /**
+   * Registers the given type with the serialization stack. If the type is eventually
+   * serialized as a POJO, then the type is registered with the POJO serializer. If the
+   * type ends up being serialized with Kryo, then it will be registered at Kryo to make
+   * sure that only tags are written.
+   *
+   */
+  def registerType(typeClass: Class[_]) {
+    javaEnv.registerType(typeClass)
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // Data stream creations
+  // --------------------------------------------------------------------------------------------
 
   /**
-   * Registers a default serializer for the given class and its sub-classes at Kryo.
+   * Creates a new DataStream that contains a sequence of numbers.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a parallelism of one.
    */
-  def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
-    javaEnv.addDefaultKryoSerializer(clazz, serializer)
+  def generateSequence(from: Long, to: Long): DataStream[Long] = {
+    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
+      asInstanceOf[DataStream[Long]]
   }
 
   /**
-   * Registers a default serializer for the given class and its sub-classes at Kryo.
+   * Creates a new DataStream that contains a sequence of numbers in a parallel fashion.
+   */
+  def generateParallelSequence(from: Long, to: Long): DataStream[Long] = {
+    new DataStream[java.lang.Long](javaEnv.generateParallelSequence(from, to)).
+      asInstanceOf[DataStream[Long]]
+  }
+
+  /**
+   * Creates a DataStream that contains the given elements. The elements must all be of the
+   * same type.
    *
-   * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
-   * because it may be distributed to the worker nodes by java serialization.
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a parallelism of one.
    */
-  def registerDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
-    javaEnv.addDefaultKryoSerializer(clazz, serializer)
+  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
   }
 
   /**
-   * Registers the given type with the serialization stack. If the type is eventually
-   * serialized as a POJO, then the type is registered with the POJO serializer. If the
-   * type ends up being serialized with Kryo, then it will be registered at Kryo to make
-   * sure that only tags are written.
+   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
+   * because the framework may move the elements into the cluster if needed.
    *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a parallelism of one.
    */
-  def registerType(typeClass: Class[_]) {
-    javaEnv.registerType(typeClass)
+  def fromCollection[T: ClassTag: TypeInformation](data: Seq[T]): DataStream[T] = {
+    require(data != null, "Data must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+
+    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
+      .asJavaCollection(data))
+
+    javaEnv.addSource(sourceFunction).returns(typeInfo)
+  }
+
+  /**
+   * Creates a DataStream from the given [[Iterator]].
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a parallelism of one.
+   */
+  def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    javaEnv.fromCollection(data.asJava, typeInfo)
+  }
+
+  /**
+   * Creates a DataStream from the given [[SplittableIterator]].
+   */
+  def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
+  DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    javaEnv.fromParallelCollection(data, typeInfo)
   }
 
   /**
@@ -260,17 +349,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * line wise. The file path should be passed as a URI (e.g., "file:///some/local/file" or
    * "hdfs://host:port/file/path").
    */
-  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]):
-    DataStream[T] =
-    javaEnv.readFileOfPrimitives(filePath, typeClass)
-
-  /**
-   * Creates a data stream that represents the primitive type produced by reading the given file
-   * line wise. The file path should be passed as a URI (e.g., "file:///some/local/file" or
-   * "hdfs://host:port/file/path").
-   */
-  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String,
-    typeClass: Class[T]): DataStream[T] =
+  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String,
+    delimiter: String = "\n", typeClass: Class[T]): DataStream[T] =
     javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
 
   /**
@@ -305,85 +385,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.createInput(inputFormat)
 
   /**
-   * Generic method to create an input data stream with a specific input format.
-   * Since all data streams need specific information about their types, this method needs to
-   * determine the type of the data produced by the input format. It will attempt to determine the
-   * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
-   */
-  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _],
-    typeInfo: TypeInformation[T]): DataStream[T] =
-    javaEnv.createInput(inputFormat, typeInfo)
-
-  /**
-   * Creates a new DataStream that contains a sequence of numbers.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a parallelism of one.
-   */
-  def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
-      asInstanceOf[DataStream[Long]]
-  }
-
-  /**
-   * Creates a new DataStream that contains a sequence of numbers in a parallel fashion.
-   */
-  def generateParallelSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateParallelSequence(from, to)).
-      asInstanceOf[DataStream[Long]]
-  }
-
-  /**
-   * Creates a DataStream that contains the given elements. The elements must all be of the
-   * same type.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a parallelism of one.
-   */
-  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
-   * because the framework may move the elements into the cluster if needed.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a parallelism of one.
-   */
-  def fromCollection[T: ClassTag: TypeInformation](
-    data: Seq[T]): DataStream[T] = {
-    require(data != null, "Data must not be null.")
-    val typeInfo = implicitly[TypeInformation[T]]
-
-    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
-        .asJavaCollection(data))
-        
-    javaEnv.addSource(sourceFunction).returns(typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given [[Iterator]].
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a parallelism of one.
-   */
-  def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.fromCollection(data.asJava, typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given [[SplittableIterator]].
-   */
-  def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
-    DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.fromParallelCollection(data, typeInfo)
-  }
-
-  /**
    * Create a DataStream using a user defined source function for arbitrary
    * source functionality. By default sources have a parallelism of 1. 
    * To enable parallel execution, the user defined source should implement 
@@ -444,7 +445,14 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * executed.
    *
    */
-  def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
+  def getExecutionPlan = javaEnv.getExecutionPlan
+
+  /**
+   * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+   *
+   * @return The StreamGraph representing the transformations
+   */
+  def getStreamGraph = javaEnv.getStreamGraph
 
 }
 
@@ -456,6 +464,16 @@ object StreamExecutionEnvironment {
   }
 
   /**
+   * Sets the default parallelism that will be used for the local execution
+   * environment created by {@link #createLocalEnvironment()}.
+   *
+   * @param parallelism
+   * The parallelism to use as the default local parallelism.
+   */
+  def setDefaultLocalParallelism(parallelism: Int) : Unit =
+    StreamExecutionEnvironment.setDefaultLocalParallelism(parallelism)
+
+  /**
    * Creates an execution environment that represents the context in which the program is
    * currently executed. If the program is invoked standalone, this method returns a local
    * execution environment. If the program is invoked from within the command line client

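With the additions above, the Scala StreamExecutionEnvironment mirrors the Java
environment more closely: getConfig, getBufferTimeout, disableOperatorChaining (the
misspelled disableOperatorChaning is removed), the Kryo registration methods, and the
collection-based sources, now grouped under a "Data stream creations" section. A minimal
sketch of those sources and configuration calls (the job name is a placeholder):

    import org.apache.flink.streaming.api.scala._

    object EnvironmentSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(4)
        env.disableOperatorChaining()

        // Non-parallel sources (parallelism of one), as the Scaladoc notes.
        val words: DataStream[String] = env.fromElements("to", "be", "or", "not")
        val range: DataStream[Long]   = env.generateSequence(1, 100)
        val nums: DataStream[Int]     = env.fromCollection(Seq(1, 2, 3))

        words.print()
        env.execute("environment sketch")
      }
    }
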
http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
new file mode 100644
index 0000000..7ebc161
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+
+import scala.language.existentials
+
+import org.junit.Test
+
+/**
+ * This checks whether the streaming Scala API is up to feature parity with the Java API.
+ * Implements the {@link ScalaAPICompletenessTest} for streaming.
+ */
+class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
+
+  override def isExcludedByName(method: Method): Boolean = {
+    val name = method.getDeclaringClass.getName + "." + method.getName
+    val excludedNames = Seq(
+      // These are only used internally. Should be internal API but Java doesn't have
+      // private[flink].
+      "org.apache.flink.streaming.api.datastream.DataStream.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.DataStream.getType",
+      "org.apache.flink.streaming.api.datastream.DataStream.copy",
+      "org.apache.flink.streaming.api.datastream.DataStream.transform",
+      "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
+      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getType1",
+      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getType2",
+      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.addGeneralWindowCombine",
+      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.transform",
+      "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
+      "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
+
+      // TypeHints are only needed for Java API, Scala API doesn't need them
+      "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns"
+    )
+    val excludedPatterns = Seq(
+      // We don't have project on tuples in the Scala API
+      """^org\.apache\.flink\.streaming.api.*project""",
+
+      // Cleaning is easier in the Scala API
+      """^org\.apache\.flink\.streaming.api.*clean""",
+
+      // Object methods
+      """^.*notify""",
+      """^.*wait""",
+      """^.*notifyAll""",
+      """^.*equals""",
+      """^.*toString""",
+      """^.*getClass""",
+      """^.*hashCode"""
+    ).map(_.r)
+    lazy val excludedByPattern =
+      excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).nonEmpty
+    name.contains("$") || excludedNames.contains(name) || excludedByPattern
+  }
+
+  @Test
+  override def testCompleteness(): Unit = {
+    checkMethods("DataStream", "DataStream", classOf[JavaStream[_]], classOf[DataStream[_]])
+
+    checkMethods(
+      "StreamExecutionEnvironment", "StreamExecutionEnvironment",
+      classOf[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment],
+      classOf[StreamExecutionEnvironment])
+
+    checkMethods(
+      "SingleOutputStreamOperator", "DataStream",
+      classOf[org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[_,_]],
+      classOf[DataStream[_]])
+
+    checkMethods(
+      "ConnectedDataStream", "ConnectedDataStream",
+      classOf[org.apache.flink.streaming.api.datastream.ConnectedDataStream[_,_]],
+      classOf[ConnectedDataStream[_,_]])
+
+    checkMethods(
+      "SplitDataStream", "SplitDataStream",
+      classOf[org.apache.flink.streaming.api.datastream.SplitDataStream[_]],
+      classOf[SplitDataStream[_]])
+
+    checkMethods(
+      "StreamCrossOperator", "StreamCrossOperator",
+      classOf[org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator[_,_]],
+      classOf[StreamCrossOperator[_,_]])
+
+    checkMethods(
+      "StreamJoinOperator", "StreamJoinOperator",
+      classOf[org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator[_,_]],
+      classOf[StreamJoinOperator[_,_]])
+
+    checkMethods(
+      "TemporalOperator", "TemporalOperator",
+      classOf[org.apache.flink.streaming.api.datastream.temporal.TemporalOperator[_,_,_]],
+      classOf[TemporalOperator[_,_,_]])
+
+    checkMethods(
+      "WindowedDataStream", "WindowedDataStream",
+      classOf[org.apache.flink.streaming.api.datastream.WindowedDataStream[_]],
+      classOf[WindowedDataStream[_]])
+  }
+}

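Together with the shared base class added further down, the test above reduces the
completeness check to a reflection-based set difference over public method names.
Stripped of the exclusion lists, the core idea is roughly:

    import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
    import org.apache.flink.streaming.api.scala.DataStream

    object CompletenessIdeaSketch {
      def main(args: Array[String]): Unit = {
        // Public method names of the Java and Scala DataStream classes, obtained via reflection.
        val javaNames  = classOf[JavaStream[_]].getMethods.map(_.getName).toSet
        val scalaNames = classOf[DataStream[_]].getMethods.map(_.getName).toSet

        // Every name left here would fail the real test, which additionally applies the exclusion lists.
        (javaNames -- scalaNames).foreach(name => println("missing from the Scala API: " + name))
      }
    }
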
http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
deleted file mode 100644
index ab90757..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala
-
-import java.lang.reflect.Method
-
-import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-
-import scala.language.existentials
-
-import org.junit.Assert._
-import org.junit.Test
-
-/**
- * This checks whether the Scala API is up to feature parity with the Java API. Right now is very
- * simple, it is only checked whether a method with the same name exists.
- *
- * When adding excluded methods to the lists you should give a good reason in a comment.
- *
- * Note: This is inspired by the JavaAPICompletenessChecker from Spark.
- */
-class ScalaAPICompletenessTest {
-
-  private def isExcludedByName(method: Method): Boolean = {
-    val name = method.getDeclaringClass.getName + "." + method.getName
-    val excludedNames = Seq(
-      // These are only used internally. Should be internal API but Java doesn't have
-      // private[flink].
-      "org.apache.flink.api.java.DataSet.getExecutionEnvironment",
-      "org.apache.flink.api.java.DataSet.getType",
-      "org.apache.flink.api.java.operators.Operator.getResultType",
-      "org.apache.flink.api.java.operators.Operator.getName",
-      "org.apache.flink.api.java.operators.Grouping.getDataSet",
-      "org.apache.flink.api.java.operators.Grouping.getKeys",
-      "org.apache.flink.api.java.operators.SingleInputOperator.getInput",
-      "org.apache.flink.api.java.operators.SingleInputOperator.getInputType",
-      "org.apache.flink.api.java.operators.TwoInputOperator.getInput1",
-      "org.apache.flink.api.java.operators.TwoInputOperator.getInput2",
-      "org.apache.flink.api.java.operators.TwoInputOperator.getInput1Type",
-      "org.apache.flink.api.java.operators.TwoInputOperator.getInput2Type",
-      "org.apache.flink.api.java.ExecutionEnvironment.localExecutionIsAllowed",
-      "org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism",
-
-      // TypeHints are only needed for Java API, Scala API doesn't need them
-      "org.apache.flink.api.java.operators.SingleInputUdfOperator.returns",
-      "org.apache.flink.api.java.operators.TwoInputUdfOperator.returns",
-
-      // This is really just a mapper, which in Scala can easily expressed as a map lambda
-      "org.apache.flink.api.java.DataSet.writeAsFormattedText",
-
-      // Exclude minBy and maxBy for now, since there is some discussion about our aggregator
-      // semantics
-      "org.apache.flink.api.java.DataSet.minBy",
-      "org.apache.flink.api.java.DataSet.maxBy",
-      "org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
-      "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",
-
-      // This method is actually just an internal helper
-      "org.apache.flink.api.java.DataSet.getCallLocationName"
-    )
-    val excludedPatterns = Seq(
-      // We don't have project on tuples in the Scala API
-      """^org\.apache\.flink\.api.java.*project""",
-
-      // I don't want to have withParameters in the API since I consider Configuration to be
-      // deprecated. But maybe thats just me ...
-      """^org\.apache\.flink\.api.java.*withParameters""",
-
-      // These are only used internally. Should be internal API but Java doesn't have
-      // private[flink].
-      """^org\.apache\.flink\.api.java.*getBroadcastSets""",
-      """^org\.apache\.flink\.api.java.*setSemanticProperties""",
-      """^org\.apache\.flink\.api.java.*getSemanticProperties""",
-      """^org\.apache\.flink\.api.java.*getParameters""",
-
-      // Commented out for now until we have a use case for this.
-      """^org\.apache\.flink\.api.java.*runOperation""",
-
-      // Object methods
-      """^.*notify""",
-      """^.*wait""",
-      """^.*notifyAll""",
-      """^.*equals""",
-      """^.*toString""",
-      """^.*getClass""",
-      """^.*hashCode"""
-    ).map(_.r)
-    lazy val excludedByPattern =
-      excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).nonEmpty
-    name.contains("$") || excludedNames.contains(name) || excludedByPattern
-  }
-
-  private def isExcludedByInterface(method: Method): Boolean = {
-    val excludedInterfaces =
-      Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
-    def toComparisionKey(method: Method) =
-      (method.getReturnType, method.getName, method.getGenericReturnType)
-    val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
-      excludedInterfaces.contains(i.getName)
-    }
-    val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey))
-    excludedMethods.contains(toComparisionKey(method))
-  }
-
-  private def checkMethods(
-      javaClassName: String,
-      scalaClassName: String,
-      javaClass: Class[_],
-      scalaClass: Class[_]) {
-    val javaMethods = javaClass.getMethods
-      .filterNot(_.isAccessible)
-      .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
-      .map(m => m.getName).toSet
-
-    val scalaMethods = scalaClass.getMethods
-      .filterNot(_.isAccessible)
-      .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
-      .map(m => m.getName).toSet
-
-    val missingMethods = javaMethods -- scalaMethods
-
-    for (method <- missingMethods) {
-      fail("Method " + method + " from " + javaClass + " is missing from " + scalaClassName + ".")
-    }
-  }
-
-  @Test
-  def testCompleteness(): Unit = {
-    checkMethods("DataSet", "DataSet", classOf[JavaDataSet[_]], classOf[DataSet[_]])
-
-    checkMethods(
-      "ExecutionEnvironment", "ExecutionEnvironment",
-      classOf[org.apache.flink.api.java.ExecutionEnvironment], classOf[ExecutionEnvironment])
-
-    checkMethods("Operator", "DataSet", classOf[Operator[_, _]], classOf[DataSet[_]])
-
-    checkMethods("UnsortedGrouping", "GroupedDataSet",
-      classOf[UnsortedGrouping[_]], classOf[GroupedDataSet[_]])
-
-    checkMethods("SortedGrouping", "GroupedDataSet",
-      classOf[SortedGrouping[_]], classOf[GroupedDataSet[_]])
-
-    checkMethods("AggregateOperator", "AggregateDataSet",
-      classOf[AggregateOperator[_]], classOf[AggregateDataSet[_]])
-
-    checkMethods("SingleInputOperator", "DataSet",
-      classOf[SingleInputOperator[_, _, _]], classOf[DataSet[_]])
-
-    checkMethods("TwoInputOperator", "DataSet",
-      classOf[TwoInputOperator[_, _, _, _]], classOf[DataSet[_]])
-
-    checkMethods("SingleInputUdfOperator", "DataSet",
-      classOf[SingleInputUdfOperator[_, _, _]], classOf[DataSet[_]])
-
-    checkMethods("TwoInputUdfOperator", "DataSet",
-      classOf[TwoInputUdfOperator[_, _, _, _]], classOf[DataSet[_]])
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
new file mode 100644
index 0000000..a5f1cbb
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.completeness
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.scala._
+import org.junit.Test
+
+import scala.language.existentials
+
+/**
+  * This checks whether the Scala API is up to feature parity with the Java API. Right now is very
+  * simple, it is only checked whether a method with the same name exists.
+  *
+  * When adding excluded methods to the lists you should give a good reason in a comment.
+  *
+  * Note: This is inspired by the JavaAPICompletenessChecker from Spark.
+  */
+class BatchScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
+
+   override def isExcludedByName(method: Method): Boolean = {
+     val name = method.getDeclaringClass.getName + "." + method.getName
+     val excludedNames = Seq(
+       // These are only used internally. Should be internal API but Java doesn't have
+       // private[flink].
+       "org.apache.flink.api.java.DataSet.getExecutionEnvironment",
+       "org.apache.flink.api.java.DataSet.getType",
+       "org.apache.flink.api.java.operators.Operator.getResultType",
+       "org.apache.flink.api.java.operators.Operator.getName",
+       "org.apache.flink.api.java.operators.Grouping.getDataSet",
+       "org.apache.flink.api.java.operators.Grouping.getKeys",
+       "org.apache.flink.api.java.operators.SingleInputOperator.getInput",
+       "org.apache.flink.api.java.operators.SingleInputOperator.getInputType",
+       "org.apache.flink.api.java.operators.TwoInputOperator.getInput1",
+       "org.apache.flink.api.java.operators.TwoInputOperator.getInput2",
+       "org.apache.flink.api.java.operators.TwoInputOperator.getInput1Type",
+       "org.apache.flink.api.java.operators.TwoInputOperator.getInput2Type",
+       "org.apache.flink.api.java.ExecutionEnvironment.localExecutionIsAllowed",
+       "org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism",
+
+       // TypeHints are only needed for Java API, Scala API doesn't need them
+       "org.apache.flink.api.java.operators.SingleInputUdfOperator.returns",
+       "org.apache.flink.api.java.operators.TwoInputUdfOperator.returns",
+
+       // This is really just a mapper, which in Scala can easily expressed as a map lambda
+       "org.apache.flink.api.java.DataSet.writeAsFormattedText",
+
+       // Exclude minBy and maxBy for now, since there is some discussion about our aggregator
+       // semantics
+       "org.apache.flink.api.java.DataSet.minBy",
+       "org.apache.flink.api.java.DataSet.maxBy",
+       "org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
+       "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",
+
+       // This method is actually just an internal helper
+       "org.apache.flink.api.java.DataSet.getCallLocationName"
+     )
+     val excludedPatterns = Seq(
+       // We don't have project on tuples in the Scala API
+       """^org\.apache\.flink\.api.java.*project""",
+
+       // I don't want to have withParameters in the API since I consider Configuration to be
+       // deprecated. But maybe thats just me ...
+       """^org\.apache\.flink\.api.java.*withParameters""",
+
+       // These are only used internally. Should be internal API but Java doesn't have
+       // private[flink].
+       """^org\.apache\.flink\.api.java.*getBroadcastSets""",
+       """^org\.apache\.flink\.api.java.*setSemanticProperties""",
+       """^org\.apache\.flink\.api.java.*getSemanticProperties""",
+       """^org\.apache\.flink\.api.java.*getParameters""",
+
+       // Commented out for now until we have a use case for this.
+       """^org\.apache\.flink\.api.java.*runOperation""",
+
+       // Object methods
+       """^.*notify""",
+       """^.*wait""",
+       """^.*notifyAll""",
+       """^.*equals""",
+       """^.*toString""",
+       """^.*getClass""",
+       """^.*hashCode"""
+     ).map(_.r)
+     lazy val excludedByPattern =
+       excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).nonEmpty
+     name.contains("$") || excludedNames.contains(name) || excludedByPattern
+   }
+
+   @Test
+   override def testCompleteness(): Unit = {
+     checkMethods("DataSet", "DataSet", classOf[JavaDataSet[_]], classOf[DataSet[_]])
+
+     checkMethods(
+       "ExecutionEnvironment", "ExecutionEnvironment",
+       classOf[org.apache.flink.api.java.ExecutionEnvironment], classOf[ExecutionEnvironment])
+
+     checkMethods("Operator", "DataSet", classOf[Operator[_, _]], classOf[DataSet[_]])
+
+     checkMethods("UnsortedGrouping", "GroupedDataSet",
+       classOf[UnsortedGrouping[_]], classOf[GroupedDataSet[_]])
+
+     checkMethods("SortedGrouping", "GroupedDataSet",
+       classOf[SortedGrouping[_]], classOf[GroupedDataSet[_]])
+
+     checkMethods("AggregateOperator", "AggregateDataSet",
+       classOf[AggregateOperator[_]], classOf[AggregateDataSet[_]])
+
+     checkMethods("SingleInputOperator", "DataSet",
+       classOf[SingleInputOperator[_, _, _]], classOf[DataSet[_]])
+
+     checkMethods("TwoInputOperator", "DataSet",
+       classOf[TwoInputOperator[_, _, _, _]], classOf[DataSet[_]])
+
+     checkMethods("SingleInputUdfOperator", "DataSet",
+       classOf[SingleInputUdfOperator[_, _, _]], classOf[DataSet[_]])
+
+     checkMethods("TwoInputUdfOperator", "DataSet",
+       classOf[TwoInputUdfOperator[_, _, _, _]], classOf[DataSet[_]])
+   }
+ }

http://git-wip-us.apache.org/repos/asf/flink/blob/7571959a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
new file mode 100644
index 0000000..0df4589
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.completeness
+
+import java.lang.reflect.Method
+
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.language.existentials
+
+/**
+ * Test base for checking whether the Scala API is up to feature parity with the Java API.
+ * Right now is very simple, it is only checked whether a method with the same name exists.
+ *
+ * When adding excluded methods to the lists you should give a good reason in a comment.
+ *
+ * Note: This is inspired by the JavaAPICompletenessChecker from Spark.
+ */
+abstract class ScalaAPICompletenessTestBase {
+
+  /**
+   * Determines whether a method is excluded by name.
+   */
+  protected def isExcludedByName(method: Method): Boolean
+
+  /**
+   * Determines whether a method is excluded by an interface it uses.
+   */
+  protected def isExcludedByInterface(method: Method): Boolean = {
+    val excludedInterfaces =
+      Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
+    def toComparisionKey(method: Method) =
+      (method.getReturnType, method.getName, method.getGenericReturnType)
+    val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
+      excludedInterfaces.contains(i.getName)
+    }
+    val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey))
+    excludedMethods.contains(toComparisionKey(method))
+  }
+
+  /**
+   * Utility to be called during the test.
+   */
+  protected def checkMethods(
+      javaClassName: String,
+      scalaClassName: String,
+      javaClass: Class[_],
+      scalaClass: Class[_]) {
+    val javaMethods = javaClass.getMethods
+      .filterNot(_.isAccessible)
+      .filterNot(isExcludedByName)
+      .filterNot(isExcludedByInterface)
+      .map(m => m.getName).toSet
+
+    val scalaMethods = scalaClass.getMethods
+      .filterNot(_.isAccessible)
+      .filterNot(isExcludedByName)
+      .filterNot(isExcludedByInterface)
+      .map(m => m.getName).toSet
+
+    val missingMethods = javaMethods -- scalaMethods
+
+    for (method <- missingMethods) {
+      fail("Method " + method + " from " + javaClass + " is missing from " + scalaClassName + ".")
+    }
+  }
+
+  /**
+   * Tests to be performed to ensure API completeness.
+   */
+  @Test
+  protected def testCompleteness(): Unit
+}

