flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/6] flink git commit: [FLINK-3421] [streaming scala] Remove unneeded ClassTag context bounds
Date Wed, 17 Feb 2016 14:18:03 GMT
[FLINK-3421] [streaming scala] Remove unneeded ClassTag context bounds


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8afec3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8afec3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8afec3b

Branch: refs/heads/master
Commit: a8afec3b55700362ee76b8d5c6855819e0a27818
Parents: 6dda531
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Feb 16 19:59:24 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 17 15:09:13 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/scala/AllWindowedStream.scala | 18 ++++++------
 .../streaming/api/scala/CoGroupedStreams.scala  |  6 ++--
 .../streaming/api/scala/ConnectedStreams.scala  | 12 ++++----
 .../flink/streaming/api/scala/DataStream.scala  | 13 ++++-----
 .../streaming/api/scala/JoinedStreams.scala     |  6 ++--
 .../flink/streaming/api/scala/KeyedStream.scala | 10 +++----
 .../api/scala/StreamExecutionEnvironment.scala  | 29 ++++++++------------
 .../streaming/api/scala/WindowedStream.scala    | 18 ++++++------
 8 files changed, 47 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8afec3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index a4df980..e36542e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-import scala.reflect.ClassTag
-
 import scala.collection.JavaConverters._
 
 /**
@@ -136,7 +134,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
-  def fold[R: TypeInformation: ClassTag](
+  def fold[R: TypeInformation](
       initialValue: R,
       function: FoldFunction[T,R]): DataStream[R] = {
     
@@ -156,7 +154,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
+  def fold[R: TypeInformation](initialValue: R, function: (R, T) => R): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
@@ -180,7 +178,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = {
     
     val cleanedFunction = clean(function)
@@ -203,7 +201,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     
     val cleanedFunction = clean(function)
@@ -226,7 +224,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       preAggregator: ReduceFunction[T],
       function: AllWindowFunction[T, R, W]): DataStream[R] = {
 
@@ -245,7 +243,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
       function: (W, T, Collector[R]) => Unit): DataStream[R] = {
     if (function == null) {
@@ -283,7 +281,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       initialValue: R,
       preAggregator: FoldFunction[T, R],
       function: AllWindowFunction[R, R, W]): DataStream[R] = {
@@ -307,7 +305,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       initialValue: R,
       preAggregator: (R, T) => R,
       function: (W, R, Collector[R]) => Unit): DataStream[R] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/a8afec3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 7572885..f4ab2ee 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -32,8 +32,6 @@ import org.apache.flink.util.Collector
 
 import scala.collection.JavaConverters._
 
-import scala.reflect.ClassTag
-
 /**
  * `CoGroupedStreams` represents two [[DataStream]]s that have been co-grouped.
  * A streaming co-group operation is evaluated over elements in a window.
@@ -236,7 +234,7 @@ object CoGroupedStreams {
      * Completes the co-group operation with the user function that is executed
      * for windowed groups.
      */
