flink-commits mailing list archives

From mbala...@apache.org
Subject [25/27] incubator-flink git commit: [streaming] [scala] Restructured streaming scala project and examples
Date Sun, 04 Jan 2015 20:51:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
new file mode 100644
index 0000000..d587d56
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.Array.canBuildFrom
+import scala.reflect.ClassTag
+
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.operators.Keys
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.streaming.util.keys.KeySelectorUtil
+
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
+
+  override def createNextWindowOperator() = {
+    new StreamJoinOperator.JoinWindow[I1, I2](this)
+  }
+}
+
+object StreamJoinOperator {
+
+  class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) {
+
+    private[flink] val type1 = op.input1.getType();
+    
+    /**
+     * Continues a temporal Join transformation by defining
+     * the fields in the first stream to be used as keys for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where(fields: Int*) = {
+      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
+          new Keys.ExpressionKeys(fields.toArray,type1),type1))
+    }
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the fields in the first stream to be used as keys for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where(firstField: String, otherFields: String*) = 
+      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
+          new Keys.ExpressionKeys(firstField +: otherFields.toArray,type1),type1))  
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the key selector function that will be used to extract keys from the first stream
+     * for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where[K: TypeInformation](fun: (I1) => K) = {
+      val keyType = implicitly[TypeInformation[K]]
+      val keyExtractor = new KeySelector[I1, K] {
+        val cleanFun = op.input1.clean(fun)
+        def getKey(in: I1) = cleanFun(in)
+      }
+      new JoinPredicate[I1, I2](op, keyExtractor)
+    }
+
+  }
+
+  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
+                              private[flink] val keys1: KeySelector[I1, _]) {
+    private[flink] var keys2: KeySelector[I2, _] = null
+    private[flink] val type2 = op.input2.getType();
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a Tuple2:
+     * (first, second).
+     * To define a custom wrapping, use JoinedStream.apply(...).
+     */
+    def equalTo(fields: Int*): JoinedStream[I1, I2] = {
+      finish(KeySelectorUtil.getSelectorForKeys(
+          new Keys.ExpressionKeys(fields.toArray,type2),type2))
+    }
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a Tuple2:
+     * (first, second).
+     * To define a custom wrapping, use JoinedStream.apply(...).
+     */
+    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = 
+     finish(KeySelectorUtil.getSelectorForKeys(
+          new Keys.ExpressionKeys(firstField +: otherFields.toArray,type2),type2))
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a Tuple2:
+     * (first, second).
+     * To define a custom wrapping, use JoinedStream.apply(...).
+     */
+    def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
+      val keyType = implicitly[TypeInformation[K]]
+      val keyExtractor = new KeySelector[I2, K] {
+        val cleanFun = op.input2.clean(fun)
+        def getKey(in: I2) = cleanFun(in)
+      }
+      finish(keyExtractor)
+    }
+
+    private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
+      this.keys2 = keys2
+      new JoinedStream[I1, I2](this, createJoinOperator())
+    }
+
+    private def createJoinOperator(): JavaStream[(I1, I2)] = {
+
+      val returnType = new CaseClassTypeInfo[(I1, I2)](
+
+        classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
+
+        override def createSerializer: TypeSerializer[(I1, I2)] = {
+          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+          for (i <- 0 until getArity) {
+            fieldSerializers(i) = types(i).createSerializer
+          }
+
+          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+            override def createInstance(fields: Array[AnyRef]) = {
+              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+            }
+          }
+        }
+      }
+
+      return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+        returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+    }
+  }
+
+  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
+  DataStream[(I1, I2)](javaStream) {
+
+    private val op = jp.op
+
+    /**
+     * Sets a wrapper for the joined elements. For each joined pair, the result of the
+     * udf call will be emitted.
+     */
+    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+      val invokable = new CoWindowInvokable[I1, I2, R](
+        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
+
+      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+        invokable)
+
+      javaStream.setType(implicitly[TypeInformation[R]])
+    }
+  }
+
+  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
+                                                      joinFunction: (I1, I2) => R) = {
+    Validate.notNull(joinFunction, "Join function must not be null.")
+
+    val joinFun = new JoinFunction[I1, I2, R] {
+
+      val cleanFun = jp.op.input1.clean(joinFunction)
+
+      override def join(first: I1, second: I2): R = {
+        cleanFun(first, second)
+      }
+    }
+
+    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+  }
+
+}
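
A brief usage sketch of the where()/equalTo()/apply() chain above, adapted from the
WindowJoin example relocated by this commit. The Collector-based addSource, map, print
and execute calls are assumed to behave as in that example; they are not part of this
file's diff.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object WindowJoinSketch {
      case class Name(id: Long, name: String)
      case class Age(id: Long, age: Int)
      case class Person(name: String, age: Long)

      def nameSource(out: Collector[(Long, String)]): Unit =
        for (i <- 1L to 100L) out.collect((i, "name" + i))
      def ageSource(out: Collector[(Long, Int)]): Unit =
        for (i <- 1L to 100L) out.collect((i, (i % 90).toInt))

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val names = env.addSource(nameSource _).map(x => Name(x._1, x._2))
        val ages = env.addSource(ageSource _).map(x => Age(x._1, x._2))

        // where() keys the first stream, equalTo() the second; the trailing function
        // replaces the default (first, second) Tuple2 wrapping with a custom type.
        val persons = names.join(ages).onWindow(1000)
          .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) }

        persons.print
        env.execute("WindowJoinSketch")
      }
    }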

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
new file mode 100644
index 0000000..fb3745f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
+
+object StreamingConversions {
+
+  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
+    new DataStream[R](javaStream)
+
+  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
+    new WindowedDataStream[R](javaWStream)
+
+  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
+    new SplitDataStream[R](javaStream)
+
+  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): 
+  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+
+}
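
These implicits let code that receives the underlying Java stream types use them directly
as the Scala wrappers. A minimal hypothetical sketch (the wrap method and its argument are
illustrative, not part of this commit):

    import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
    import org.apache.flink.streaming.api.scala.DataStream
    import org.apache.flink.streaming.api.scala.StreamingConversions._

    def wrap(javaStream: JavaStream[String]): DataStream[String] =
      javaStream // converted by javaToScalaStream, no explicit wrapping needed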

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
new file mode 100644
index 0000000..deda3d9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.Array.canBuildFrom
+import scala.collection.JavaConversions.iterableAsScalaIterable
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream}
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.function.aggregation.SumFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
+import org.apache.flink.streaming.api.windowing.helper._
+import org.apache.flink.util.Collector
+
+class WindowedDataStream[T](javaStream: JavaWStream[T]) {
+
+  /**
+   * Defines the slide size (trigger frequency) for the windowed data stream.
+   * This controls how often the user-defined function will be triggered on
+   * the window.
+   */
+  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    javaStream.every(windowingHelper: _*)
+
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * field positions. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user-defined functions will be applied on a per-group basis.
+   * <br/><br/> To get windows and triggers on a per-group basis, apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
+
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * field expressions. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user-defined functions will be applied on a per-group basis.
+   * <br/><br/> To get windows and triggers on a per-group basis, apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
+   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
+    
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * KeySelector function. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user-defined functions will be applied on a per-group basis.
+   * <br/><br/> To get windows and triggers on a per-group basis, apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.groupBy(keyExtractor)
+  }
+
+  /**
+   * Applies a reduce transformation on the windowed data stream by reducing
+   * the current window at every trigger.
+   *
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    javaStream.reduce(reducer)
+  }
+
+  /**
+   * Applies a reduce transformation on the windowed data stream by reducing
+   * the current window at every trigger.
+   *
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Applies a reduceGroup transformation on the windowed data stream by reducing
+   * the current window at every trigger. In contrast with the simple binary reduce operator,
+   * groupReduce exposes the whole window through the Iterable interface.
+   * <br/>
+   * <br/>
+   * Whenever possible, try to use reduce instead of groupReduce for increased efficiency.
+   */
+  def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
+  DataStream[R] = {
+    if (reducer == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies a reduceGroup transformation on the windowed data stream by reducing
+   * the current window at every trigger. In contrast with the simple binary reduce operator,
+   * groupReduce exposes the whole window through the Iterable interface.
+   * <br/>
+   * <br/>
+   * Whenever possible, try to use reduce instead of groupReduce for increased efficiency.
+   */
+  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
+  DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
+    }
+    reduceGroup(reducer)
+  }
+
+  /**
+   * Applies an aggregation that gives the maximum of the elements in the window at
+   * the given position.
+   *
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+  /**
+   * Applies an aggregation that gives the minimum of the elements in the window at
+   * the given position.
+   *
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given position.
+   *
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+  /**
+   * Applies an aggregation that gives the element with the maximum value at the given
+   * position in the window. In case of a tie, the first element is returned by default.
+   *
+   */
+  def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position, first)
+
+  /**
+   * Applies an aggregation that gives the element with the minimum value at the given
+   * position in the window. In case of a tie, the first element is returned by default.
+   *
+   */
+  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY,
+    position, first)
+
+  def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+  DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
+    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
+
+    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
+        outType.getTypeAt(position).getTypeClass()));
+      case _ => new agg.ProductComparableAggregator(aggregationType, first)
+    }
+
+    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+  }
+
+}
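
A sketch of how the grouped windowing and aggregation calls above compose. It follows the
groupBy(...).window(...).every(...) pattern of the TopSpeedWindowing example moved by this
commit; the Reading type, the source, the window sizes, and the print/execute calls are
illustrative assumptions, not part of this diff.

    import java.util.concurrent.TimeUnit.SECONDS
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.windowing.Time
    import org.apache.flink.util.Collector

    object WindowedAggregationSketch {
      case class Reading(sensorId: Int, value: Double)

      def readingSource(out: Collector[Reading]): Unit =
        for (i <- 1 to 1000) out.collect(Reading(i % 4, i * 0.5))

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Per-group windows: 10 second eviction, triggered every second; maxBy(1) then
        // emits the element with the largest "value" (field position 1) per group.
        env.addSource(readingSource _)
          .groupBy("sensorId")
          .window(Time.of(10, SECONDS))
          .every(Time.of(1, SECONDS))
          .maxBy(1)
          .print

        env.execute("WindowedAggregationSketch")
      }
    }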

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
new file mode 100644
index 0000000..83f2293
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.windowing
+
+import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
+import org.apache.commons.lang.Validate
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
+
+object Delta {
+
+  /**
+   * Creates a delta helper representing a delta trigger or eviction policy.
+   * </br></br> This policy calculates a delta between the data point which
+   * triggered last and the currently arrived data point. It triggers if the
+   * delta is higher than a specified threshold. </br></br> In case it gets
+   * used for eviction, this policy starts from the first element of the
+   * buffer and removes all elements from the buffer which have a higher delta
+   * then the threshold. As soon as there is an element with a lower delta,
+   * the eviction stops.
+   */
+  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
+    Validate.notNull(deltaFunction, "Delta function must not be null")
+    val df = new DeltaFunction[T] {
+      val cleanFun = clean(deltaFunction)
+      override def getDelta(first: T, second: T) = cleanFun(first, second)
+    }
+    JavaDelta.of(threshold, df, initVal)
+  }
+
+}
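
A sketch of Delta.of used as a trigger, mirroring the TopSpeedWindowing example relocated
by this commit. The cars value is assumed to be an existing DataStream[CarSpeed] built as
in that example; the 50 meter threshold and 10 second window are illustrative.

    import java.util.concurrent.TimeUnit.SECONDS
    import org.apache.flink.streaming.api.scala.windowing.{ Delta, Time }

    case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long)

    // Evict readings older than 10 seconds; re-evaluate whenever a car has covered
    // more than 50 meters since the last trigger, keeping the fastest reading.
    val topSpeeds = cars
      .groupBy("carId")
      .window(Time.of(10, SECONDS))
      .every(Delta.of[CarSpeed](50.0,
        (oldSp, newSp) => newSp.distance - oldSp.distance,
        CarSpeed(0, 0, 0, 0)))
      .reduce((x, y) => if (x.speed > y.speed) x else y)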

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
new file mode 100644
index 0000000..e1b9768
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.windowing
+
+import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
+
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.commons.net.ntp.TimeStamp
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp
+import org.apache.commons.lang.Validate
+
+object Time {
+
+  /**
+   * Creates a helper representing a time trigger which fires after every given
+   * time span (slide size), or a time eviction which evicts all elements older
+   * than the given time span (window size), using system time.
+   *
+   */
+  def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] =
+    JavaTime.of(windowSize, timeUnit)
+
+  /**
+   * Creates a helper representing a time trigger which fires after every given
+   * time span (slide size), or a time eviction which evicts all elements older
+   * than the given time span (window size), using a user-defined timestamp extractor.
+   *
+   */
+  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
+    Validate.notNull(timestamp, "Timestamp must not be null.")
+    val ts = new Timestamp[R] {
+      val fun = clean(timestamp, true)
+      override def getTimestamp(in: R) = fun(in)
+    }
+    JavaTime.of(windowSize, ts, startTime)
+  }
+
+}
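
The two Time.of variants above build windowing helpers that are then passed to window()
or every(); a short sketch with a hypothetical Event type and window sizes:

    import java.util.concurrent.TimeUnit.MINUTES
    import org.apache.flink.streaming.api.scala.windowing.Time

    case class Event(id: Long, eventTimeMillis: Long)

    // 5 minute windows measured in system time.
    val bySystemTime = Time.of(5, MINUTES)

    // 60000 ms windows measured on a timestamp extracted from each element,
    // counted from startTime = 0 (the default).
    val byEventTime = Time.of[Event](60000L, (e: Event) => e.eventTimeMillis)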

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index c20554b..386ef73 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -35,6 +35,7 @@ under the License.
 
 	<modules>
 		<module>flink-streaming-core</module>
+		<module>flink-streaming-scala</module>
 		<module>flink-streaming-examples</module>
 		<module>flink-streaming-connectors</module>
 	</modules>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
deleted file mode 100644
index e39fb11..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala.streaming.windowing
-
-
-import java.util.concurrent.TimeUnit._
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment
-import org.apache.flink.api.scala.streaming.windowing.Delta
-import org.apache.flink.streaming.api.windowing.helper.Time
-import org.apache.flink.util.Collector
-
-import scala.math.{max, min}
-import scala.util.Random
-
-/**
- * An example of grouped stream windowing where different eviction and 
- * trigger policies can be used. A source fetches events from cars
- * every second containing their id, their current speed (km/h),
- * overall elapsed distance (m) and a timestamp. The streaming
- * example triggers the top speed of each car every x meters elapsed 
- * for the last y seconds.
- */
-object TopSpeedWindowing {
-
-  case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long)
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val cars = env.addSource(carSource _).groupBy("carId")
-      .window(Time.of(evictionSec, SECONDS))
-      .every(Delta.of[CarSpeed](triggerMeters, 
-          (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
-      .reduce((x, y) => if (x.speed > y.speed) x else y)
-
-    cars print
-
-    env.execute("TopSpeedWindowing")
-
-  }
-
-  def carSource(out: Collector[CarSpeed]) = {
-
-    val speeds = new Array[Int](numOfCars)
-    val distances = new Array[Double](numOfCars)
-
-    while (true) {
-      Thread sleep 1000
-      for (i <- 0 until speeds.length) {
-        speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5)
-        distances(i) += speeds(i) / 3.6d
-        out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis))
-      }
-    }
-  }
-
-  def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      if (args.length == 3) {
-        numOfCars = args(0).toInt
-        evictionSec = args(1).toInt
-        triggerMeters = args(2).toDouble
-      }
-      else {
-        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
-        false
-      }
-    }
-    true
-  }
-
-  var numOfCars = 2
-  var evictionSec = 10
-  var triggerMeters = 50d
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
deleted file mode 100644
index caf5eb9..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala.streaming.windowing
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment
-import org.apache.flink.util.Collector
-import scala.util.Random
-
-object WindowJoin {
-
-  case class Name(id: Long, name: String)
-  case class Age(id: Long, age: Int)
-  case class Person(name: String, age: Long)
-
-  def main(args: Array[String]) {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    //Create streams for names and ages by mapping the inputs to the corresponding objects
-    val names = env.addSource(nameStream _).map(x => Name(x._1, x._2))
-    val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2))
-
-    //Join the two input streams by id on the last second and create new Person objects
-    //containing both name and age
-    val joined =
-      names.join(ages).onWindow(1000)
-                      .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) }
-
-    joined print
-
-    env.execute("WindowJoin")
-  }
-
-  //Stream source for generating (id, name) pairs
-  def nameStream(out: Collector[(Long, String)]) = {
-    val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
-
-    for (i <- 1 to 10000) {
-      if (i % 100 == 0) Thread.sleep(1000) else {
-        out.collect((i, names(Random.nextInt(names.length))))
-      }
-    }
-  }
-
-  //Stream source for generating (id, age) pairs
-  def ageStream(out: Collector[(Long, Int)]) = {
-    for (i <- 1 to 10000) {
-      if (i % 100 == 0) Thread.sleep(1000) else {
-        out.collect((i, Random.nextInt(90)))
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 4ccbb98..b902655 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -50,12 +50,6 @@ under the License.
 			<artifactId>flink-compiler</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-        
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
 
 		<dependency>
 			<groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java
deleted file mode 100644
index 2f587d7..0000000
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/streaming/ScalaStreamingAggregator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumFunction;
-
-import scala.Product;
-
-public class ScalaStreamingAggregator<IN extends Product> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	TupleSerializerBase<IN> serializer;
-	Object[] fields;
-	int length;
-	int position;
-
-	public ScalaStreamingAggregator(TypeSerializer<IN> serializer, int pos) {
-		this.serializer = (TupleSerializerBase<IN>) serializer;
-		this.length = this.serializer.getArity();
-		this.fields = new Object[this.length];
-		this.position = pos;
-	}
-
-	public class Sum extends AggregationFunction<IN> {
-		private static final long serialVersionUID = 1L;
-		SumFunction sumFunction;
-
-		public Sum(SumFunction func) {
-			super(ScalaStreamingAggregator.this.position);
-			this.sumFunction = func;
-		}
-
-		@Override
-		public IN reduce(IN value1, IN value2) throws Exception {
-			for (int i = 0; i < length; i++) {
-				fields[i] = value2.productElement(i);
-			}
-
-			fields[position] = sumFunction.add(fields[position], value1.productElement(position));
-
-			return serializer.createInstance(fields);
-		}
-	}
-
-	public class ProductComparableAggregator extends ComparableAggregator<IN> {
-
-		private static final long serialVersionUID = 1L;
-
-		public ProductComparableAggregator(AggregationFunction.AggregationType aggregationType,
-				boolean first) {
-			super(ScalaStreamingAggregator.this.position, aggregationType, first);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public IN reduce(IN value1, IN value2) throws Exception {
-			Object v1 = value1.productElement(position);
-			Object v2 = value2.productElement(position);
-
-			int c = comparator.isExtremal((Comparable<Object>) v1, v2);
-
-			if (byAggregate) {
-				if (c == 1) {
-					return value1;
-				}
-				if (first) {
-					if (c == 0) {
-						return value1;
-					}
-				}
-
-				return value2;
-			} else {
-				for (int i = 0; i < length; i++) {
-					fields[i] = value2.productElement(i);
-				}
-
-				if (c == 1) {
-					fields[position] = v1;
-				}
-
-				return serializer.createInstance(fields);
-			}
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
deleted file mode 100644
index 985e512..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment._
-import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaCStream }
-import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction }
-import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable }
-import org.apache.flink.util.Collector
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
-
-  /**
-   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
-   * the output to a common type. The transformation calls a
-   * @param fun1 for each element of the first input and
-   * @param fun2 for each element of the second input. Each
-   * CoMapFunction call returns exactly one element.
-   *
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
-  DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-    val comapper = new CoMapFunction[IN1, IN2, R] {
-      def map1(in1: IN1): R = clean(fun1)(in1)
-      def map2(in2: IN2): R = clean(fun2)(in2)
-    }
-
-    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
-      new CoMapInvokable[IN1, IN2, R](comapper)))
-  }
-
-  /**
-   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
-   * the output to a common type. The transformation calls a
-   * {@link CoMapFunction#map1} for each element of the first input and
-   * {@link CoMapFunction#map2} for each element of the second input. Each
-   * CoMapFunction call returns exactly one element. The user can also extend
-   * {@link RichCoMapFunction} to gain access to other features provided by
-   * the {@link RichFuntion} interface.
-   *
-   * @param coMapper
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coMapper == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-
-    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
-      new CoMapInvokable[IN1, IN2, R](coMapper)))
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-   * maps the output to a common type. The transformation calls a
-   * {@link CoFlatMapFunction#flatMap1} for each element of the first input
-   * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none. The user can also extend {@link RichFlatMapFunction} to
-   * gain access to other features provided by the {@link RichFunction}
-   * interface.
-   *
-   * @param coFlatMapper
-   * The CoFlatMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coFlatMapper == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-    new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]],
-      new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper)))
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-   * maps the output to a common type. The transformation calls a
-   * @param fun1 for each element of the first input
-   * and @param fun2 for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none.
-   *
-   * @return The transformed { @link DataStream}
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
-      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("FlatMap functions must not be null.")
-    }
-    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, out)
-      def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, out)
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param keyPosition1
-   * The field used to compute the hashcode of the elements in the
-   * first input stream.
-   * @param keyPosition2
-   * The field used to compute the hashcode of the elements in the
-   * second input stream.
-   * @return The transformed { @link ConnectedDataStream}
-   */
-  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(keyPosition1, keyPosition2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param keyPositions1
-   * The fields used to group the first input stream.
-   * @param keyPositions2
-   * The fields used to group the second input stream.
-   * @return The transformed { @link ConnectedDataStream}
-   */
-  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): 
-  ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(keyPositions1, keyPositions2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the {@link DataStream}S underlying type. A dot can be used
-   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-   *
-   * @param field1
-   * The grouping expression for the first input
-   * @param field2
-   * The grouping expression for the second input
-   * @return The grouped { @link ConnectedDataStream}
-   */
-  def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(field1, field2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to fields1 and fields2. A
-   * field expression is either the name of a public field or a getter method
-   * with parentheses of the {@link DataStream}S underlying type. A dot can be
-   * used to drill down into objects, as in {@code "field1.getInnerField2()" }
-   * .
-   *
-   * @param fields1
-   * The grouping expressions for the first input
-   * @param fields2
-   * The grouping expressions for the second input
-   * @return The grouped { @link ConnectedDataStream}
-   */
-  def groupBy(fields1: Array[String], fields2: Array[String]): 
-  ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(fields1, fields2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 using fun1 and fun2. Used for applying
-   * function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param fun1
-   * The function used for grouping the first input
-   * @param fun2
-   * The function used for grouping the second input
-   * @return The transformed { @link ConnectedDataStream}
-   */
-  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _):
-  ConnectedDataStream[IN1, IN2] = {
-
-    val keyExtractor1 = new KeySelector[IN1, Any] {
-      def getKey(in: IN1) = clean(fun1)(in)
-    }
-    val keyExtractor2 = new KeySelector[IN2, Any] {
-      def getKey(in: IN2) = clean(fun2)(in)
-    }
-
-    javaStream.groupBy(keyExtractor1, keyExtractor2)
-  }
-
-  /**
-   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
-   * the outputs to a common type. If the {@link ConnectedDataStream} is
-   * batched or windowed then the reduce transformation is applied on every
-   * sliding batch/window of the data stream. If the connected data stream is
-   * grouped then the reducer is applied on every group of elements sharing
-   * the same key. This type of reduce is much faster than reduceGroup since
-   * the reduce function can be applied incrementally.
-   *
-   * @param coReducer
-   * The { @link CoReduceFunction} that will be called for every
-   *             element of the inputs.
-   * @return The transformed { @link DataStream}.
-   */
-  def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coReducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-
-    new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]],
-      new CoReduceInvokable[IN1, IN2, R](coReducer)))
-  }
-
-  /**
-   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
-   * the outputs to a common type. If the {@link ConnectedDataStream} is
-   * batched or windowed then the reduce transformation is applied on every
-   * sliding batch/window of the data stream. If the connected data stream is
-   * grouped then the reducer is applied on every group of elements sharing
-   * the same key. This type of reduce is much faster than reduceGroup since
-   * the reduce function can be applied incrementally.
-   *
-   * @param reducer1
-   * @param reducer2
-   * @param mapper1
-   * @param mapper2
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1, 
-      reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
-    if (mapper1 == null || mapper2 == null) {
-      throw new NullPointerException("Map functions must not be null.")
-    }
-    if (reducer1 == null || reducer2 == null) {
-      throw new NullPointerException("Reduce functions must not be null.")
-    }
-
-    val reducer = new CoReduceFunction[IN1, IN2, R] {
-      def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, value2)
-      def map2(value: IN2): R = clean(mapper2)(value)
-      def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, value2)
-      def map1(value: IN1): R = clean(mapper1)(value)
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Applies a CoWindow transformation on the connected DataStreams. The
-   * transformation calls the {@link CoWindowFunction#coWindow} method for
-   * time-aligned windows of the two data streams. System time is used as
-   * default to compute windows.
-   *
-   * @param coWindowFunction
-   * The { @link CoWindowFunction} that will be applied for the time
-   *             windows.
-   * @param windowSize
-   * Size of the windows that will be aligned for both streams in
-   * milliseconds.
-   * @param slideInterval
-   * After every function call the windows will be slid by this
-   * interval.
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
-      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = {
-    if (coWindowFunction == null) {
-      throw new NullPointerException("CoWindow function must not be null")
-    }
-
-    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)
-  }
-
-  /**
-   * Applies a CoWindow transformation on the connected DataStreams. The
-   * transformation calls the {@link CoWindowFunction#coWindow} method for
-   * time-aligned windows of the two data streams. System time is used as
-   * default to compute windows.
-   *
-   * @param coWindower
-   * The coWindowing function to be applied for the time windows.
-   * @param windowSize
-   * Size of the windows that will be aligned for both streams in
-   * milliseconds.
-   * @param slideInterval
-   * After every function call the windows will be slid by this
-   * interval.
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], 
-      Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = {
-    if (coWindower == null) {
-      throw new NullPointerException("CoWindow function must not be null")
-    }
-
-    val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
-      def coWindow(first: util.List[IN1], second: util.List[IN2], 
-          out: Collector[R]): Unit = clean(coWindower)(first, second, out)
-    }
-
-    javaStream.windowReduce(coWindowFun, windowSize, slideInterval)
-  }
-
-  /**
-   * Returns the first {@link DataStream}.
-   *
-   * @return The first DataStream.
-   */
-  def getFirst(): DataStream[IN1] = {
-    javaStream.getFirst
-  }
-
-  /**
-   * Returns the second {@link DataStream}.
-   *
-   * @return The second DataStream.
-   */
-  def getSecond(): DataStream[IN2] = {
-    javaStream.getSecond
-  }
-
-  /**
-   * Gets the type of the first input
-   *
-   * @return The type of the first input
-   */
-  def getInputType1(): TypeInformation[IN1] = {
-    javaStream.getInputType1
-  }
-
-  /**
-   * Gets the type of the second input
-   *
-   * @return The type of the second input
-   */
-  def getInputType2(): TypeInformation[IN2] = {
-    javaStream.getInputType2
-  }
-
-}
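
The co-map API documented in this relocated file remains available through the scala
ConnectedDataStream that StreamingConversions above wraps. A short sketch, assuming two
existing streams temperatures: DataStream[Double] and labels: DataStream[String]:

    // connect() pairs two differently typed streams; map() takes one function per input,
    // and both sides are mapped to the common output type String.
    val merged: DataStream[String] = temperatures
      .connect(labels)
      .map(t => "temp: " + t, s => "label: " + s)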

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
deleted file mode 100644
index ccfd176..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
-  SingleOutputStreamOperator, GroupedDataStream}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import scala.reflect.ClassTag
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.streaming.api.invokable.StreamInvokable
-import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable
-import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.streaming.api.function.sink.SinkFunction
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy
-import org.apache.flink.streaming.api.collector.OutputSelector
-import scala.collection.JavaConversions._
-import java.util.HashMap
-import org.apache.flink.streaming.api.function.aggregation.SumFunction
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class DataStream[T](javaStream: JavaStream[T]) {
-
-  /**
-   * Gets the underlying java DataStream object.
-   */
-  def getJavaStream: JavaStream[T] = javaStream
-
-  /**
-   * Sets the degree of parallelism of this operation. This must be at least 1.
-   */
-  def setParallelism(dop: Int): DataStream[T] = {
-    javaStream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
-      case _ =>
-        throw new UnsupportedOperationException(
-          "Operator " + javaStream.toString + " cannot have parallelism.")
-    }
-    this
-  }
-
-  /**
-   * Returns the degree of parallelism of this operation.
-   */
-  def getParallelism: Int = javaStream match {
-    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
-    case _ =>
-      throw new UnsupportedOperationException(
-        "Operator " + javaStream.toString + " does not have parallelism.")
-  }
-
-  /**
-   * Creates a new DataStream by merging DataStream outputs of
-   * the same type with each other. The DataStreams merged using this operator
-   * will be transformed simultaneously.
-   *
-   */
-  def merge(dataStreams: DataStream[T]*): DataStream[T] =
-    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
-
-  /**
-   * Creates a new ConnectedDataStream by connecting
-   * DataStream outputs of different type with each other. The
-   * DataStreams connected using this operator can be used with CoFunctions.
-   *
-   */
-  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = 
-    javaStream.connect(dataStream.getJavaStream)
-
-  /**
-   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
-   */
-  def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
-
-  /**
-   * Groups the elements of a DataStream by the given field expressions to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
-   */
-  def groupBy(firstField: String, otherFields: String*): DataStream[T] = 
-   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
-  
-  /**
-   * Groups the elements of a DataStream by the key extracted by the given function, to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
-   */
-  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
-
-    val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.groupBy(keyExtractor)
-  }
-
-  /**
-   * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the selected fields. This setting only affects how the outputs will be
-   * distributed between the parallel instances of the next processing operator.
-   *
-   */
-  def partitionBy(fields: Int*): DataStream[T] =
-    javaStream.partitionBy(fields: _*)
-
-  /**
-   * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the selected fields. This setting only affects how the outputs will be
-   * distributed between the parallel instances of the next processing operator.
-   *
-   */
-  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
-   javaStream.partitionBy(firstField +: otherFields.toArray: _*)
-
-  /**
-   * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the given key. This setting only affects how the outputs will be
-   * distributed between the parallel instances of the next processing operator.
-   *
-   */
-  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
-
-    val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.partitionBy(keyExtractor)
-  }
-
-  /**
-   * Sets the partitioning of the DataStream so that the output tuples
-   * are broadcast to every parallel instance of the next component. This
-   * setting only affects how the outputs will be distributed between the
-   * parallel instances of the next processing operator.
-   *
-   */
-  def broadcast: DataStream[T] = javaStream.broadcast()
-
-  /**
-   * Sets the partitioning of the DataStream so that the output tuples
-   * are shuffled to the next component. This setting only affects how the
-   * outputs will be distributed between the parallel instances of the next
-   * processing operator.
-   *
-   */
-  def shuffle: DataStream[T] = javaStream.shuffle()
-
-  /**
-   * Sets the partitioning of the DataStream so that the output tuples
-   * are forwarded to the local subtask of the next component (whenever
-   * possible). This is the default partitioner setting. This setting only
-   * affects how the outputs will be distributed between the parallel
-   * instances of the next processing operator.
-   *
-   */
-  def forward: DataStream[T] = javaStream.forward()
-
-  /**
-   * Sets the partitioning of the DataStream so that the output tuples
-   * are distributed evenly to the next component. This setting only affects
-   * how the outputs will be distributed between the parallel instances of
-   * the next processing operator.
-   *
-   */
-  def distribute: DataStream[T] = javaStream.distribute()
-
-  /**
-   * Initiates an iterative part of the program that creates a loop by feeding
-   * back data streams. To create a streaming iteration the user needs to define
-   * a transformation that creates two DataStreams. The first one is the output
-   * that will be fed back to the start of the iteration and the second is the output
-   * stream of the iterative part.
-   * <p>
-   * stepfunction: initialStream => (feedback, output)
-   * <p>
-   * A common pattern is to use output splitting to create feedback and output DataStream.
-   * Please refer to the .split(...) method of the DataStream
-   * <p>
-   * By default a DataStream with iteration will never terminate, but the user
-   * can use the maxWaitTimeMillis parameter to set a maximum waiting time for the iteration head.
-   * If no data is received within the set time, the stream terminates.
-   *
-   *
-   */
-  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]),  maxWaitTimeMillis:
-    Long = 0): DataStream[T] = {
-    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
-
-    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
-    iterativeStream.closeWith(feedback.getJavaStream)
-    output
-  }
-
-  /**
-   * Applies an aggregation that gives the current maximum of the data stream at
-   * the given position.
-   *
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
-  /**
-   * Applies an aggregation that gives the current minimum of the data stream at
-   * the given position.
-   *
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
-  /**
-   * Applies an aggregation that sums the data stream at the given position.
-   *
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
-  /**
-   * Applies an aggregation that gives the current minimum element of the data stream by
-   * the given position. In case of a tie, the user can choose to get the first or last element with
-   * the minimal value.
-   *
-   */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType
-    .MINBY, position, first)
-
-  /**
-   * Applies an aggregation that gives the current maximum element of the data stream by
-   * the given position. In case of a tie, the user can choose to get the first or last element with
-   * the maximal value.
-   *
-   */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] =
-    aggregate(AggregationType.MAXBY, position, first)
-
-  private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
-    DataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaStream[Product]]
-    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
-
-    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
-        getTypeClass()));
-      case _ => new agg.ProductComparableAggregator(aggregationType, first)
-    }
-
-    val invokable = jStream match {
-      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
-        groupedStream.getKeySelector())
-      case _ => new StreamReduceInvokable(reducer)
-    }
-    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
-      invokable)).asInstanceOf[DataStream[T]]
-  }
-
-  /**
-   * Creates a new DataStream containing the current number (count) of
-   * received records.
-   *
-   */
-  def count: DataStream[Long] = new DataStream[java.lang.Long](
-    javaStream.count()).asInstanceOf[DataStream[Long]]
-
-  /**
-   * Creates a new DataStream by applying the given function to every element of this DataStream.
-   */
-  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-    val mapper = new MapFunction[T, R] {
-      val cleanFun = clean(fun)
-      def map(in: T): R = cleanFun(in)
-    }
-
-    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
-  }
-
-  /**
-   * Creates a new DataStream by applying the given function to every element of this DataStream.
-   */
-  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
-    if (mapper == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-
-    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
-  }
-
-  /**
-   * Creates a new DataStream by applying the given function to every element and flattening
-   * the results.
-   */
-  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
-    if (flatMapper == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-   javaStream.transform("flatMap", implicitly[TypeInformation[R]], 
-       new FlatMapInvokable[T, R](flatMapper))
-  }
-
-  /**
-   * Creates a new DataStream by applying the given function to every element and flattening
-   * the results.
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-    val flatMapper = new FlatMapFunction[T, R] {
-      val cleanFun = clean(fun)
-      def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * Creates a new DataStream by applying the given function to every element and flattening
-   * the results.
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-    val flatMapper = new FlatMapFunction[T, R] {
-      val cleanFun = clean(fun)
-      def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function.
-   */
-  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
-    if (reducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    javaStream match {
-      case ds: GroupedDataStream[_] => javaStream.transform("reduce",
-        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))
-      case _ => javaStream.transform("reduce", javaStream.getType(),
-        new StreamReduceInvokable[T](reducer))
-    }
-  }
-
-  /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function.
-   */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val reducer = new ReduceFunction[T] {
-      val cleanFun = clean(fun)
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
-   */
-  def filter(filter: FilterFunction[T]): DataStream[T] = {
-    if (filter == null) {
-      throw new NullPointerException("Filter function must not be null.")
-    }
-    javaStream.filter(filter)
-  }
-
-  /**
-   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
-   */
-  def filter(fun: T => Boolean): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Filter function must not be null.")
-    }
-    val filter = new FilterFunction[T] {
-      val cleanFun = clean(fun)
-      def filter(in: T) = cleanFun(in)
-    }
-    this.filter(filter)
-  }
-
-  /**
-   * Creates a WindowedDataStream that can be used to apply
-   * transformations like .reduce(...) or aggregations on
-   * preset chunks (windows) of the data stream. To define the windows one or
-   * more WindowingHelper-s such as Time, Count and
-   * Delta can be used.
-   *
-   * When applied to a grouped data stream, the windows (evictions) and slide
-   * sizes (triggers) will be computed on a per group basis.
-   *
-   * For more advanced control over the trigger and eviction policies please
-   * use window(List(triggers), List(evicters)).
-   */
-  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
-    javaStream.window(windowingHelper: _*)
-
-  /**
-   * Creates a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
-   * Windowing can be used to apply transformations like .reduce(...) or aggregations on
-   * preset chunks (windows) of the data stream.
-   *
-   * For the most common use-cases please refer to window(WindowingHelper[_]*).
-   *
-   */
-  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
-    WindowedDataStream[T] = javaStream.window(triggers, evicters)
-
-  /**
-   *
-   * Operator used for directing tuples to specific named outputs using an
-   * OutputSelector. Calling this method on an operator creates a new
-   * SplitDataStream.
-   */
-  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
-    case op: SingleOutputStreamOperator[_, _] => op.split(selector)
-    case _ =>
-      throw new UnsupportedOperationException("Operator " + javaStream.toString + " can not be " +
-        "split.")
-  }
-
-  /**
-   * Creates a new SplitDataStream by routing each element to the named
-   * output returned by the given selector function.
-   */
-  def split(fun: T => String): SplitDataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("OutputSelector must not be null.")
-    }
-    val selector = new OutputSelector[T] {
-      val cleanFun = clean(fun)
-      def select(in: T): java.lang.Iterable[String] = {
-        List(cleanFun(in))
-      }
-    }
-    split(selector)
-  }
-
-  /**
-   * Initiates a temporal Join transformation that joins the elements of two
-   * data streams on key equality over a specified time window.
-   *
-   * This method returns a StreamJoinOperator on which the
-   * .onWindow(..) should be called to define the
-   * window, and then the .where(..) and .equalTo(..) methods can be used to define
-   * the join keys. The user can also use the apply method of the returned JoinedStream
-   * to apply a custom join function.
-   *
-   */
-  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
-    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
-
-  /**
-   * Initiates a temporal cross transformation that builds all pair
-   * combinations of elements of both DataStreams, i.e., it builds a Cartesian
-   * product.
-   *
-   * This method returns a StreamCrossOperator on which the
-   * .onWindow(..) method should be called to define the
-   * window. The user can also use the apply method of the returned
-   * CrossWindow to apply a custom cross function.
-   *
-   */
-  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
-    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
-
-  /**
-   * Writes a DataStream to the standard output stream (stdout). For each
-   * element of the DataStream the result of .toString is
-   * written.
-   *
-   */
-  def print(): DataStream[T] = javaStream.print()
-
-  /**
-   * Writes a DataStream to the file specified by path in text format. The
-   * writing is performed periodically, in every millis milliseconds. For
-   * every element of the DataStream the result of .toString
-   * is written.
-   *
-   */
-  def writeAsText(path: String, millis: Long = 0): DataStream[T] =
-    javaStream.writeAsText(path, millis)
-
-  /**
-   * Writes a DataStream to the file specified by path in CSV format. The
-   * writing is performed periodically, in every millis milliseconds. For
-   * every element of the DataStream the result of .toString
-   * is written.
-   *
-   */
-  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
-    javaStream.writeAsCsv(path, millis)
-
-  /**
-   * Adds the given sink to this DataStream. Only streams with sinks added
-   * will be executed once the StreamExecutionEnvironment.execute(...)
-   * method is called.
-   *
-   */
-  def addSink(sinkFunction: SinkFunction[T]): DataStream[T] =
-    javaStream.addSink(sinkFunction)
-
-  /**
-   * Adds the given sink to this DataStream. Only streams with sinks added
-   * will be executed once the StreamExecutionEnvironment.execute(...)
-   * method is called.
-   *
-   */
-  def addSink(fun: T => Unit): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Sink function must not be null.")
-    }
-    val sinkFunction = new SinkFunction[T] {
-      val cleanFun = clean(fun)
-      def invoke(in: T) = cleanFun(in)
-    }
-    this.addSink(sinkFunction)
-  }
-
-}
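For reference, two minimal sketches of the API removed above, under the assumption that the input streams already exist and that import org.apache.flink.api.scala._ is in scope for the implicit TypeInformation/ClassTag instances; the helper names and constants are illustrative.

    // Hypothetical word-count style pipeline using only methods from the
    // deleted class: filter, map, groupBy (by tuple position) and sum.
    def wordCounts(words: DataStream[String]) =
      words
        .filter(w => w.nonEmpty)   // drop empty records
        .map(w => (w, 1))          // pair each word with an initial count
        .groupBy(0)                // group on the word (tuple position 0)
        .sum(1)                    // rolling sum of the counts per word

The iterate pattern documented above (a step function returning a feedback stream and an output stream) could look roughly as follows; the 5000 ms wait time is an arbitrary example value.

    // Hypothetical sketch: decrement each value until it reaches zero.
    def countDown(input: DataStream[Long]) =
      input.iterate(stream => {
        val decremented = stream.map(n => n - 1)
        val feedback = decremented.filter(n => n > 0)   // fed back into the loop
        val output = decremented.filter(n => n <= 0)    // leaves the loop
        (feedback, output)
      }, maxWaitTimeMillis = 5000L)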

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
deleted file mode 100644
index f61e34b..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-/**
- * The SplitDataStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function.
- *
- * @param <T>
- *            The type of the output.
- */
-class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
-
-  /**
-   * Gets the underlying java DataStream object.
-   */
-  private[flink] def getJavaStream: SplitJavaStream[T] = javaStream
-
-  /**
-   *  Sets the output names for which the next operator will receive values.
-   */
-  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
-
-  /**
-   * Selects all output names from a split data stream.
-   */
-  def selectAll(): DataStream[T] = javaStream.selectAll()
-
-}
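For reference, a minimal sketch of the split/select pair described above; the input stream and helper name are assumed, and only the split, select and selectAll calls from this commit are used.

    // Hypothetical sketch: route each number to a named output, then keep one side.
    def evenNumbers(numbers: DataStream[Long]) = {
      val parts = numbers.split(n => if (n % 2 == 0) "even" else "odd")
      parts.select("even")   // parts.selectAll() would keep both named outputs
    }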

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
deleted file mode 100644
index cac2927..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.common.functions.JoinFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.TemporalOperator
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
-import org.apache.flink.streaming.api.function.co.CrossWindowFunction
-import org.apache.flink.api.common.functions.CrossFunction
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
-  TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
-
-  override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
-
-    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
-      (l: I1, r: I2) => (l, r))
-
-    val returnType = new CaseClassTypeInfo[(I1, I2)](
-
-      classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) {
-
-      override def createSerializer: TypeSerializer[(I1, I2)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
-        }
-
-        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-          }
-        }
-      }
-    }
-
-    val javaStream = input1.connect(input2).addGeneralWindowCombine(
-      crossWindowFunction,
-      returnType, windowSize,
-      slideInterval, timeStamp1, timeStamp2);
-
-    new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
-  }
-}
-object StreamCrossOperator {
-
-  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
-                                           javaStream: JavaStream[(I1, I2)]) extends
-    DataStream[(I1, I2)](javaStream) {
-
-    /**
-     * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
-     * call will be emitted.
-     *
-     */
-    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
-      val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
-        op.timeStamp2)
-
-      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
-        invokable)
-
-      javaStream.setType(implicitly[TypeInformation[R]])
-    }
-  }
-
-  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
-                                                       crossFunction: (I1, I2) => R):
-  CrossWindowFunction[I1, I2, R] = {
-    Validate.notNull(crossFunction, "Cross function must not be null.")
-
-    val crossFun = new CrossFunction[I1, I2, R] {
-      val cleanFun = op.input1.clean(crossFunction)
-
-      override def cross(first: I1, second: I2): R = {
-        cleanFun(first, second)
-      }
-    }
-
-    new CrossWindowFunction[I1, I2, R](crossFun)
-  }
-
-}
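For reference, one way to consume a windowed cross: since the CrossWindow above extends DataStream[(I1, I2)], its default output is a stream of pairs. The pairs parameter below is assumed to come from something like first.cross(second).onWindow(...), as described in the DataStream.cross scaladoc; the formatting map and names are illustrative.

    // Hypothetical sketch: re-wrap each crossed pair into a readable string;
    // implicit TypeInformation[String] is assumed from org.apache.flink.api.scala._.
    def formatPairs(pairs: DataStream[(String, Int)]) =
      pairs.map { case (name, count) => name + ": " + count }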

