flink-commits mailing list archives

From mbala...@apache.org
Subject [12/36] flink git commit: [scala] [streaming] Fixed scala formatting
Date Wed, 07 Jan 2015 14:12:51 GMT
[scala] [streaming] Fixed scala formatting


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f7b6eaa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f7b6eaa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f7b6eaa

Branch: refs/heads/release-0.8
Commit: 1f7b6eaaa979e91799a047f29881f82666c12b19
Parents: f7291ea
Author: Gyula Fora <gyfora@apache.org>
Authored: Sun Dec 21 01:05:40 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Jan 5 17:58:54 2015 +0100

----------------------------------------------------------------------
 .../flink/api/scala/streaming/DataStream.scala  | 93 +++++++++++++-------
 .../api/scala/streaming/FieldsKeySelector.scala |  2 +-
 .../api/scala/streaming/SplitDataStream.scala   | 17 ++--
 .../scala/streaming/StreamCrossOperator.scala   | 25 ++++--
 .../streaming/StreamExecutionEnvironment.scala  | 16 ++--
 .../scala/streaming/StreamJoinOperator.scala    | 29 +++---
 .../scala/streaming/WindowedDataStream.scala    | 31 ++++---
 7 files changed, 136 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index ecf5615..0cf4a60 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -82,7 +82,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
       case _ =>
-        throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot have " +
+        throw new UnsupportedOperationException("Operator " + javaStream.toString +  " cannot " +
+          "have " +
           "parallelism.")
     }
     this
@@ -94,7 +95,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def getParallelism: Int = javaStream match {
     case op: SingleOutputStreamOperator[_, _] => op.getParallelism
     case _ =>
-      throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have " +
+      throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have" +
+        " "  +
         "parallelism.")
   }
 
@@ -139,7 +141,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the selected fields. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator.
+   * partitioned by the selected fields. This setting only effects the how the outputs will be
+   * distributed between the parallel instances of the next processing operator.
    *
    */
   def partitionBy(fields: Int*): DataStream[T] =
@@ -147,7 +150,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the selected fields. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator.
+   * partitioned by the selected fields. This setting only effects the how the outputs will be
+   * distributed between the parallel instances of the next processing operator.
    *
    */
   def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
@@ -155,7 +159,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the given Key. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator.
+   * partitioned by the given Key. This setting only effects the how the outputs will be
+   * distributed between the parallel instances of the next processing operator.
    *
    */
   def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
@@ -222,7 +227,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    *
    */
-  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: Long = 0): DataStream[T] = {
+  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]),  maxWaitTimeMillis:
+    Long = 0): DataStream[T] = {
     val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
@@ -252,19 +258,24 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
   * Applies an aggregation that that gives the current minimum element of the data stream by
-   * the given position. When equality, the user can set to get the first or last element with the minimal value.
+   * the given position. When equality, the user can set to get the first or last element with
+   * the minimal value.
    *
    */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY, position, first)
+  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType
+    .MINBY, position, first)
 
   /**
   * Applies an aggregation that that gives the current maximum element of the data stream by
-   * the given position. When equality, the user can set to get the first or last element with the maximal value.
+   * the given position. When equality, the user can set to get the first or last element with
+   * the maximal value.
    *
    */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY, position, first)
+  def maxBy(position: Int, first: Boolean = true): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position, first)
 
-  private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): DataStream[T] = {
+  private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+    DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaStream[Product]]
     val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -272,15 +283,18 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
 
     val reducer = aggregationType match {
-      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).getTypeClass()));
+      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
+        getTypeClass()));
       case _ => new agg.ProductComparableAggregator(aggregationType, first)
     }
 
     val invokable = jStream match {
-      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer, groupedStream.getKeySelector())
+      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
+        groupedStream.getKeySelector())
       case _ => new StreamReduceInvokable(reducer)
     }
-    new DataStream[Product](jStream.transform("aggregation", jStream.getType(), invokable)).asInstanceOf[DataStream[T]]
+    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
+      invokable)).asInstanceOf[DataStream[T]]
   }
 
   /**
@@ -288,7 +302,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * received records.
    *
    */
-  def count: DataStream[Long] = new DataStream[java.lang.Long](javaStream.count()).asInstanceOf[DataStream[Long]]
+  def count: DataStream[Long] = new DataStream[java.lang.Long](
+    javaStream.count()).asInstanceOf[DataStream[Long]]
 
   /**
    * Creates a new DataStream by applying the given function to every element of this DataStream.
@@ -302,7 +317,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
       def map(in: T): R = cleanFun(in)
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T,
R](mapper)))
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
+      new MapInvokable[T, R](mapper)))
   }
 
   /**
@@ -313,7 +329,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T,
R](mapper)))
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
+      new MapInvokable[T, R](mapper)))
   }
 
   /**
@@ -324,7 +341,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (flatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
-    new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]], new
FlatMapInvokable[T, R](flatMapper)))
+    new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]],
+      new FlatMapInvokable[T, R](flatMapper)))
   }
 
   /**
@@ -358,22 +376,24 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce
-   * function.
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
    */
   def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
     if (reducer == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
     javaStream match {
-      case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
-      case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new StreamReduceInvokable[T](reducer)))
+      case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce",
+        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
+      case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(),
+        new StreamReduceInvokable[T](reducer)))
     }
   }
 
   /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce
-   * function.
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
    */
   def reduce(fun: (T, T) => T): DataStream[T] = {
     if (fun == null) {
@@ -421,7 +441,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * the trigger and eviction policies please use to
    * window(List(triggers), List(evicters))
    */
-  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
+  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
 
   /**
    * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
@@ -430,7 +451,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * use-cases please refer to window(WindowingHelper[_]*)
    *
    */
-  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters))
+  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
+    WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters))
 
   /**
    *
@@ -473,7 +495,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * to use custom join function.
    *
    */
-  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
+    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
 
   /**
    * Initiates a temporal cross transformation that builds all pair
@@ -487,7 +510,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * to use custom join function.
    *
    */
-  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] = new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
+  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
+    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
 
   /**
    * Writes a DataStream to the standard output stream (stdout). For each
@@ -504,7 +528,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is written.
    *
    */
-  def writeAsText(path: String, millis: Long = 0): DataStream[T] = new DataStream[T](javaStream.writeAsText(path, millis))
+  def writeAsText(path: String, millis: Long = 0): DataStream[T] =
+    new DataStream[T](javaStream.writeAsText(path, millis))
 
   /**
    * Writes a DataStream to the file specified by path in text format. The
@@ -513,7 +538,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is written.
    *
    */
-  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path, millis))
+  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
+    new DataStream[T](javaStream.writeAsCsv(path, millis))
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added
@@ -521,7 +547,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * method is called.
    *
    */
-  def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = new DataStream[T](javaStream.addSink(sinkFuntion))
+  def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
+    new DataStream[T](javaStream.addSink(sinkFuntion))
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added
@@ -540,4 +567,4 @@ class DataStream[T](javaStream: JavaStream[T]) {
     this.addSink(sinkFunction)
   }
 
-}
\ No newline at end of file
+}
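
For orientation, a minimal usage sketch of the DataStream API reformatted above (0.8-era Scala streaming API; the driver program is illustrative, not part of this commit, and assumes the Scala API's implicit TypeInformation values are in scope):

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val minima = env.generateSequence(1, 100)  // DataStream[Long]
      .map(x => (x % 10, x))
      .partitionBy(0)            // only affects distribution to the next operator
      .minBy(1, first = true)    // on ties, keep the first element seen
    minima.writeAsText("/tmp/minima")          // millis defaults to 0
    // executing the job uses API outside this diff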

http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
index d7c9f96..b50d346 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
@@ -44,4 +44,4 @@ class FieldsKeySelector[IN](fields: Int*) extends KeySelector[IN, Tuple] {
       case _ => throw new RuntimeException("Only tuple types are supported")
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
index 0b0cce5..82a5c70 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -46,4 +47,4 @@ class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
    */
   def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll())
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
index 5dfbc3b..5f579e5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
@@ -35,11 +35,13 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.function.co.CrossWindowFunction
 import org.apache.flink.api.common.functions.CrossFunction
 
-class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
+class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+  TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
 
   override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
 
-    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, (l: I1, r: I2) => (l, r))
+    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
+      (l: I1, r: I2) => (l, r))
 
     val returnType = new CaseClassTypeInfo[(I1, I2)](
 
@@ -69,24 +71,31 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend
 }
 object StreamCrossOperator {
 
-  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
+  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
+                                           javaStream: JavaStream[(I1, I2)]) extends
+    DataStream[(I1, I2)](javaStream) {
 
     /**
-     * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf call will be emitted.
+     * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
+     * call will be emitted.
      *
      */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
 
       val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
 
-      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable)
+      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+        invokable)
 
       new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
     }
   }
 
-  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], crossFunction: (I1, I2) => R): CrossWindowFunction[I1, I2, R] = {
+  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
+                                                       crossFunction: (I1, I2) => R):
+  CrossWindowFunction[I1, I2, R] = {
     Validate.notNull(crossFunction, "Join function must not be null.")
 
     val crossFun = new CrossFunction[I1, I2, R] {
@@ -100,4 +109,4 @@ object StreamCrossOperator {
     new CrossWindowFunction[I1, I2, R](crossFun)
   }
 
-}
\ No newline at end of file
+}
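
The getCrossWindowFunction helper above illustrates a recurring pattern in this wrapper layer: a plain Scala closure is adapted to Flink's Java CrossFunction interface before being wrapped in a CrossWindowFunction. The pattern in isolation (element types here are illustrative, not from the diff):

    val fun: (String, Int) => String = (l, r) => l + ":" + r
    // the Java SAM interface expected by CrossWindowFunction
    val crossFun = new CrossFunction[String, Int, String] {
      def cross(first: String, second: Int): String = fun(first, second)
    }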

http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index 340ecc1..55f7c6c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -117,7 +117,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    */
   def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).asInstanceOf[DataStream[Long]]
+    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
+      asInstanceOf[DataStream[Long]]
   }
 
   /**
@@ -147,7 +148,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
       "elements", typeInfo);
 
     javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(),
-      new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions.asJavaCollection(data))), null, typeInfo,
+      new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions
+        .asJavaCollection(data))), null, typeInfo,
       "source", 1);
     new DataStream(returnStream)
   }
@@ -204,7 +206,8 @@ object StreamExecutionEnvironment {
    * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
    */
   def createLocalEnvironment(
-    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): StreamExecutionEnvironment = {
+    degreeOfParallelism: Int =  Runtime.getRuntime.availableProcessors()):
+  StreamExecutionEnvironment = {
     new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
   }
 
@@ -223,7 +226,8 @@ object StreamExecutionEnvironment {
    *                 those must be
    *                 provided in the JAR files.
    */
-  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment = {
+  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
+  StreamExecutionEnvironment = {
     new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
   }
 
@@ -251,4 +255,4 @@ object StreamExecutionEnvironment {
     javaEnv.setDegreeOfParallelism(degreeOfParallelism)
     new StreamExecutionEnvironment(javaEnv)
   }
-}
\ No newline at end of file
+}
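
A short sketch of the two factory methods whose signatures are re-wrapped above (the host name, port, and jar path are placeholders, not values from this commit):

    // degreeOfParallelism defaults to Runtime.getRuntime.availableProcessors()
    val local = StreamExecutionEnvironment.createLocalEnvironment()
    // classes not on the cluster's classpath must be shipped in the jar files
    val remote = StreamExecutionEnvironment.createRemoteEnvironment(
      "jobmanager-host", 6123, "/path/to/job.jar")
    val nums = local.generateSequence(1, 1000)  // DataStream[Long]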

http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
index fff5e86..7a39da5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -33,7 +33,8 @@ import scala.reflect.ClassTag
 import org.apache.commons.lang.Validate
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
 
   override def createNextWindowOperator() = {
     new StreamJoinOperator.JoinWindow[I1, I2](this)
@@ -61,7 +62,8 @@ object StreamJoinOperator {
      * to define the second key.
      */
     def where(firstField: String, otherFields: String*) = {
-      new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](op.input1.getType(), (firstField +: otherFields): _*))
+      new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](op.input1.getType(),
+        (firstField +: otherFields): _*))
     }
 
     /**
@@ -82,7 +84,8 @@ object StreamJoinOperator {
 
   }
 
-  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], private[flink] val keys1: KeySelector[I1, _]) {
+  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
+                              private[flink] val keys1: KeySelector[I1, _]) {
     private[flink] var keys2: KeySelector[I2, _] = null
 
     /**
@@ -145,30 +148,36 @@ object StreamJoinOperator {
         }
       }
 
-      return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)).addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+      return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
         returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
     }
   }
 
-  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
+  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
+  DataStream[(I1, I2)](javaStream) {
 
     private val op = jp.op
 
     /**
-     * Sets a wrapper for the joined elements. For each joined pair, the result of the udf call will be emitted.
+     * Sets a wrapper for the joined elements. For each joined pair, the result of the
+     * udf call will be emitted.
      */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
 
       val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
 
-      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable)
+      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+        invokable)
 
       new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
     }
   }
 
-  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], joinFunction: (I1, I2) => R) = {
+  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
+                                                      joinFunction: (I1, I2) => R) = {
     Validate.notNull(joinFunction, "Join function must not be null.")
 
     val joinFun = new JoinFunction[I1, I2, R] {
@@ -183,4 +192,4 @@ object StreamJoinOperator {
     new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
   }
 
-}
\ No newline at end of file
+}
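
As in the cross operator, getJoinWindowFunction above adapts a Scala closure to Flink's Java JoinFunction interface and hands it, together with the two key selectors, to a JoinWindowFunction. Schematically (element types illustrative):

    val fun: (String, Int) => String = (l, r) => l + "=" + r
    val joinFun = new JoinFunction[String, Int, String] {
      def join(first: String, second: Int): String = fun(first, second)
    }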

http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
index c037305..8c763fc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -52,7 +52,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * This controls how often the user defined function will be triggered on
    * the window.
    */
-  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.every(windowingHelper: _*))
+  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.every(windowingHelper: _*))
 
   /**
    * Groups the elements of the WindowedDataStream using the given
@@ -126,12 +127,14 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
 
   /**
    * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce operator, groupReduce exposes the whole window through the Iterable interface.
+   * the current window at every trigger. In contrast with the simple binary reduce operator,
+   * groupReduce exposes the whole window through the Iterable interface.
    * </br>
    * </br>
    * Whenever possible try to use reduce instead of groupReduce for increased efficiency
    */
-  def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): DataStream[R] = {
+  def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
+  DataStream[R] = {
     if (reducer == null) {
       throw new NullPointerException("GroupReduce function must not be null.")
     }
@@ -140,12 +143,14 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
 
   /**
    * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce operator, groupReduce exposes the whole window through the Iterable interface.
+   * the current window at every trigger. In contrast with the simple binary reduce operator,
+   * groupReduce exposes the whole window through the Iterable interface.
    * </br>
    * </br>
    * Whenever possible try to use reduce instead of groupReduce for increased efficiency
    */
-  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
+  DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("GroupReduce function must not be null.")
     }
@@ -181,16 +186,19 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * the given position. When equality, returns the first.
    *
    */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY, position, first)
+  def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position, first)
 
   /**
    * Applies an aggregation that that gives the minimum element of the window by
    * the given position. When equality, returns the first.
    *
    */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY, position, first)
+  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY,
+    position, first)
 
-  def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): DataStream[T] = {
+  def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+  DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
     val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -198,11 +206,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
 
     val reducer = aggregationType match {
-      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).getTypeClass()));
+      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
+        outType.getTypeAt(position).getTypeClass()));
       case _ => new agg.ProductComparableAggregator(aggregationType, first)
     }
 
     new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
   }
 
-}
\ No newline at end of file
+}
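
To make the reduceGroup contract in the scaladoc above concrete: the whole window arrives as an Iterable, so the binary reduce should be preferred where it suffices. A sketch, assuming a windowed stream obtained elsewhere via stream.window(...) and the Scala API's implicit TypeInformation in scope:

    // windowed: WindowedDataStream[Long]; Collector is org.apache.flink.util.Collector
    val sums: DataStream[Long] = windowed.reduceGroup {
      (window: Iterable[Long], out: Collector[Long]) => out.collect(window.sum)
    }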

