flink-commits mailing list archives

From: mbala...@apache.org
Subject: [26/27] incubator-flink git commit: [streaming] [scala] Restructured streaming scala project and examples
Date: Sun, 04 Jan 2015 20:51:16 GMT
[streaming] [scala] Restructured streaming scala project and examples

This closes #275


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8183c8c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8183c8c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8183c8c3

Branch: refs/heads/master
Commit: 8183c8c3d0bf720d3fb8b407615fc033dcf169dd
Parents: 3f1af0e
Author: mbalassi <mbalassi@apache.org>
Authored: Sun Jan 4 12:44:16 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Jan 4 20:54:00 2015 +0100

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            | 137 ++++-
 .../examples/windowing/TopSpeedWindowing.scala  |  98 ++++
 .../scala/examples/windowing/WindowJoin.scala   |  71 +++
 .../flink-streaming-scala/pom.xml               | 217 ++++++++
 .../scala/ScalaStreamingAggregator.java         | 111 ++++
 .../api/scala/ConnectedDataStream.scala         | 381 +++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 558 +++++++++++++++++++
 .../streaming/api/scala/SplitDataStream.scala   |  50 ++
 .../api/scala/StreamCrossOperator.scala         | 111 ++++
 .../api/scala/StreamExecutionEnvironment.scala  | 275 +++++++++
 .../api/scala/StreamJoinOperator.scala          | 204 +++++++
 .../api/scala/StreamingConversions.scala        |  40 ++
 .../api/scala/WindowedDataStream.scala          | 209 +++++++
 .../streaming/api/scala/windowing/Delta.scala   |  47 ++
 .../streaming/api/scala/windowing/Time.scala    |  56 ++
 flink-addons/flink-streaming/pom.xml            |   1 +
 .../streaming/windowing/TopSpeedWindowing.scala |  96 ----
 .../scala/streaming/windowing/WindowJoin.scala  |  71 ---
 flink-scala/pom.xml                             |   6 -
 .../streaming/ScalaStreamingAggregator.java     | 111 ----
 .../scala/streaming/ConnectedDataStream.scala   | 380 -------------
 .../flink/api/scala/streaming/DataStream.scala  | 558 -------------------
 .../api/scala/streaming/SplitDataStream.scala   |  50 --
 .../scala/streaming/StreamCrossOperator.scala   | 112 ----
 .../streaming/StreamExecutionEnvironment.scala  | 278 ---------
 .../scala/streaming/StreamJoinOperator.scala    | 202 -------
 .../scala/streaming/StreamingConversions.scala  |  40 --
 .../scala/streaming/WindowedDataStream.scala    | 214 -------
 .../api/scala/streaming/windowing/Delta.scala   |  47 --
 .../api/scala/streaming/windowing/Time.scala    |  55 --
 30 files changed, 2565 insertions(+), 2221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 64be993..3661726 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -43,6 +43,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java-examples</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -56,7 +62,6 @@ under the License.
 
 	<build>
 		<plugins>
-			
 			<!-- get default data from flink-java-examples package -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
@@ -346,6 +351,136 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+ 
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+					<compilerPlugins>
+					   <compilerPlugin>
+						   <groupId>org.scalamacros</groupId>
+						   <artifactId>paradise_${scala.version}</artifactId>
+						   <version>${scala.macros.version}</version>
+					   </compilerPlugin>
+				   </compilerPlugins>
+				</configuration>
+			</plugin>
+			
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
 		</plugins>
 		
 		<pluginManagement>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
new file mode 100644
index 0000000..dc01f02
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector
+import scala.math.{max, min}
+
+import scala.util.Random
+
+import org.apache.flink.streaming.api.scala.windowing.Time
+import org.apache.flink.streaming.api.scala.windowing.Delta
+
+/**
+ * An example of grouped stream windowing where different eviction and
+ * trigger policies can be used. A source generates an event for each car
+ * every second, containing its id, current speed (km/h), overall elapsed
+ * distance (m) and a timestamp. The streaming example emits the top speed
+ * of each car over the last y seconds, triggered every x meters of
+ * elapsed distance.
+ */
+object TopSpeedWindowing {
+
+  case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long)
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val cars = env.addSource(carSource _).groupBy("carId")
+      .window(Time.of(evictionSec, SECONDS))
+      .every(Delta.of[CarSpeed](triggerMeters, 
+          (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
+      .reduce((x, y) => if (x.speed > y.speed) x else y)
+
+    cars print
+
+    env.execute("TopSpeedWindowing")
+
+  }
+
+  def carSource(out: Collector[CarSpeed]) = {
+
+    val speeds = new Array[Int](numOfCars)
+    val distances = new Array[Double](numOfCars)
+
+    while (true) {
+      Thread sleep 1000
+      for (i <- 0 until speeds.length) {
+        speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5)
+        distances(i) += speeds(i) / 3.6d
+        out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis))
+      }
+    }
+  }
+
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 3) {
+      numOfCars = args(0).toInt
+      evictionSec = args(1).toInt
+      triggerMeters = args(2).toDouble
+      true
+    } else if (args.length == 0) {
+      // No arguments: run with the default settings.
+      true
+    } else {
+      System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
+      false
+    }
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
new file mode 100644
index 0000000..a19e4b4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.util.Collector
+import scala.util.Random
+
+object WindowJoin {
+
+  case class Name(id: Long, name: String)
+  case class Age(id: Long, age: Int)
+  case class Person(name: String, age: Long)
+
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Create streams for names and ages by mapping the inputs to the corresponding objects
+    val names = env.addSource(nameStream _).map(x => Name(x._1, x._2))
+    val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2))
+
+    //Join the two input streams by id on the last second and create new Person objects
+    //containing both name and age
+    val joined =
+      names.join(ages).onWindow(1000)
+                      .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) }
+
+    joined print
+
+    env.execute("WindowJoin")
+  }
+
+  //Stream source for generating (id, name) pairs
+  def nameStream(out: Collector[(Long, String)]) = {
+    val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
+
+    for (i <- 1 to 10000) {
+      if (i % 100 == 0) Thread.sleep(1000) else {
+        out.collect((i, names(Random.nextInt(names.length))))
+      }
+    }
+  }
+
+  //Stream source for generating (id, age) pairs
+  def ageStream(out: Collector[(Long, Int)]) = {
+    for (i <- 1 to 10000) {
+      if (i % 100 == 0) Thread.sleep(1000) else {
+        out.collect((i, Random.nextInt(90)))
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/pom.xml b/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
new file mode 100644
index 0000000..c06fba7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-scala</artifactId>
+	<name>flink-streaming-scala</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-reflect</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-compiler</artifactId>
+		</dependency>
+
+		<dependency>
+		   <groupId>org.scalamacros</groupId>
+		   <artifactId>quasiquotes_${scala.binary.version}</artifactId>
+		   <version>${scala.macros.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.ow2.asm</groupId>
+			<artifactId>asm</artifactId>
+		</dependency>
+		
+		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+ 
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+					<compilerPlugins>
+					   <compilerPlugin>
+						   <groupId>org.scalamacros</groupId>
+						   <artifactId>paradise_${scala.version}</artifactId>
+						   <version>${scala.macros.version}</version>
+					   </compilerPlugin>
+				   </compilerPlugins>
+				</configuration>
+			</plugin>
+			
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
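For orientation, a minimal, hypothetical sketch of a job built against the new flink-streaming-scala module; it is illustrative only. The entry points it uses (getExecutionEnvironment, a Collector-based addSource, map, print, execute) are the ones exercised by the examples above, while the source function and job name are made up.

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object StreamingScalaSketch {

  // Hypothetical source function: emits the numbers 1 to 100.
  def numberSource(out: Collector[Int]): Unit = {
    for (i <- 1 to 100) out.collect(i)
  }

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build a small topology with the Scala DataStream API added by this commit.
    val doubled = env.addSource(numberSource _).map(x => x * 2)

    doubled print

    env.execute("streaming-scala sketch")
  }
}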

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java b/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
new file mode 100644
index 0000000..77d102d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.streaming.scala;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.function.aggregation.SumFunction;
+
+import scala.Product;
+
+public class ScalaStreamingAggregator<IN extends Product> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	TupleSerializerBase<IN> serializer;
+	Object[] fields;
+	int length;
+	int position;
+
+	public ScalaStreamingAggregator(TypeSerializer<IN> serializer, int pos) {
+		this.serializer = (TupleSerializerBase<IN>) serializer;
+		this.length = this.serializer.getArity();
+		this.fields = new Object[this.length];
+		this.position = pos;
+	}
+
+	public class Sum extends AggregationFunction<IN> {
+		private static final long serialVersionUID = 1L;
+		SumFunction sumFunction;
+
+		public Sum(SumFunction func) {
+			super(ScalaStreamingAggregator.this.position);
+			this.sumFunction = func;
+		}
+
+		@Override
+		public IN reduce(IN value1, IN value2) throws Exception {
+			for (int i = 0; i < length; i++) {
+				fields[i] = value2.productElement(i);
+			}
+
+			fields[position] = sumFunction.add(fields[position], value1.productElement(position));
+
+			return serializer.createInstance(fields);
+		}
+	}
+
+	public class ProductComparableAggregator extends ComparableAggregator<IN> {
+
+		private static final long serialVersionUID = 1L;
+
+		public ProductComparableAggregator(AggregationFunction.AggregationType aggregationType,
+				boolean first) {
+			super(ScalaStreamingAggregator.this.position, aggregationType, first);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public IN reduce(IN value1, IN value2) throws Exception {
+			Object v1 = value1.productElement(position);
+			Object v2 = value2.productElement(position);
+
+			int c = comparator.isExtremal((Comparable<Object>) v1, v2);
+
+			if (byAggregate) {
+				if (c == 1) {
+					return value1;
+				}
+				if (first) {
+					if (c == 0) {
+						return value1;
+					}
+				}
+
+				return value2;
+			} else {
+				for (int i = 0; i < length; i++) {
+					fields[i] = value2.productElement(i);
+				}
+
+				if (c == 1) {
+					fields[position] = v1;
+				}
+
+				return serializer.createInstance(fields);
+			}
+		}
+
+	}
+
+}
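This Java helper backs the positional aggregations (sum, min, max, minBy, maxBy) that the Scala DataStream added later in this commit exposes on tuple and case-class streams. A minimal usage sketch, assuming a hypothetical stream of (carId, speed) tuples:

import org.apache.flink.streaming.api.scala.DataStream

object AggregationSketch {
  // Keep a running maximum speed (field 1) per car id (field 0).
  def maxSpeedPerCar(cars: DataStream[(Int, Int)]): DataStream[(Int, Int)] =
    cars.groupBy(0).max(1)
}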

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
new file mode 100644
index 0000000..320bfa0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
+import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction }
+import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable }
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.util.Collector
+
+class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
+
+  /**
+   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
+   * the output to a common type. The transformation calls fun1 for each
+   * element of the first input and fun2 for each element of the second
+   * input. Each function call returns exactly one element.
+   *
+   * @param fun1 The function applied to elements of the first input
+   * @param fun2 The function applied to elements of the second input
+   *
+   * @return The transformed { @link DataStream}
+   */
+  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
+  DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val comapper = new CoMapFunction[IN1, IN2, R] {
+      def map1(in1: IN1): R = clean(fun1)(in1)
+      def map2(in2: IN2): R = clean(fun2)(in2)
+    }
+
+    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1, IN2, R](comapper)))
+  }
+
+  /**
+   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
+   * the output to a common type. The transformation calls a
+   * {@link CoMapFunction#map1} for each element of the first input and
+   * {@link CoMapFunction#map2} for each element of the second input. Each
+   * CoMapFunction call returns exactly one element. The user can also extend
+   * {@link RichCoMapFunction} to gain access to other features provided by
+   * the {@link RichFunction} interface.
+   *
+   * @param coMapper
+   * The CoMapFunction used to jointly transform the two input
+   * DataStreams
+   * @return The transformed { @link DataStream}
+   */
+  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
+  DataStream[R] = {
+    if (coMapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1, IN2, R](coMapper)))
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
+   * maps the output to a common type. The transformation calls a
+   * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+   * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+   * input. Each CoFlatMapFunction call returns any number of elements
+   * including none. The user can also extend {@link RichFlatMapFunction} to
+   * gain access to other features provided by the {@link RichFunction}
+   * interface.
+   *
+   * @param coFlatMapper
+   * The CoFlatMapFunction used to jointly transform the two input
+   * DataStreams
+   * @return The transformed { @link DataStream}
+   */
+  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
+  DataStream[R] = {
+    if (coFlatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]],
+      new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper)))
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
+   * maps the output to a common type. The transformation calls fun1 for each
+   * element of the first input and fun2 for each element of the second input.
+   * Each function call may return any number of elements, including none.
+   *
+   * @param fun1 The function applied to elements of the first input
+   * @param fun2 The function applied to elements of the second input
+   * @return The transformed { @link DataStream}
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
+      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("FlatMap functions must not be null.")
+    }
+    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+      def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, out)
+      def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, out)
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
+   * applying functions on grouped data streams, for example
+   * {@link ConnectedDataStream#reduce}
+   *
+   * @param keyPosition1
+   * The field used to compute the hashcode of the elements in the
+   * first input stream.
+   * @param keyPosition2
+   * The field used to compute the hashcode of the elements in the
+   * second input stream.
+   * @return The transformed { @link ConnectedDataStream}
+   */
+  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(keyPosition1, keyPosition2)
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
+   * applying functions on grouped data streams, for example
+   * {@link ConnectedDataStream#reduce}
+   *
+   * @param keyPositions1
+   * The fields used to group the first input stream.
+   * @param keyPositions2
+   * The fields used to group the second input stream.
+   * @return The transformed { @link ConnectedDataStream}
+   */
+  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): 
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(keyPositions1, keyPositions2)
+  }
+
+  /**
+   * GroupBy operation for connected data stream using key expressions. Groups
+   * the elements of input1 and input2 according to field1 and field2. A field
+   * expression is either the name of a public field or a getter method with
+   * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   *
+   * @param field1
+   * The grouping expression for the first input
+   * @param field2
+   * The grouping expression for the second input
+   * @return The grouped { @link ConnectedDataStream}
+   */
+  def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(field1, field2)
+  }
+
+  /**
+   * GroupBy operation for connected data stream using key expressions. Groups
+   * the elements of input1 and input2 according to fields1 and fields2. A
+   * field expression is either the name of a public field or a getter method
+   * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+   * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   *
+   *
+   * @param fields1
+   * The grouping expressions for the first input
+   * @param fields2
+   * The grouping expressions for the second input
+   * @return The grouped { @link ConnectedDataStream}
+   */
+  def groupBy(fields1: Array[String], fields2: Array[String]): 
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(fields1, fields2)
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 using fun1 and fun2. Used for applying
+   * functions on grouped data streams, for example
+   * {@link ConnectedDataStream#reduce}
+   *
+   * @param fun1
+   * The function used for grouping the first input
+   * @param fun2
+   * The function used for grouping the second input
+   * @return The transformed { @link ConnectedDataStream}
+   */
+  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _):
+  ConnectedDataStream[IN1, IN2] = {
+
+    val keyExtractor1 = new KeySelector[IN1, Any] {
+      def getKey(in: IN1) = clean(fun1)(in)
+    }
+    val keyExtractor2 = new KeySelector[IN2, Any] {
+      def getKey(in: IN2) = clean(fun2)(in)
+    }
+
+    javaStream.groupBy(keyExtractor1, keyExtractor2)
+  }
+
+  /**
+   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
+   * the outputs to a common type. If the {@link ConnectedDataStream} is
+   * batched or windowed then the reduce transformation is applied on every
+   * sliding batch/window of the data stream. If the connected data stream is
+   * grouped then the reducer is applied on every group of elements sharing
+   * the same key. This type of reduce is much faster than reduceGroup since
+   * the reduce function can be applied incrementally.
+   *
+   * @param coReducer
+   * The { @link CoReduceFunction} that will be called for every
+   *             element of the inputs.
+   * @return The transformed { @link DataStream}.
+   */
+  def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]): 
+  DataStream[R] = {
+    if (coReducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+
+    new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]],
+      new CoReduceInvokable[IN1, IN2, R](coReducer)))
+  }
+
+  /**
+   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
+   * the outputs to a common type. If the {@link ConnectedDataStream} is
+   * batched or windowed then the reduce transformation is applied on every
+   * sliding batch/window of the data stream. If the connected data stream is
+   * grouped then the reducer is applied on every group of elements sharing
+   * the same key. This type of reduce is much faster than reduceGroup since
+   * the reduce function can be applied incrementally.
+   *
+   * @param reducer1 The reduce function applied to elements of the first input
+   * @param reducer2 The reduce function applied to elements of the second input
+   * @param mapper1 The function mapping reduced elements of the first input to the output type
+   * @param mapper2 The function mapping reduced elements of the second input to the output type
+   *
+   * @return The transformed { @link DataStream}.
+   */
+  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1, 
+      reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
+    if (mapper1 == null || mapper2 == null) {
+      throw new NullPointerException("Map functions must not be null.")
+    }
+    if (reducer1 == null || reducer2 == null) {
+      throw new NullPointerException("Reduce functions must not be null.")
+    }
+
+    val reducer = new CoReduceFunction[IN1, IN2, R] {
+      def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, value2)
+      def map2(value: IN2): R = clean(mapper2)(value)
+      def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, value2)
+      def map1(value: IN1): R = clean(mapper1)(value)
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Applies a CoWindow transformation on the connected DataStreams. The
+   * transformation calls the {@link CoWindowFunction#coWindow} method for
+   * time aligned windows of the two data streams. System time is used as
+   * default to compute windows.
+   *
+   * @param coWindowFunction
+   * The { @link CoWindowFunction} that will be applied for the time
+   *             windows.
+   * @param windowSize
+   * Size of the windows that will be aligned for both streams in
+   * milliseconds.
+   * @param slideInterval
+   * After every function call the windows will be slid by this
+   * interval.
+   *
+   * @return The transformed { @link DataStream}.
+   */
+  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
+      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = {
+    if (coWindowFunction == null) {
+      throw new NullPointerException("CoWindow function must no be null")
+    }
+
+    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)
+  }
+
+  /**
+   * Applies a CoWindow transformation on the connected DataStreams. The
+   * transformation calls the {@link CoWindowFunction#coWindow} method for
+   * time aligned windows of the two data streams. System time is used as
+   * default to compute windows.
+   *
+   * @param coWindower
+   * The coWindowing function to be applied for the time windows.
+   * @param windowSize
+   * Size of the windows that will be aligned for both streams in
+   * milliseconds.
+   * @param slideInterval
+   * After every function call the windows will be slid by this
+   * interval.
+   *
+   * @return The transformed { @link DataStream}.
+   */
+  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], 
+      Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = {
+    if (coWindower == null) {
+      throw new NullPointerException("CoWindow function must no be null")
+    }
+
+    val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
+      def coWindow(first: util.List[IN1], second: util.List[IN2], 
+          out: Collector[R]): Unit = clean(coWindower)(first, second, out)
+    }
+
+    javaStream.windowReduce(coWindowFun, windowSize, slideInterval)
+  }
+
+  /**
+   * Returns the first {@link DataStream}.
+   *
+   * @return The first DataStream.
+   */
+  def getFirst(): DataStream[IN1] = {
+    javaStream.getFirst
+  }
+
+  /**
+   * Returns the second {@link DataStream}.
+   *
+   * @return The second DataStream.
+   */
+  def getSecond(): DataStream[IN2] = {
+    javaStream.getSecond
+  }
+
+  /**
+   * Gets the type of the first input
+   *
+   * @return The type of the first input
+   */
+  def getInputType1(): TypeInformation[IN1] = {
+    javaStream.getInputType1
+  }
+
+  /**
+   * Gets the type of the second input
+   *
+   * @return The type of the second input
+   */
+  def getInputType2(): TypeInformation[IN2] = {
+    javaStream.getInputType2
+  }
+
+}
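For reference, a hedged sketch of how the ConnectedDataStream wrapper above is used from application code. The event types and streams are hypothetical; connect and the two-function map variant are the APIs defined in this commit.

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream

object ConnectedStreamSketch {

  // Hypothetical event types arriving on two differently typed streams.
  case class Temperature(sensorId: Long, celsius: Double)
  case class Humidity(sensorId: Long, percent: Double)

  // Connect the two streams and co-map both inputs to a common String type.
  def describe(temps: DataStream[Temperature],
      hums: DataStream[Humidity]): DataStream[String] = {
    temps.connect(hums)
      .map(t => "temperature " + t.sensorId + ": " + t.celsius,
           h => "humidity " + h.sensorId + ": " + h.percent)
  }
}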

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
new file mode 100644
index 0000000..270b80c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
+  SingleOutputStreamOperator, GroupedDataStream}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.streaming.api.invokable.StreamInvokable
+import org.apache.flink.streaming.api.invokable.operator.{ GroupedReduceInvokable, StreamReduceInvokable }
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.streaming.api.function.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
+import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy }
+import org.apache.flink.streaming.api.collector.OutputSelector
+import scala.collection.JavaConversions._
+import java.util.HashMap
+import org.apache.flink.streaming.api.function.aggregation.SumFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+
+class DataStream[T](javaStream: JavaStream[T]) {
+
+  /**
+   * Gets the underlying java DataStream object.
+   */
+  def getJavaStream: JavaStream[T] = javaStream
+
+  /**
+   * Sets the degree of parallelism of this operation. This must be at least one.
+   */
+  def setParallelism(dop: Int): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + javaStream.toString +  " cannot " +
+          "have " +
+          "parallelism.")
+    }
+    this
+  }
+
+  /**
+   * Returns the degree of parallelism of this operation.
+   */
+  def getParallelism: Int = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have" +
+        " "  +
+        "parallelism.")
+  }
+
+  /**
+   * Creates a new DataStream by merging DataStream outputs of
+   * the same type with each other. The DataStreams merged using this operator
+   * will be transformed simultaneously.
+   *
+   */
+  def merge(dataStreams: DataStream[T]*): DataStream[T] =
+    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
+
+  /**
+   * Creates a new ConnectedDataStream by connecting
+   * DataStream outputs of different types with each other. The
+   * DataStreams connected using this operator can be used with CoFunctions.
+   *
+   */
+  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = 
+    javaStream.connect(dataStream.getJavaStream)
+
+  /**
+   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
+  def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
+
+  /**
+   * Groups the elements of a DataStream by the given field expressions to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
+  def groupBy(firstField: String, otherFields: String*): DataStream[T] = 
+   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
+  
+  /**
+   * Groups the elements of a DataStream by the given K key to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
+  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.groupBy(keyExtractor)
+  }
+
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the selected fields. This setting only affects how the outputs will be
+   * distributed between the parallel instances of the next processing operator.
+   *
+   */
+  def partitionBy(fields: Int*): DataStream[T] =
+    javaStream.partitionBy(fields: _*)
+
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the selected fields. This setting only affects how the outputs will be
+   * distributed between the parallel instances of the next processing operator.
+   *
+   */
+  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
+   javaStream.partitionBy(firstField +: otherFields.toArray: _*)
+
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the given key. This setting only affects how the outputs will be
+   * distributed between the parallel instances of the next processing operator.
+   *
+   */
+  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.partitionBy(keyExtractor)
+  }
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are broadcast to every parallel instance of the next component. This
+   * setting only affects how the outputs will be distributed between the
+   * parallel instances of the next processing operator.
+   *
+   */
+  def broadcast: DataStream[T] = javaStream.broadcast()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are shuffled to the next component. This setting only affects how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   *
+   */
+  def shuffle: DataStream[T] = javaStream.shuffle()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are forwarded to the local subtask of the next component (whenever
+   * possible). This is the default partitioner setting. This setting only
+   * affects how the outputs will be distributed between the parallel
+   * instances of the next processing operator.
+   *
+   */
+  def forward: DataStream[T] = javaStream.forward()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are distributed evenly to the next component. This setting only affects
+   * how the outputs will be distributed between the parallel instances of
+   * the next processing operator.
+   *
+   */
+  def distribute: DataStream[T] = javaStream.distribute()
+
+  /**
+   * Initiates an iterative part of the program that creates a loop by feeding
+   * back data streams. To create a streaming iteration the user needs to define
+   * a transformation that creates two DataStreams. The first one is the output
+   * that will be fed back to the start of the iteration and the second is the output
+   * stream of the iterative part.
+   * <p>
+   * stepfunction: initialStream => (feedback, output)
+   * <p>
+   * A common pattern is to use output splitting to create feedback and output DataStream.
+   * Please refer to the .split(...) method of the DataStream
+   * <p>
+   * By default a DataStream with iteration will never terminate, but the user
+   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
+   * If no data is received within that time, the stream terminates.
+   *
+   *
+   */
+  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]),  maxWaitTimeMillis:
+    Long = 0): DataStream[T] = {
+    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
+
+    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
+    iterativeStream.closeWith(feedback.getJavaStream)
+    output
+  }
+
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given position.
+   *
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given position.
+   *
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+  /**
+   * Applies an aggregation that sums the data stream at the given position.
+   *
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given position. In case of a tie, the user can choose to get the first or last element with
+   * the minimal value.
+   *
+   */
+  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType
+    .MINBY, position, first)
+
+  /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given position. In case of a tie, the user can choose to get the first or last element with
+   * the maximal value.
+   *
+   */
+  def maxBy(position: Int, first: Boolean = true): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position, first)
+
+  private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+    DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaStream[Product]]
+    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
+
+    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
+        getTypeClass()));
+      case _ => new agg.ProductComparableAggregator(aggregationType, first)
+    }
+
+    val invokable = jStream match {
+      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
+        groupedStream.getKeySelector())
+      case _ => new StreamReduceInvokable(reducer)
+    }
+    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
+      invokable)).asInstanceOf[DataStream[T]]
+  }
+
+  /**
+   * Creates a new DataStream containing the current number (count) of
+   * received records.
+   *
+   */
+  def count: DataStream[Long] = new DataStream[java.lang.Long](
+    javaStream.count()).asInstanceOf[DataStream[Long]]
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val mapper = new MapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def map(in: T): R = cleanFun(in)
+    }
+
+    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
+    if (mapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
+    if (flatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    javaStream.transform("flatMap", implicitly[TypeInformation[R]],
+      new FlatMapInvokable[T, R](flatMapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    javaStream match {
+      case ds: GroupedDataStream[_] => javaStream.transform("reduce",
+        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))
+      case _ => javaStream.transform("reduce", javaStream.getType(),
+        new StreamReduceInvokable[T](reducer))
+    }
+  }
+
+  /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
+   */
+  def filter(filter: FilterFunction[T]): DataStream[T] = {
+    if (filter == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    javaStream.filter(filter)
+  }
+
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
+   */
+  def filter(fun: T => Boolean): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    val filter = new FilterFunction[T] {
+      val cleanFun = clean(fun)
+      def filter(in: T) = cleanFun(in)
+    }
+    this.filter(filter)
+  }
+
+  /**
+   * Creates a WindowedDataStream that can be used to apply
+   * transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream. To define the windows one or
+   * more WindowingHelper-s such as Time, Count and
+   * Delta can be used.
+   *
+   * When applied to a grouped data stream, the windows (evictions) and slide
+   * sizes (triggers) will be computed on a per group basis.
+   *
+   * For more advanced control over the trigger and eviction policies please
+   * use window(List(triggers), List(evicters)).
+   */
+  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    javaStream.window(windowingHelper: _*)
+
+  /**
+   * Creates a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
+   * Windowing can be used to apply transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream.
+   *
+   * For the most common use cases please refer to window(WindowingHelper[_]*).
+   *
+   */
+  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
+    WindowedDataStream[T] = javaStream.window(triggers, evicters)
+
+  /**
+   *
+   * Operator used for directing tuples to specific named outputs using an
+   * OutputSelector. Calling this method on an operator creates a new
+   * SplitDataStream.
+   */
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.split(selector)
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + javaStream.toString + " can not be " +
+        "split.")
+  }
+
+  /**
+   * Creates a new SplitDataStream by directing each element of this DataStream to
+   * the output named by the given selector function.
+   */
+  def split(fun: T => String): SplitDataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("OutputSelector must not be null.")
+    }
+    val selector = new OutputSelector[T] {
+      val cleanFun = clean(fun)
+      def select(in: T): java.lang.Iterable[String] = {
+        List(cleanFun(in))
+      }
+    }
+    split(selector)
+  }
+
+  /**
+   * Initiates a temporal Join transformation that joins the elements of two
+   * data streams on key equality over a specified time window.
+   *
+   * This method returns a StreamJoinOperator on which
+   * .onWindow(..) should be called to define the
+   * window, and then the .where(..) and .equalTo(..) methods can be used to define
+   * the join keys. The user can also use the apply method of the returned JoinedStream
+   * to apply a custom join function.
+   *
+   */
+  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
+    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
+   * Initiates a temporal cross transformation that builds all pair
+   * combinations of elements of both DataStreams, i.e., it builds a Cartesian
+   * product.
+   *
+   * This method returns a StreamCrossOperator on which
+   * .onWindow(..) should be called to define the
+   * window over which the cross product is computed. The user can also use the apply
+   * method of the resulting stream to apply a custom cross function.
+   *
+   */
+  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
+    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
+   * Writes a DataStream to the standard output stream (stdout). For each
+   * element of the DataStream the result of .toString is
+   * written.
+   *
+   */
+  def print(): DataStream[T] = javaStream.print()
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsText(path: String, millis: Long = 0): DataStream[T] =
+    javaStream.writeAsText(path, millis)
+
+  /**
+   * Writes a DataStream to the file specified by path in csv format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
+    javaStream.writeAsCsv(path, millis)
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   *
+   */
+  def addSink(sinkFunction: SinkFunction[T]): DataStream[T] =
+    javaStream.addSink(sinkFunction)
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   *
+   */
+  def addSink(fun: T => Unit): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Sink function must not be null.")
+    }
+    val sinkFunction = new SinkFunction[T] {
+      val cleanFun = clean(fun)
+      def invoke(in: T) = cleanFun(in)
+    }
+    this.addSink(sinkFunction)
+  }
+
+}
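
For illustration only, a minimal sketch of how the Scala DataStream API defined above might be used. The sample words, tuple positions and output path are made up, and the implicit TypeInformation support is assumed to come from org.apache.flink.api.scala._, matching the imports used in the files of this commit:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._

object AggregationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // fromElements, filter, map, sum and maxBy are the methods added above;
    // the data and the tuple positions are illustrative.
    val lengths = env.fromElements("to", "be", "or", "not", "to", "be")
      .filter(_.nonEmpty)
      .map(word => (word, word.length))

    lengths.sum(1).print()                              // running sum of field 1
    lengths.maxBy(1).writeAsText("/tmp/longest-so-far") // hypothetical output path

    env.execute("Aggregation sketch")
  }
}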

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
new file mode 100644
index 0000000..7349db6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+
+/**
+ * The SplitDataStream represents an operator that has been split using an
+ * OutputSelector. Named outputs can be selected with the
+ * select function.
+ *
+ * @tparam T The type of the output.
+ */
+class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
+
+  /**
+   * Gets the underlying java DataStream object.
+   */
+  private[flink] def getJavaStream: SplitJavaStream[T] = javaStream
+
+  /**
+   *  Sets the output names for which the next operator will receive values.
+   */
+  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
+
+  /**
+   * Selects all output names from a split data stream.
+   */
+  def selectAll(): DataStream[T] = javaStream.selectAll()
+
+}
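
A hedged usage sketch for the SplitDataStream above: split(..) with a selector function names an output for each element, and select(..) picks which named outputs downstream operators consume. The even/odd output names and the sink path are made up for illustration:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._

object SplitSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Route each number to the "even" or "odd" named output.
    val parity = env.generateSequence(1, 100)
      .split(n => if (n % 2 == 0) "even" else "odd")

    parity.select("even").print()
    parity.select("odd").writeAsText("/tmp/odd-numbers")

    env.execute("Split sketch")
  }
}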

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
new file mode 100644
index 0000000..e26db62
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.reflect.ClassTag
+
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.functions.CrossFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.function.co.CrossWindowFunction
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+
+class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+  TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
+
+  override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
+
+    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
+      (l: I1, r: I2) => (l, r))
+
+    val returnType = new CaseClassTypeInfo[(I1, I2)](
+
+      classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) {
+
+      override def createSerializer: TypeSerializer[(I1, I2)] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer
+        }
+
+        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+          }
+        }
+      }
+    }
+
+    val javaStream = input1.connect(input2).addGeneralWindowCombine(
+      crossWindowFunction,
+      returnType, windowSize,
+      slideInterval, timeStamp1, timeStamp2);
+
+    new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
+  }
+}
+object StreamCrossOperator {
+
+  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
+                                           javaStream: JavaStream[(I1, I2)]) extends
+    DataStream[(I1, I2)](javaStream) {
+
+    /**
+     * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
+     * call will be emitted.
+     *
+     */
+    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+      val invokable = new CoWindowInvokable[I1, I2, R](
+        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
+
+      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+        invokable)
+
+      javaStream.setType(implicitly[TypeInformation[R]])
+    }
+  }
+
+  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
+                                                       crossFunction: (I1, I2) => R):
+  CrossWindowFunction[I1, I2, R] = {
+    Validate.notNull(crossFunction, "Join function must not be null.")
+
+    val crossFun = new CrossFunction[I1, I2, R] {
+      val cleanFun = op.input1.clean(crossFunction)
+
+      override def cross(first: I1, second: I2): R = {
+        cleanFun(first, second)
+      }
+    }
+
+    new CrossWindowFunction[I1, I2, R](crossFun)
+  }
+
+}
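
A sketch of how the cross operator above might be wired up. Per the Scaladoc, cross(..) returns an operator on which onWindow(..) defines the window and apply(..) installs a custom cross function; the exact onWindow parameters used here (a single window length in milliseconds) are an assumption and are not taken from this diff:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._

object CrossSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val left = env.fromElements(1L, 2L, 3L)
    val right = env.generateSequence(1, 5)

    left.cross(right)
      .onWindow(1000)                      // assumed signature: window length in ms
      .apply((x: Long, y: Long) => x * y)  // CrossWindow.apply as defined above
      .print()

    env.execute("Cross sketch")
  }
}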

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
new file mode 100644
index 0000000..361abd6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.reflect.ClassTag
+
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
+import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction }
+import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream
+import org.apache.flink.util.Collector
+
+class StreamExecutionEnvironment(javaEnv: JavaEnv) {
+
+  /**
+   * Sets the degree of parallelism (DOP) for operations executed through this environment.
+   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+   * x parallel instances. This value can be overridden by specific operations using
+   * [[DataStream.setParallelism]].
+   */
+  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+  }
+
+  /**
+   * Returns the default degree of parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   */
+  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+  /**
+   * Sets the maximum time frequency (milliseconds) for the flushing of the
+   * output buffers. By default the output buffers flush frequently to provide
+   * low latency and to aid smooth developer experience. Setting the parameter
+   * can result in three logical modes:
+   *
+   * <ul>
+   * <li>
+   * A positive integer triggers flushing periodically by that integer</li>
+   * <li>
+   * 0 triggers flushing after every record thus minimizing latency</li>
+   * <li>
+   * -1 triggers flushing only when the output buffer is full thus maximizing
+   * throughput</li>
+   * </ul>
+   *
+   */
+  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
+    javaEnv.setBufferTimeout(timeoutMillis)
+    this
+  }
+
+  /**
+   * Gets the default buffer timeout set for this environment
+   */
+  def getBufferTimeout: Long = javaEnv.getBufferTimeout()
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line by line. The file will be read with the system's default
+   * character set.
+   *
+   */
+  def readTextFile(filePath: String): DataStream[String] =
+    javaEnv.readTextFile(filePath)
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line by line, repeatedly (infinitely). The file will be read with
+   * the system's default character set. This functionality can be used for
+   * testing a topology.
+   *
+   */
+  def readTextStream(streamPath: String): DataStream[String] =
+    javaEnv.readTextStream(streamPath)
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from a socket. Received strings are decoded by the system's default
+   * character set.
+   *
+   */
+  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
+    javaEnv.socketTextStream(hostname, port, delimiter)
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from a socket. Received strings are decoded by the system's default
+   * character set, using '\n' as the delimiter.
+   *
+   */
+  def socketTextStream(hostname: String, port: Int): DataStream[String] =
+    javaEnv.socketTextStream(hostname, port)
+
+  /**
+   * Creates a new DataStream that contains a sequence of numbers.
+   *
+   */
+  def generateSequence(from: Long, to: Long): DataStream[Long] = {
+    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
+      asInstanceOf[DataStream[Long]]
+  }
+
+  /**
+   * Creates a DataStream that contains the given elements. The elements must all be of the
+   * same type and must be serializable.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+  }
+
+  /**
+   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
+   * because the framework may move the elements into the cluster if needed.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromCollection[T: ClassTag: TypeInformation](
+    data: Seq[T]): DataStream[T] = {
+    Validate.notNull(data, "Data must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+
+    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
+        .asJavaCollection(data))
+        
+    javaEnv.addSource(sourceFunction, typeInfo)
+  }
+
+  /**
+   * Creates a DataStream using a user-defined source function for arbitrary
+   * source functionality.
+   *
+   */
+  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
+    Validate.notNull(function, "Function must not be null.")
+    val cleanFun = StreamExecutionEnvironment.clean(function)
+    val typeInfo = implicitly[TypeInformation[T]]
+    javaEnv.addSource(cleanFun, typeInfo)
+  }
+  
+  /**
+   * Creates a DataStream using a user-defined source function for arbitrary
+   * source functionality.
+   *
+   */
+  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
+    Validate.notNull(function, "Function must not be null.")
+    val sourceFunction = new SourceFunction[T] {
+      val cleanFun = StreamExecutionEnvironment.clean(function)
+      override def invoke(out: Collector[T]) {
+        cleanFun(out)
+      }
+    }
+    addSource(sourceFunction)
+  }
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   * <p>
+   * The program execution will be logged and displayed with a generated
+   * default name.
+   *
+   */
+  def execute() = javaEnv.execute()
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   * <p>
+   * The program execution will be logged and displayed with the provided name
+   *
+   */
+  def execute(jobName: String) = javaEnv.execute(jobName)
+
+}
+
+object StreamExecutionEnvironment {
+  
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
+
+  /**
+   * Creates an execution environment that represents the context in which the program is
+   * currently executed. If the program is invoked standalone, this method returns a local
+   * execution environment. If the program is invoked from within the command line client
+   * to be submitted to a cluster, this method returns the execution environment of this cluster.
+   */
+  def getExecutionEnvironment: StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
+  }
+
+  /**
+   * Creates a local execution environment. The local execution environment will run the program in
+   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
+   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
+   */
+  def createLocalEnvironment(
+    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()):
+  StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program to
+   * a cluster for execution. Note that all file paths used in the program must be accessible from
+   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
+   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   */
+  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
+  StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program
+   * to a cluster for execution. Note that all file paths used in the program must be accessible
+   * from the cluster. The execution will use the specified degree of parallelism.
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param degreeOfParallelism The degree of parallelism to use during the execution.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   */
+  def createRemoteEnvironment(
+    host: String,
+    port: Int,
+    degreeOfParallelism: Int,
+    jarFiles: String*): StreamExecutionEnvironment = {
+    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    new StreamExecutionEnvironment(javaEnv)
+  }
+}
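
Finally, a hedged sketch of the StreamExecutionEnvironment entry points above, combining a functional source (Collector => Unit) with a closure sink. The parallelism value and the record contents are illustrative:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object EnvironmentSketch {
  def main(args: Array[String]): Unit = {
    // Runs the topology multi-threaded in the current JVM.
    val env = StreamExecutionEnvironment.createLocalEnvironment(degreeOfParallelism = 2)

    // addSource with a Collector function, as defined in this file.
    val source = env.addSource[String] { out: Collector[String] =>
      (1 to 3).foreach(i => out.collect("record-" + i))
    }

    // addSink with a plain closure.
    source.addSink(record => println("sink got: " + record))

    env.execute("Environment sketch")
  }
}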