-    def apply[O: TypeInformation: ClassTag](
+    def apply[O: TypeInformation](
         fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
       require(fun != null, "CoGroup function must not be null.")
 
@@ -255,7 +253,7 @@ object CoGroupedStreams {
      * Completes the co-group operation with the user function that is executed
      * for windowed groups.
      */
-    def apply[O: TypeInformation: ClassTag](
+    def apply[O: TypeInformation](
         fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
       require(fun != null, "CoGroup function must not be null.")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a8afec3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 278090d..fbea12a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -26,8 +26,6 @@ import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStrea
 import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
 import org.apache.flink.util.Collector
 
-import scala.reflect.ClassTag
-
 /**
  * [[ConnectedStreams]] represents two connected streams of (possibly) different data types.
  * Connected streams are useful for cases where operations on one stream directly
@@ -60,7 +58,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @param fun2 Function called per element of the second input.
    * @return The resulting data stream.
    */
-  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
+  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): 
       DataStream[R] = {
     
     if (fun1 == null || fun2 == null) {
@@ -92,7 +90,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @return
     *        The resulting data stream
    */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R] = {
+  def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R] = {
     if (coMapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
@@ -117,7 +115,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @return
     *        The resulting data stream.
    */
-  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
+  def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
           DataStream[R] = {
     
     if (coFlatMapper == null) {
@@ -139,7 +137,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @param fun2 Function called per element of the second input.
    * @return The resulting data stream.
    */
-  def flatMap[R: TypeInformation: ClassTag](
+  def flatMap[R: TypeInformation](
       fun1: (IN1, Collector[R]) => Unit, 
       fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
     
@@ -166,7 +164,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @param fun2 Function called per element of the second input.
    * @return The resulting data stream.
    */
-  def flatMap[R: TypeInformation: ClassTag](
+  def flatMap[R: TypeInformation](
       fun1: IN1 => TraversableOnce[R],
       fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
     

http://git-wip-us.apache.org/repos/asf/flink/blob/a8afec3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 808f4ea..fd4bbf3 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
 
 import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
 
 @Public
 class DataStream[T](stream: JavaStream[T]) {
@@ -447,7 +446,7 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    */
   @PublicEvolving
-  def iterate[R, F: TypeInformation: ClassTag](
+  def iterate[R, F: TypeInformation](
         stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
         maxWaitTimeMillis:Long): DataStream[R] = {
     
@@ -464,7 +463,7 @@ class DataStream[T](stream: JavaStream[T]) {
   /**
    * Creates a new DataStream by applying the given function to every element of this DataStream.
    */
-  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
+  def map[R: TypeInformation](fun: T => R): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Map function must not be null.")
     }
@@ -479,7 +478,7 @@ class DataStream[T](stream: JavaStream[T]) {
   /**
    * Creates a new DataStream by applying the given function to every element of this DataStream.
    */
-  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
+  def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R] = {
     if (mapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
@@ -492,7 +491,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * Creates a new DataStream by applying the given function to every element and flattening
    * the results.
    */
-  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
+  def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
     if (flatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
@@ -505,7 +504,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * Creates a new DataStream by applying the given function to every element and flattening
    * the results.
    */
-  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
+  def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
@@ -520,7 +519,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * Creates a new DataStream by applying the given function to every element and flattening
    * the results.
    */
-  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
+  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/a8afec3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index b111d9a..381a8cb 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-import scala.reflect.ClassTag
-
 /**
  * `JoinedStreams` represents two [[DataStream]]s that have been joined.
  * A streaming join operation is evaluated over elements in a window.
@@ -233,7 +231,7 @@ object JoinedStreams {
      * Completes the join operation with the user function that is executed
      * for windowed groups.
      */
-    def apply[O: TypeInformation: ClassTag](fun: (T1, T2) => O): DataStream[O] = {
+    def apply[O: TypeInformation](fun: (T1, T2) => O): DataStream[O] = {
       require(fun != null, "Join function must not be null.")
 
       val joiner = new FlatJoinFunction[T1, T2, O] {
@@ -249,7 +247,7 @@ object JoinedStreams {
      * Completes the join operation with the user function that is executed
      * for windowed groups.
      */
-    def apply[O: TypeInformation: ClassTag](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
+    def apply[O: TypeInformation](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
       require(fun != null, "Join function must not be null.")
 
       val joiner = new FlatJoinFunction[T1, T2, O] {

http://git-wip-us.apache.org/repos/asf/flink/blob/a8afec3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index b492bf9..00a4bbb 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -32,8 +32,6 @@ import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.util.Collector
 
-import scala.reflect.ClassTag
-
 @Public
 class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
 
@@ -151,7 +149,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * using an associative fold function and an initial value. An independent 
    * aggregate is kept per key.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
+  def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]): 
       DataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -168,7 +166,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * using an associative fold function and an initial value. An independent 
    * aggregate is kept per key.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
+  def fold[R: TypeInformation](initialValue: R, fun: (R,T) => R): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
@@ -321,7 +319,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    *
    * Note that the user state object needs to be serializable.
    */
-  def mapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
+  def mapWithState[R: TypeInformation, S: TypeInformation](
         fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Map function must not be null.")
@@ -350,7 +348,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    *
    * Note that the user state object needs to be serializable.
    */
-  def flatMapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
+  def flatMapWithState[R: TypeInformation, S: TypeInformation](
         fun: (T, Option[S]) => (TraversableOnce[R], Option[S])): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Flatmap function must not be null.")

http://git-wip-us.apache.org/repos/asf/flink/blob/a8afec3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 4107e4d..3736225 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
 import org.apache.flink.util.SplittableIterator
 
 import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
 
 import _root_.scala.language.implicitConversions
 
@@ -387,9 +386,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Note that this operation will result in a non-parallel data source, i.e. a data source with
    * a parallelism of one.
    */
-  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+  def fromElements[T: TypeInformation](data: T*): DataStream[T] = {
+    fromCollection(data)
   }
 
   /**
@@ -399,7 +397,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Note that this operation will result in a non-parallel data source, i.e. a data source with
    * a parallelism of one.
    */
-  def fromCollection[T: ClassTag: TypeInformation](data: Seq[T]): DataStream[T] = {
+  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T] = {
     require(data != null, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
 
@@ -413,7 +411,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Note that this operation will result in a non-parallel data source, i.e. a data source with
    * a parallelism of one.
    */
-  def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
+  def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T] = {
     val typeInfo = implicitly[TypeInformation[T]]
     asScalaStream(javaEnv.fromCollection(data.asJava, typeInfo))
   }
@@ -421,7 +419,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a DataStream from the given [[SplittableIterator]].
    */
-  def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
+  def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T]):
       DataStream[T] = {
     val typeInfo = implicitly[TypeInformation[T]]
     asScalaStream(javaEnv.fromParallelCollection(data, typeInfo))
@@ -447,7 +445,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Reads the given file with the given input format. The file path should be passed
    * as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
    */
-  def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
+  def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
         DataStream[T] =
     asScalaStream(javaEnv.readFile(inputFormat, filePath))
 
@@ -482,7 +480,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
    */
   @PublicEvolving
-  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
+  def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
     asScalaStream(javaEnv.createInput(inputFormat))
 
   /**
@@ -494,7 +492,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * To change this afterwards call DataStreamSource.setParallelism(int)
    *
    */
-  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
+  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
     require(function != null, "Function must not be null.")
     
     val cleanFun = scalaClean(function)
@@ -506,7 +504,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Create a DataStream using a user defined source function for arbitrary
    * source functionality.
    */
-  def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
+  def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
     require(function != null, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
       val cleanFun = scalaClean(function)
@@ -522,10 +520,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Triggers the program execution. The environment will execute all parts of
    * the program that have resulted in a "sink" operation. Sink operations are
    * for example printing results or forwarding them to a message queue.
-   * <p>
+   * 
    * The program execution will be logged and displayed with a generated
    * default name.
-   *
    */
   def execute() = javaEnv.execute()
 
@@ -533,9 +530,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Triggers the program execution. The environment will execute all parts of
    * the program that have resulted in a "sink" operation. Sink operations are
    * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with the provided name
-   *
+   * 
+   * The program execution will be logged and displayed with the provided name.
    */
   def execute(jobName: String) = javaEnv.execute(jobName)
 
@@ -544,7 +540,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * returns it as a String using a JSON representation of the execution data
    * flow graph. Note that this needs to be called, before the plan is
    * executed.
-   *
    */
   def getExecutionPlan = javaEnv.getExecutionPlan
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a8afec3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index d34dd1b..53f033c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-import scala.reflect.ClassTag
-
 import scala.collection.JavaConverters._
 
 /**
@@ -139,7 +137,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
-  def fold[R: TypeInformation: ClassTag](
+  def fold[R: TypeInformation](
       initialValue: R,
       function: FoldFunction[T,R]): DataStream[R] = {
     if (function == null) {
@@ -159,7 +157,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
+  def fold[R: TypeInformation](initialValue: R, function: (R, T) => R): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
@@ -183,7 +181,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = {
     
     val cleanFunction = clean(function)
@@ -207,7 +205,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("WindowApply function must not be null.")
@@ -233,7 +231,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       preAggregator: ReduceFunction[T],
       function: WindowFunction[T, R, K, W]): DataStream[R] = {
 
@@ -252,7 +250,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
       function: (K, W, T, Collector[R]) => Unit): DataStream[R] = {
     
@@ -290,7 +288,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       initialValue: R,
       foldFunction: FoldFunction[T, R],
       function: WindowFunction[R, R, K, W]): DataStream[R] = {
@@ -313,7 +311,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
-  def apply[R: TypeInformation: ClassTag](
+  def apply[R: TypeInformation](
       initialValue: R,
       foldFunction: (R, T) => R,
       function: (K, W, R, Collector[R]) => Unit): DataStream[R] = {


Mime
View raw message