flink-commits mailing list archives

From rmetz...@apache.org
Subject [28/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:06 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
deleted file mode 100644
index d6a9ac0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.EuclideanDistance;
-import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.util.Collector;
-
-/**
- * This example illustrates how to use delta policies. It also
- * shows how extractors can be used.
- */
-public class DeltaExtractExample {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		DataStream dstream = env
-				.addSource(new CountingSource())
-				.window(Delta.of(1.2, new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(
-						0d, 0d, "foo"))).every(Count.of(2)).reduce(new ConcatStrings());
-
-		// emit result
-		if (fileOutput) {
-			dstream.writeAsText(outputPath, 1);
-		} else {
-			dstream.print();
-		}
-
-		// execute the program
-		env.execute("Delta Extract Example");
-
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private static class CountingSource implements SourceFunction<Tuple3<Double, Double, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private int counter = 0;
-
-		@Override
-		public void invoke(Collector<Tuple3<Double, Double, String>> collector) throws Exception {
-			while (true) {
-				if (counter > 9999) {
-					counter = 0;
-				}
-				collector.collect(new Tuple3<Double, Double, String>((double) counter,
-						(double) counter + 1, "V" + counter++));
-			}
-		}
-	}
-
-	private static final class ConcatStrings implements
-			ReduceFunction<Tuple3<Double, Double, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<Double, Double, String> reduce(Tuple3<Double, Double, String> value1,
-				Tuple3<Double, Double, String> value2) throws Exception {
-			return new Tuple3<Double, Double, String>(value1.f0, value2.f1, value1.f2 + "|"
-					+ value2.f2);
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: DeltaExtractExample <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing DeltaExtractExample with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: DeltaExtractExample <result path>");
-		}
-		return true;
-	}
-}
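
For context, a minimal standalone sketch of what the delta policy above computes: the Euclidean distance over tuple fields (f0, f1), compared against the 1.2 threshold. It assumes the pre-rename 0.9-SNAPSHOT API imported by the deleted file (EuclideanDistance, FieldsFromTuple) and that EuclideanDistance exposes the DeltaFunction getDelta method; raw types are used just as in the example itself.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.windowing.deltafunction.EuclideanDistance;
import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple;

public class DeltaCheckSketch {
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public static void main(String[] args) {
		// The distance is computed over tuple fields 0 and 1 only; the String field is ignored.
		EuclideanDistance distance = new EuclideanDistance(new FieldsFromTuple(0, 1));

		Tuple3<Double, Double, String> last = new Tuple3<Double, Double, String>(0d, 0d, "foo");
		Tuple3<Double, Double, String> next = new Tuple3<Double, Double, String>(1d, 2d, "V1");

		// sqrt(1^2 + 2^2) ~ 2.24; the delta policy reacts once this exceeds the 1.2 threshold.
		System.out.println(distance.getDelta(last, next));
	}
}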

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
deleted file mode 100644
index 48783f2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.util.Collector;
-
-/**
- * This example uses count-based tumbling windows with multiple trigger
- * policies active at the same time.
- */
-public class MultiplePoliciesExample {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> stream = env.addSource(new BasicSource())
-				.groupBy(new KeySelector<String, String>(){
-					private static final long serialVersionUID = 1L;
-					@Override
-					public String getKey(String value) throws Exception {
-						return value;
-					}	
-				})
-				.window(Count.of(2))
-				.every(Count.of(3), Count.of(5))
-				.reduceGroup(new Concat());
-
-		// emit result
-		if (fileOutput) {
-			stream.writeAsText(outputPath, 1);
-		} else {
-			stream.print();
-		}
-
-		// execute the program
-		env.execute("Multiple Policies Example");
-	}
-
-	/**
-	 * This source function indefinitely provides String inputs for the
-	 * topology.
-	 */
-	public static final class BasicSource implements SourceFunction<String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final static String STR_1 = "streaming";
-		private final static String STR_2 = "flink";
-
-		@Override
-		public void invoke(Collector<String> out) throws Exception {
-			// continuous emit
-			while (true) {
-				out.collect(STR_1);
-				out.collect(STR_2);
-			}
-		}
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * This reduce function does a String concat.
-	 */
-	public static final class Concat implements GroupReduceFunction<String, String> {
-
-		/**
-		 * Auto-generated serial version ID
-		 */
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
-			String output = "|";
-			for (String v : values) {
-				output = output + v + "|";
-			}
-			out.collect(output);
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: MultiplePoliciesExample <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing MultiplePoliciesExample with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: MultiplePoliciesExample <result path>");
-		}
-		return true;
-	}
-}
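
A small sketch of the resulting notification pattern, under the assumption that the two count policies passed to every() fire independently, each whenever its own counter wraps:

public class MultiTriggerSketch {
	public static void main(String[] args) {
		// Count.of(3) and Count.of(5) together: per group, a notification fires
		// after elements 3, 5, 6, 9, 10, 12, 15, ...
		for (int element = 1; element <= 15; element++) {
			if (element % 3 == 0 || element % 5 == 0) {
				System.out.println("trigger after element " + element);
			}
		}
	}
}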

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
deleted file mode 100644
index cf03477..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.util.Collector;
-
-/**
- * This example uses count-based sliding windows to illustrate different
- * ways of realizing sliding windows. See the commented-out code below for
- * alternative setups.
- */
-public class SlidingExample {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		/*
-		 * SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the
-		 * buffer. Resulting windows will have an overlap of 5 elements.
-		 */
-
-		// DataStream<String> stream = env.addSource(new CountingSource())
-		// .window(Count.of(10))
-		// .every(Count.of(5))
-		// .reduce(new Concat());
-
-		/*
-		 * ADVANCED-EXAMPLE: Use this to make the last element of each window
-		 * the first element of the next window, while the window size stays
-		 * at 5.
-		 */
-
-		DataStream<String> stream = env.addSource(new CountingSource())
-				.window(Count.of(5)
-				.withDelete(4))
-				.every(Count.of(4)
-				.startingAt(-1))
-				.reduce(new Concat());
-
-		// emit result
-		if (fileOutput) {
-			stream.writeAsText(outputPath, 1);
-		} else {
-			stream.print();
-		}
-
-		// execute the program
-		env.execute("Sliding Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private static final class CountingSource implements SourceFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		private int counter = 0;
-
-		@Override
-		public void invoke(Collector<String> collector) throws Exception {
-			// continuous emit
-			while (true) {
-				if (counter > 9999) {
-					counter = 0;
-				}
-				collector.collect("V" + counter++);
-			}
-		}
-	}
-
-	/**
-	 * This reduce function does a String concat.
-	 */
-	private static final class Concat implements ReduceFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String reduce(String value1, String value2) throws Exception {
-			return value1 + "|" + value2;
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: SlidingExample <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing SlidingExample with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: SlidingExample <result path>");
-		}
-		return true;
-	}
-
-}
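
A plain-Java walkthrough of the windows the active setup emits for the CountingSource, under the semantics stated in the ADVANCED-EXAMPLE comment (the window size stays at 5 and consecutive windows share one element):

public class SlidingWindowSketch {
	public static void main(String[] args) {
		// Expected windows: V0|V1|V2|V3|V4, V4|V5|V6|V7|V8, V8|V9|V10|V11|V12
		for (int start = 0; start <= 8; start += 4) {
			StringBuilder window = new StringBuilder();
			for (int i = start; i < start + 5; i++) {
				window.append("V").append(i);
				if (i < start + 4) {
					window.append("|");
				}
			}
			System.out.println(window);
		}
	}
}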

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
deleted file mode 100644
index 622aa82..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.util.Collector;
-
-/**
- * This example shows the functionality of time-based windows. It utilizes the
- * {@link ActiveTriggerPolicy} implementation provided by
- * {@link ActiveTimeTriggerPolicy}.
- */
-public class TimeWindowingExample {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
-				.window(Count.of(100))
-				.every(Time.of(1000, TimeUnit.MILLISECONDS))
-				.groupBy(new MyKey())
-				.sum(0);
-
-		// emit result
-		if (fileOutput) {
-			stream.writeAsText(outputPath, 1);
-		} else {
-			stream.print();
-		}
-
-		// execute the program
-		env.execute("Time Windowing Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * This data source emits one element every 0.001 sec. The output is an
-	 * Integer counting the emitted elements. As soon as the counter reaches
-	 * 10000 it is reset to 0. On each reset the source waits 5 sec. before it
-	 * resumes producing elements.
-	 */
-	private static final class CountingSourceWithSleep extends RichSourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		private int counter = 0;
-		private transient Random rnd;
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			rnd = new Random();
-		}
-
-		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
-			// continuous emit
-			while (true) {
-				if (counter > 9999) {
-					System.out.println("Source pauses now!");
-					Thread.sleep(5000);
-					System.out.println("Source continouse with emitting now!");
-					counter = 0;
-				}
-				collector.collect(rnd.nextInt(9) + 1);
-
-				// Wait 0.001 sec. before the next emit. Otherwise the source is
-				// too fast for local tests and you might always see
-				// SUM[k=1..9999](k) as the result.
-				Thread.sleep(1);
-				counter++;
-			}
-		}
-	}
-
-	private static final class MyKey implements KeySelector<Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer getKey(Integer value) throws Exception {
-			if (value < 2) {
-				return 0;
-			} else {
-				return 1;
-			}
-		}
-
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: TimeWindowingExample <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing TimeWindowingExample with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: TimeWindowingExample <result path>");
-		}
-		return true;
-	}
-}
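
A self-contained sketch of the hybrid policy used above, assuming the count helper evicts so that the buffer never holds more than the newest 100 elements while the time helper triggers an emission roughly every 1000 ms:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

public class CountEvictTimeTriggerSketch {
	public static void main(String[] args) throws InterruptedException {
		Deque<Integer> buffer = new ArrayDeque<Integer>();
		Random rnd = new Random();
		long nextTrigger = System.currentTimeMillis() + 1000;

		for (int i = 0; i < 5000; i++) {
			buffer.addLast(rnd.nextInt(9) + 1);
			if (buffer.size() > 100) {
				buffer.removeFirst(); // count-based eviction: keep only the newest 100
			}
			if (System.currentTimeMillis() >= nextTrigger) {
				int sum = 0;
				for (int v : buffer) {
					sum += v;
				}
				System.out.println("window sum: " + sum); // time-based trigger
				nextTrigger += 1000;
			}
			Thread.sleep(1); // mirrors the source's emission pace
		}
	}
}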

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
deleted file mode 100644
index 0f5d8eb..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.util.Collector;
-
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An example of grouped stream windowing where different eviction and trigger
- * policies can be used. A source emits an event per car every second,
- * containing the car's id, its current speed (km/h), its overall elapsed
- * distance (m) and a timestamp. The example emits the top speed of each car
- * over the last y seconds, triggered every x meters of elapsed distance.
- */
-public class TopSpeedWindowingExample {
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		@SuppressWarnings({ "rawtypes", "serial" })
-		DataStream topSpeeds = env
-				.addSource(CarSource.create(numOfCars))
-				.groupBy(0)
-				.window(Time.of(evictionSec, TimeUnit.SECONDS))
-				.every(Delta.of(triggerMeters,
-						new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
-							@Override
-							public double getDelta(
-									Tuple4<Integer, Integer, Double, Long> oldDataPoint,
-									Tuple4<Integer, Integer, Double, Long> newDataPoint) {
-								return newDataPoint.f2 - oldDataPoint.f2;
-							}
-						}, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0L))).maxBy(1);
-
-		topSpeeds.print();
-		env.execute("CarTopSpeedWindowingExample");
-	}
-
-	private static class CarSource implements
-			SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
-
-		private static final long serialVersionUID = 1L;
-		private Integer[] speeds;
-		private Double[] distances;
-
-		private Random rand = new Random();
-
-		private CarSource(int numOfCars) {
-			speeds = new Integer[numOfCars];
-			distances = new Double[numOfCars];
-			Arrays.fill(speeds, 50);
-			Arrays.fill(distances, 0d);
-		}
-
-		public static CarSource create(int cars) {
-			return new CarSource(cars);
-		}
-
-		@Override
-		public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
-				throws Exception {
-
-			while (true) {
-				Thread.sleep(1000);
-				for (int carId = 0; carId < speeds.length; carId++) {
-					if (rand.nextBoolean()) {
-						speeds[carId] = Math.min(100, speeds[carId] + 5);
-					} else {
-						speeds[carId] = Math.max(0, speeds[carId] - 5);
-					}
-					distances[carId] += speeds[carId] / 3.6d;
-					collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId,
-							speeds[carId], distances[carId], System.currentTimeMillis()));
-				}
-			}
-		}
-	}
-
-	private static int numOfCars = 2;
-	private static int evictionSec = 10;
-	private static double triggerMeters = 50;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			if (args.length == 3) {
-				numOfCars = Integer.valueOf(args[0]);
-				evictionSec = Integer.valueOf(args[1]);
-				triggerMeters = Double.valueOf(args[2]);
-			} else {
-				System.err
-						.println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>");
-				return false;
-			}
-		}
-		return true;
-	}
-}
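
A one-line unit check of the 3.6 divisor in CarSource: speed is in km/h and 1 km/h = 1000 m / 3600 s, so each one-second tick adds speed / 3.6 meters to the elapsed distance:

public class SpeedConversionSketch {
	public static void main(String[] args) {
		double speedKmh = 50;                     // the cars' initial speed in the example
		double metersPerSecond = speedKmh / 3.6d; // 1 km/h = 1000 m / 3600 s
		System.out.println(metersPerSecond);      // ~13.89 m added per one-second tick
	}
}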

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
deleted file mode 100644
index f377863..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-/**
- * This example shows an implementation of WordCount without using the Tuple2
- * type, but a custom class.
- * 
- * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- * 
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use POJO data types,
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions. 
- * </ul>
- */
-public class PojoExample {
-	
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = getTextDataStream(env);
-
-		DataStream<Word> counts =
-		// split up the lines into Word objects
-		text.flatMap(new Tokenizer())
-		// group by the field word and sum up the frequency
-				.groupBy("word").sum("frequency");
-
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount Pojo Example");
-	}
-
-	// *************************************************************************
-	// DATA TYPES
-	// *************************************************************************
-
-	/**
-	 * This is the POJO (Plain Old Java Object) that is used for all the
-	 * operations. As long as all fields are public or have a getter/setter, the
-	 * system can handle them.
-	 */
-	public static class Word {
-
-		private String word;
-		private Integer frequency;
-
-		public Word() {
-		}
-
-		public Word(String word, int i) {
-			this.word = word;
-			this.frequency = i;
-		}
-
-		public String getWord() {
-			return word;
-		}
-
-		public void setWord(String word) {
-			this.word = word;
-		}
-
-		public Integer getFrequency() {
-			return frequency;
-		}
-
-		public void setFrequency(Integer frequency) {
-			this.frequency = frequency;
-		}
-
-		@Override
-		public String toString() {
-			return "(" + word + ", " + frequency + ")";
-		}
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a
-	 * user-defined FlatMapFunction. The function takes a line (String) and
-	 * splits it into multiple Word objects, each carrying an initial frequency
-	 * of 1.
-	 */
-	public static final class Tokenizer implements FlatMapFunction<String, Word> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Word> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Word(token, 1));
-				}
-			}
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: PojoExample <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing PojoExample example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: PojoExample <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
deleted file mode 100644
index 085fe5f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.wordcount;
-
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
- * 
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * 
- * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- * 
- * <p>
- * This example shows how to:
- * <ul>
- * <li>write a simple Flink Streaming program,
- * <li>use tuple data types,
- * <li>write and use user-defined functions.
- * </ul>
- * 
- */
-public class WordCount {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = getTextDataStream(env);
-
-		DataStream<Tuple2<String, Integer>> counts =
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		text.flatMap(new Tokenizer())
-		// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).sum(1);
-
-		// emit result
-		if (fileOutput) {
-			counts.writeAsText(outputPath, 1);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("Streaming WordCount");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a
-	 * user-defined FlatMapFunction. The function takes a line (String) and
-	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
-	 * Integer>).
-	 */
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			// tokenize the line
-			StringTokenizer tokenizer = new StringTokenizer(inTuple);
-
-			// emit the pairs
-			while (tokenizer.hasMoreTokens()) {
-				out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
-			}
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
deleted file mode 100644
index b38764c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.socket
-
-import org.apache.flink.streaming.api.scala._
-
-/**
- * This example shows an implementation of WordCount with data from a text socket. 
- * To run the example, make sure that the service providing the text data is already up and running.
- *
- * To start an example socket text stream on your local machine, run netcat from a command line,
- * where the parameter specifies the port number:
- *
- * {{{
- *   nc -lk 9999
- * }}}
- *
- * Usage:
- * {{{
- *   SocketTextStreamWordCount <hostname> <port> <output path>
- * }}}
- *
- * This example shows how to:
- *
- *   - use StreamExecutionEnvironment.socketTextStream
- *   - write a simple Flink Streaming program in Scala.
- *   - write and use user-defined functions.
- */
-object SocketTextStreamWordCount {
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // Read the input text stream from the socket at the given host and port
-    val text = env.socketTextStream(hostName, port)
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
-      .groupBy(0)
-      .sum(1)
-
-    if (fileOutput) {
-      counts.writeAsCsv(outputPath, 1)
-    } else {
-      counts print
-    }
-
-    env.execute("Scala SocketTextStreamWordCount Example")
-  }
-
-  private def parseParameters(args: Array[String]): Boolean = {
-      if (args.length == 3) {
-        fileOutput = true
-        hostName = args(0)
-        port = args(1).toInt
-        outputPath = args(2)
-      } else if (args.length == 2) {
-        hostName = args(0)
-        port = args(1).toInt
-      } else {
-        System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]")
-        return false
-      }
-    true
-  }
-
-  private var fileOutput: Boolean = false
-  private var hostName: String = null
-  private var port: Int = 0
-  private var outputPath: String = null
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
deleted file mode 100644
index 1091aa3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.windowing
-
-import java.util.concurrent.TimeUnit._
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.windowing.{Delta, Time}
-
-import scala.Stream._
-import scala.math._
-import scala.util.Random
-
-/**
- * An example of grouped stream windowing where different eviction and 
- * trigger policies can be used. A source emits an event per car every
- * second, containing the car's id, its current speed (km/h), its overall
- * elapsed distance (m) and a timestamp. The example emits the top speed
- * of each car over the last y seconds, triggered every x meters of
- * elapsed distance.
- */
-object TopSpeedWindowing {
-
-  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val cars = genCarStream().groupBy("carId")
-      .window(Time.of(evictionSec, SECONDS))
-      .every(Delta.of[CarEvent](triggerMeters,
-          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
-      .maxBy("speed")
-
-    cars print
-
-    StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
-
-  }
-
-  def genCarStream(): DataStream[CarEvent] = {
-
-    def nextSpeed(carEvent : CarEvent) : CarEvent =
-    {
-      val next =
-        if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
-      CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
-    }
-    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
-    {
-      Thread.sleep(1000)
-      speeds.append(carStream(speeds.map(nextSpeed)))
-    }
-    carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
-  }
-
-  def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      if (args.length == 3) {
-        numOfCars = args(0).toInt
-        evictionSec = args(1).toInt
-        triggerMeters = args(2).toDouble
-      }
-      else {
-        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
-        return false
-      }
-    }
-    true
-  }
-
-  var numOfCars = 2
-  var evictionSec = 10
-  var triggerMeters = 50d
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
deleted file mode 100644
index 0b78365..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.windowing
-
-import org.apache.flink.streaming.api.scala._
-
-import scala.Stream._
-import scala.util.Random
-import java.util.concurrent.TimeUnit
-
-object WindowJoin {
-
-  case class Name(id: Long, name: String)
-  case class Age(id: Long, age: Int)
-  case class Person(name: String, age: Long)
-
-  def main(args: Array[String]) {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    //Create streams for names and ages by mapping the inputs to the corresponding objects
-    val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2))
-    val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2))
-
-    // Join the two input streams by id over the last 2 seconds, evaluated every second,
-    // and create new Person objects containing both name and age
-    val joined =
-      names.join(ages).onWindow(2, TimeUnit.SECONDS)
-                      .every(1, TimeUnit.SECONDS)
-                      .where("id")
-                      .equalTo("id") { (n, a) => Person(n.name, a.age) }
-
-    joined print
-
-    env.execute("WindowJoin")
-  }
-
-  def nameStream() : Stream[(Long,String)] = {
-    def nameMapper(names: Array[String])(x: Int) : (Long, String) =
-    {
-      if(x%100==0) Thread.sleep(1000)
-      (x, names(Random.nextInt(names.length)))
-    }
-    range(1,10000).map(nameMapper(Array("tom", "jerry", "alice", "bob", "john", "grace")))
-  }
-
-  def ageStream() : Stream[(Long,Int)] = {
-    def ageMapper(x: Int) : (Long, Int) =
-    {
-      if(x%100==0) Thread.sleep(1000)
-      (x, Random.nextInt(90))
-    }
-    range(1,10000).map(ageMapper)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/pom.xml b/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
deleted file mode 100644
index c06fba7..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
+++ /dev/null
@@ -1,217 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-parent</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-streaming-scala</artifactId>
-	<name>flink-streaming-scala</name>
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-reflect</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-library</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-compiler</artifactId>
-		</dependency>
-
-		<dependency>
-		   <groupId>org.scalamacros</groupId>
-		   <artifactId>quasiquotes_${scala.binary.version}</artifactId>
-		   <version>${scala.macros.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm</artifactId>
-		</dependency>
-		
-		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
- 
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						 scala classes can be resolved later in the (Java) test-compile phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-					<compilerPlugins>
-					   <compilerPlugin>
-						   <groupId>org.scalamacros</groupId>
-						   <artifactId>paradise_${scala.version}</artifactId>
-						   <version>${scala.macros.version}</version>
-					   </compilerPlugin>
-				   </compilerPlugins>
-				</configuration>
-			</plugin>
-			
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<version>0.5.0</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>check</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<verbose>false</verbose>
-					<failOnViolation>true</failOnViolation>
-					<includeTestSourceDirectory>true</includeTestSourceDirectory>
-					<failOnWarning>false</failOnWarning>
-					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-					<configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
-					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-					<outputEncoding>UTF-8</outputEncoding>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java b/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
deleted file mode 100644
index 77d102d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.streaming.scala;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumFunction;
-
-import scala.Product;
-
-public class ScalaStreamingAggregator<IN extends Product> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	TupleSerializerBase<IN> serializer;
-	Object[] fields;
-	int length;
-	int position;
-
-	public ScalaStreamingAggregator(TypeSerializer<IN> serializer, int pos) {
-		this.serializer = (TupleSerializerBase<IN>) serializer;
-		this.length = this.serializer.getArity();
-		this.fields = new Object[this.length];
-		this.position = pos;
-	}
-
-	public class Sum extends AggregationFunction<IN> {
-		private static final long serialVersionUID = 1L;
-		SumFunction sumFunction;
-
-		public Sum(SumFunction func) {
-			super(ScalaStreamingAggregator.this.position);
-			this.sumFunction = func;
-		}
-
-		@Override
-		public IN reduce(IN value1, IN value2) throws Exception {
-			for (int i = 0; i < length; i++) {
-				fields[i] = value2.productElement(i);
-			}
-
-			fields[position] = sumFunction.add(fields[position], value1.productElement(position));
-
-			return serializer.createInstance(fields);
-		}
-	}
-
-	public class ProductComparableAggregator extends ComparableAggregator<IN> {
-
-		private static final long serialVersionUID = 1L;
-
-		public ProductComparableAggregator(AggregationFunction.AggregationType aggregationType,
-				boolean first) {
-			super(ScalaStreamingAggregator.this.position, aggregationType, first);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public IN reduce(IN value1, IN value2) throws Exception {
-			Object v1 = value1.productElement(position);
-			Object v2 = value2.productElement(position);
-
-			int c = comparator.isExtremal((Comparable<Object>) v1, v2);
-
-			if (byAggregate) {
-				if (c == 1) {
-					return value1;
-				}
-				if (first) {
-					if (c == 0) {
-						return value1;
-					}
-				}
-
-				return value2;
-			} else {
-				for (int i = 0; i < length; i++) {
-					fields[i] = value2.productElement(i);
-				}
-
-				if (c == 1) {
-					fields[position] = v1;
-				}
-
-				return serializer.createInstance(fields);
-			}
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
deleted file mode 100644
index d60e796..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util
-
-import scala.collection.JavaConversions.asScalaBuffer
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
-import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction }
-import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable }
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.util.Collector
-
-class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
-
-  /**
-   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
-   * the output to a common type. The transformation calls a
-   * @param fun1 for each element of the first input and
-   * @param fun2 for each element of the second input. Each
-   * CoMapFunction call returns exactly one element.
-   *
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
-  DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-    val comapper = new CoMapFunction[IN1, IN2, R] {
-      def map1(in1: IN1): R = clean(fun1)(in1)
-      def map2(in2: IN2): R = clean(fun2)(in2)
-    }
-
-    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
-      new CoMapInvokable[IN1, IN2, R](comapper)))
-  }
-
-  /**
-   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
-   * the output to a common type. The transformation calls a
-   * {@link CoMapFunction#map1} for each element of the first input and
-   * {@link CoMapFunction#map2} for each element of the second input. Each
-   * CoMapFunction call returns exactly one element. The user can also extend
-   * {@link RichCoMapFunction} to gain access to other features provided by
-   * the {@link RichFuntion} interface.
-   *
-   * @param coMapper
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coMapper == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-
-    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
-      new CoMapInvokable[IN1, IN2, R](coMapper)))
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-   * maps the output to a common type. The transformation calls a
-   * {@link CoFlatMapFunction#flatMap1} for each element of the first input
-   * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none. The user can also extend {@link RichFlatMapFunction} to
-   * gain access to other features provided by the {@link RichFuntion}
-   * interface.
-   *
-   * @param coFlatMapper
-   * The CoFlatMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coFlatMapper == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-    new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]],
-      new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper)))
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-   * maps the output to a common type. The transformation calls a
-   * @param fun1 for each element of the first input
-   * and @param fun2 for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none.
-   *
-   * @return The transformed { @link DataStream}
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
-      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("FlatMap functions must not be null.")
-    }
-    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, out)
-      def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, out)
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param keyPosition1
-   * The field used to compute the hashcode of the elements in the
-   * first input stream.
-   * @param keyPosition2
-   * The field used to compute the hashcode of the elements in the
-   * second input stream.
-   * @return @return The transformed { @link ConnectedDataStream}
-   */
-  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(keyPosition1, keyPosition2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param keyPositions1
-   * The fields used to group the first input stream.
-   * @param keyPositions2
-   * The fields used to group the second input stream.
-   * @return @return The transformed { @link ConnectedDataStream}
-   */
-  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): 
-  ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(keyPositions1, keyPositions2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the {@link DataStream}S underlying type. A dot can be used
-   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-   *
-   * @param field1
-   * The grouping expression for the first input
-   * @param field2
-   * The grouping expression for the second input
-   * @return The grouped { @link ConnectedDataStream}
-   */
-  def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(field1, field2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to fields1 and fields2. A
-   * field expression is either the name of a public field or a getter method
-   * with parentheses of the {@link DataStream}S underlying type. A dot can be
-   * used to drill down into objects, as in {@code "field1.getInnerField2()" }
-   * .
-   *
-   * @param fields1
-   * The grouping expressions for the first input
-   * @param fields2
-   * The grouping expressions for the second input
-   * @return The grouped { @link ConnectedDataStream}
-   */
-  def groupBy(fields1: Array[String], fields2: Array[String]): 
-  ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(fields1, fields2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 using fun1 and fun2. Used for applying
-   * function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param fun1
-   * The function used for grouping the first input
-   * @param fun2
-   * The function used for grouping the second input
-   * @return @return The transformed { @link ConnectedDataStream}
-   */
-  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _):
-  ConnectedDataStream[IN1, IN2] = {
-
-    val keyExtractor1 = new KeySelector[IN1, Any] {
-      def getKey(in: IN1) = clean(fun1)(in)
-    }
-    val keyExtractor2 = new KeySelector[IN2, Any] {
-      def getKey(in: IN2) = clean(fun2)(in)
-    }
-
-    javaStream.groupBy(keyExtractor1, keyExtractor2)
-  }
-
-  /**
-   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
-   * the outputs to a common type. If the {@link ConnectedDataStream} is
-   * batched or windowed then the reduce transformation is applied on every
-   * sliding batch/window of the data stream. If the connected data stream is
-   * grouped then the reducer is applied on every group of elements sharing
-   * the same key. This type of reduce is much faster than reduceGroup since
-   * the reduce function can be applied incrementally.
-   *
-   * @param coReducer
-   * The { @link CoReduceFunction} that will be called for every
-   *             element of the inputs.
-   * @return The transformed { @link DataStream}.
-   */
-  def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coReducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-
-    new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]],
-      new CoReduceInvokable[IN1, IN2, R](coReducer)))
-  }
-
-  /**
-   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
-   * the outputs to a common type. If the {@link ConnectedDataStream} is
-   * batched or windowed then the reduce transformation is applied on every
-   * sliding batch/window of the data stream. If the connected data stream is
-   * grouped then the reducer is applied on every group of elements sharing
-   * the same key. This type of reduce is much faster than reduceGroup since
-   * the reduce function can be applied incrementally.
-   *
-   * @param reducer1
-   * @param reducer2
-   * @param mapper1
-   * @param mapper2
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1, 
-      reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
-    if (mapper1 == null || mapper2 == null) {
-      throw new NullPointerException("Map functions must not be null.")
-    }
-    if (reducer1 == null || reducer2 == null) {
-      throw new NullPointerException("Reduce functions must not be null.")
-    }
-
-    val reducer = new CoReduceFunction[IN1, IN2, R] {
-      def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, value2)
-      def map2(value: IN2): R = clean(mapper2)(value)
-      def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, value2)
-      def map1(value: IN1): R = clean(mapper1)(value)
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Applies a CoWindow transformation on the connected DataStreams. The
-   * transformation calls the {@link CoWindowFunction#coWindow} method for for
-   * time aligned windows of the two data streams. System time is used as
-   * default to compute windows.
-   *
-   * @param coWindowFunction
-   * The { @link CoWindowFunction} that will be applied for the time
-   *             windows.
-   * @param windowSize
-   * Size of the windows that will be aligned for both streams in
-   * milliseconds.
-   * @param slideInterval
-   * After every function call the windows will be slid by this
-   * interval.
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
-      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = {
-    if (coWindowFunction == null) {
-      throw new NullPointerException("CoWindow function must no be null")
-    }
-
-    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)
-  }
-
-  /**
-   * Applies a CoWindow transformation on the connected DataStreams. The
-   * transformation calls the {@link CoWindowFunction#coWindow} method for for
-   * time aligned windows of the two data streams. System time is used as
-   * default to compute windows.
-   *
-   * @param coWindower
-   * The coWindowing function to be applied for the time windows.
-   * @param windowSize
-   * Size of the windows that will be aligned for both streams in
-   * milliseconds.
-   * @param slideInterval
-   * After every function call the windows will be slid by this
-   * interval.
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], 
-      Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = {
-    if (coWindower == null) {
-      throw new NullPointerException("CoWindow function must no be null")
-    }
-
-    val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
-      def coWindow(first: util.List[IN1], second: util.List[IN2], 
-          out: Collector[R]): Unit = clean(coWindower)(first, second, out)
-    }
-
-    javaStream.windowReduce(coWindowFun, windowSize, slideInterval)
-  }
-
-  /**
-   * Returns the first {@link DataStream}.
-   *
-   * @return The first DataStream.
-   */
-  def getFirst(): DataStream[IN1] = {
-    javaStream.getFirst
-  }
-
-  /**
-   * Returns the second {@link DataStream}.
-   *
-   * @return The second DataStream.
-   */
-  def getSecond(): DataStream[IN2] = {
-    javaStream.getSecond
-  }
-
-  /**
-   * Gets the type of the first input
-   *
-   * @return The type of the first input
-   */
-  def getInputType1(): TypeInformation[IN1] = {
-    javaStream.getInputType1
-  }
-
-  /**
-   * Gets the type of the second input
-   *
-   * @return The type of the second input
-   */
-  def getInputType2(): TypeInformation[IN2] = {
-    javaStream.getInputType2
-  }
-
-}


Mime
View raw message