flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [2/2] flink git commit: [streaming] Fixed streaming example jars packaging and termination
Date Wed, 10 Jun 2015 15:21:12 GMT
[streaming] Fixed streaming example jars packaging and termination

Closes #816


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3a96de1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3a96de1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3a96de1

Branch: refs/heads/release-0.9
Commit: f3a96de1e271f64eec9eec60135a41c9f6f77cd9
Parents: 255c554
Author: mbalassi <mbalassi@apache.org>
Authored: Wed Jun 10 11:32:45 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Jun 10 17:20:45 2015 +0200

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            |  14 +-
 .../examples/iteration/IterateExample.java      |   8 +-
 .../streaming/examples/join/WindowJoin.java     |   8 +-
 .../ml/IncrementalLearningSkeleton.java         |  71 +------
 .../examples/windowing/TopSpeedWindowing.java   | 187 +++++++++++++++++++
 .../windowing/TopSpeedWindowingExample.java     | 183 ------------------
 .../TopSpeedWindowingExampleITCase.java         |   4 +-
 7 files changed, 215 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3a96de1/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
index 77eb00f..bc32474 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
@@ -98,6 +98,7 @@ under the License.
 						</goals>
 						<configuration>
 							<artifactItems>
+								<!-- For WordCount example data -->
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
 									<artifactId>flink-java-examples</artifactId>
@@ -107,6 +108,16 @@ under the License.
 									<outputDirectory>${project.build.directory}/classes</outputDirectory>
 									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
 								</artifactItem>
+								<!-- For JSON utilities -->
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-twitter</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<includes>org/apache/flink/streaming/connectors/json/*</includes>
+								</artifactItem>
 							</artifactItems>
 						</configuration>
 					</execution>
@@ -190,7 +201,8 @@ under the License.
 
 							<includes>
 								<include>org/apache/flink/streaming/examples/twitter/*.class</include>
-								<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
					
+								<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
+								<include>org/apache/flink/streaming/connectors/json/*.class</include>
 							</includes>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a96de1/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 78d361d..8860e58 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -103,7 +103,7 @@ public class IterateExample {
 	// *************************************************************************
 
 	/**
-	 * Generate random integer pairs from the range from 0 to BOUND/2
+	 * Generate BOUND number of random integer pairs from the range from 0 to BOUND/2
 	 */
 	private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
@@ -111,16 +111,18 @@ public class IterateExample {
 		private Random rnd = new Random();
 
 		private volatile boolean isRunning = true;
+		private int counter = 0;
 
 		@Override
 		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
 
-			while (isRunning) {
+			while (isRunning && counter < BOUND) {
 				int first = rnd.nextInt(BOUND / 2 - 1) + 1;
 				int second = rnd.nextInt(BOUND / 2 - 1) + 1;
 
 				ctx.collect(new Tuple2<Integer, Integer>(first, second));
-				Thread.sleep(500L);
+				counter++;
+				Thread.sleep(50L);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a96de1/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 0fec4c6..5230e9b 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -104,6 +104,7 @@ public class WindowJoin {
 		private Random rand;
 		private Tuple2<String, Integer> outTuple;
 		private volatile boolean isRunning = true;
+		private int counter;
 
 		public GradeSource() {
 			rand = new Random();
@@ -112,10 +113,11 @@ public class WindowJoin {
 
 		@Override
 		public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-			while (isRunning) {
+			while (isRunning && counter < 100) {
 				outTuple.f0 = names[rand.nextInt(names.length)];
 				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+				counter++;
 				ctx.collect(outTuple);
 			}
 		}
@@ -135,6 +137,7 @@ public class WindowJoin {
 		private transient Random rand;
 		private transient Tuple2<String, Integer> outTuple;
 		private volatile boolean isRunning;
+		private int counter;
 
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
@@ -146,10 +149,11 @@ public class WindowJoin {
 
 		@Override
 		public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-			while (isRunning) {
+			while (isRunning && counter < 100) {
 				outTuple.f0 = names[rand.nextInt(names.length)];
 				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+				counter++;
 				ctx.collect(outTuple);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a96de1/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 48111f6..99d45bb 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -61,8 +61,8 @@ public class IncrementalLearningSkeleton {
 		}
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		// env.setDegreeOfParallelism(1);
-		createSourceStreams(env);
+		trainingData = env.addSource(new FiniteTrainingDataSource());
+		newData = env.addSource(new FiniteNewDataSource());
 
 		// build new model on every second of new data
 		DataStream<Double[]> model = trainingData.window(Time.of(5000, new LinearTimestamp()))
@@ -90,34 +90,6 @@ public class IncrementalLearningSkeleton {
 	 * Feeds new data for newData. By default it is implemented as constantly
 	 * emitting the Integer 1 in a loop.
 	 */
-	public static class NewDataSource implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-		private static final int NEW_DATA_SLEEP_TIME = 1000;
-
-		private volatile boolean isRunning = true;
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			while (isRunning) {
-				ctx.collect(getNewData());
-			}
-		}
-
-		private Integer getNewData() throws InterruptedException {
-			Thread.sleep(NEW_DATA_SLEEP_TIME);
-			return 1;
-		}
-		
-		@Override
-		public void cancel() {
-			isRunning = true;
-		}
-	}
-
-	/**
-	 * Feeds new data for newData. By default it is implemented as constantly
-	 * emitting the Integer 1 in a loop.
-	 */
 	public static class FiniteNewDataSource implements SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 		private int counter;
@@ -146,36 +118,6 @@ public class IncrementalLearningSkeleton {
 	 * Feeds new training data for the partial model builder. By default it is
 	 * implemented as constantly emitting the Integer 1 in a loop.
 	 */
-	public static class TrainingDataSource implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-		private static final int TRAINING_DATA_SLEEP_TIME = 10;
-
-		private volatile boolean isRunning = true;
-
-		@Override
-		public void run(SourceContext<Integer> collector) throws Exception {
-			while (isRunning) {
-				collector.collect(getTrainingData());
-			}
-
-		}
-
-		private Integer getTrainingData() throws InterruptedException {
-			Thread.sleep(TRAINING_DATA_SLEEP_TIME);
-			return 1;
-
-		}
-		
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-	}
-
-	/**
-	 * Feeds new training data for the partial model builder. By default it is
-	 * implemented as constantly emitting the Integer 1 in a loop.
-	 */
 	public static class FiniteTrainingDataSource implements SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 		private int counter = 0;
@@ -292,13 +234,4 @@ public class IncrementalLearningSkeleton {
 		return true;
 	}
 
-	public static void createSourceStreams(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			trainingData = env.addSource(new FiniteTrainingDataSource());
-			newData = env.addSource(new FiniteNewDataSource());
-		} else {
-			trainingData = env.addSource(new TrainingDataSource());
-			newData = env.addSource(new NewDataSource());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a96de1/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
new file mode 100644
index 0000000..e7eb4da
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * An example of grouped stream windowing where different eviction and trigger
+ * policies can be used. A source fetches events from cars every 1 sec
+ * containing their id, their current speed (kmh), overall elapsed distance (m)
+ * and a timestamp. The streaming example triggers the top speed of each car
+ * every x meters elapsed for the last y seconds.
+ */
+public class TopSpeedWindowing {
+
+	private static final int NUM_CAR_EVENTS = 100;
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		@SuppressWarnings({"rawtypes", "serial"})
+		DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
+		if (fileInput) {
+			carData = env.readTextFile(inputPath).map(new ParseCarData());
+		} else {
+			carData = env.addSource(CarSource.create(numOfCars));
+		}
+		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.groupBy(0)
+				.window(Time.of(evictionSec, new CarTimestamp()))
+				.every(Delta.of(triggerMeters,
+						new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
+							private static final long serialVersionUID = 1L;
+
+
+							@Override
+							public double getDelta(
+									Tuple4<Integer, Integer, Double, Long> oldDataPoint,
+									Tuple4<Integer, Integer, Double, Long> newDataPoint) {
+								return newDataPoint.f2 - oldDataPoint.f2;
+							}
+						}, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).local().maxBy(1).flatten();
+		if (fileOutput) {
+			topSpeeds.writeAsText(outputPath);
+		} else {
+			topSpeeds.print();
+		}
+
+		env.execute("CarTopSpeedWindowingExample");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+
+		private static final long serialVersionUID = 1L;
+		private Integer[] speeds;
+		private Double[] distances;
+
+		private Random rand = new Random();
+
+		private volatile boolean isRunning = true;
+		private int counter;
+
+		private CarSource(int numOfCars) {
+			speeds = new Integer[numOfCars];
+			distances = new Double[numOfCars];
+			Arrays.fill(speeds, 50);
+			Arrays.fill(distances, 0d);
+		}
+
+		public static CarSource create(int cars) {
+			return new CarSource(cars);
+		}
+
+		@Override
+		public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
+
+			while (isRunning && counter < NUM_CAR_EVENTS) {
+				Thread.sleep(100);
+				for (int carId = 0; carId < speeds.length; carId++) {
+					if (rand.nextBoolean()) {
+						speeds[carId] = Math.min(100, speeds[carId] + 5);
+					} else {
+						speeds[carId] = Math.max(0, speeds[carId] - 5);
+					}
+					distances[carId] += speeds[carId] / 3.6d;
+					Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
+							speeds[carId], distances[carId], System.currentTimeMillis());
+					ctx.collect(record);
+					counter++;
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	private static class ParseCarData extends
+			RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple4<Integer, Integer, Double, Long> map(String record) {
+			String rawData = record.substring(1, record.length() - 1);
+			String[] data = rawData.split(",");
+			return new Tuple4<Integer, Integer, Double, Long>(Integer.valueOf(data[0]),
+					Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
+		}
+	}
+
+	private static class CarTimestamp implements Timestamp<Tuple4<Integer, Integer, Double, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Tuple4<Integer, Integer, Double, Long> value) {
+			return value.f3;
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
+	private static int numOfCars = 2;
+	private static int evictionSec = 10;
+	private static double triggerMeters = 50;
+	private static String inputPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if (args.length == 2) {
+				fileInput = true;
+				fileOutput = true;
+				inputPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: TopSpeedWindowingExample <input path> <output path>");
+				return false;
+			}
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a96de1/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
deleted file mode 100644
index 657ce2a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-
-import java.util.Arrays;
-import java.util.Random;
-
-/**
- * An example of grouped stream windowing where different eviction and trigger
- * policies can be used. A source fetches events from cars every 1 sec
- * containing their id, their current speed (kmh), overall elapsed distance (m)
- * and a timestamp. The streaming example triggers the top speed of each car
- * every x meters elapsed for the last y seconds.
- */
-public class TopSpeedWindowingExample {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		@SuppressWarnings({"rawtypes", "serial"})
-		DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
-		if (fileInput) {
-			carData = env.readTextFile(inputPath).map(new ParseCarData());
-		} else {
-			carData = env.addSource(CarSource.create(numOfCars));
-		}
-		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.groupBy(0)
-				.window(Time.of(evictionSec, new CarTimestamp()))
-				.every(Delta.of(triggerMeters,
-						new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
-							private static final long serialVersionUID = 1L;
-
-
-							@Override
-							public double getDelta(
-									Tuple4<Integer, Integer, Double, Long> oldDataPoint,
-									Tuple4<Integer, Integer, Double, Long> newDataPoint) {
-								return newDataPoint.f2 - oldDataPoint.f2;
-							}
-						}, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).local().maxBy(1).flatten();
-		if (fileOutput) {
-			topSpeeds.writeAsText(outputPath);
-		} else {
-			topSpeeds.print();
-		}
-
-		env.execute("CarTopSpeedWindowingExample");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
-
-		private static final long serialVersionUID = 1L;
-		private Integer[] speeds;
-		private Double[] distances;
-
-		private Random rand = new Random();
-
-		private volatile boolean isRunning = true;
-
-		private CarSource(int numOfCars) {
-			speeds = new Integer[numOfCars];
-			distances = new Double[numOfCars];
-			Arrays.fill(speeds, 50);
-			Arrays.fill(distances, 0d);
-		}
-
-		public static CarSource create(int cars) {
-			return new CarSource(cars);
-		}
-
-		@Override
-		public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
-
-			while (isRunning) {
-				Thread.sleep(1000);
-				for (int carId = 0; carId < speeds.length; carId++) {
-					if (rand.nextBoolean()) {
-						speeds[carId] = Math.min(100, speeds[carId] + 5);
-					} else {
-						speeds[carId] = Math.max(0, speeds[carId] - 5);
-					}
-					distances[carId] += speeds[carId] / 3.6d;
-					Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
-							speeds[carId], distances[carId], System.currentTimeMillis());
-					ctx.collect(record);
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-	}
-
-	private static class ParseCarData extends
-			RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple4<Integer, Integer, Double, Long> map(String record) {
-			String rawData = record.substring(1, record.length() - 1);
-			String[] data = rawData.split(",");
-			return new Tuple4<Integer, Integer, Double, Long>(Integer.valueOf(data[0]),
-					Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
-		}
-	}
-
-	private static class CarTimestamp implements Timestamp<Tuple4<Integer, Integer, Double, Long>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Tuple4<Integer, Integer, Double, Long> value) {
-			return value.f3;
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInput = false;
-	private static boolean fileOutput = false;
-	private static int numOfCars = 2;
-	private static int evictionSec = 10;
-	private static double triggerMeters = 50;
-	private static String inputPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			if (args.length == 2) {
-				fileInput = true;
-				fileOutput = true;
-				inputPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: TopSpeedWindowingExample <input path> <output path>");
-				return false;
-			}
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a96de1/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
index b38e51f..fb0ab0a 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.test.exampleJavaPrograms.windowing;
 
-import org.apache.flink.streaming.examples.windowing.TopSpeedWindowingExample;
+import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
 import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
@@ -39,7 +39,7 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		TopSpeedWindowingExample.main(new String[]{textPath, resultPath});
+		TopSpeedWindowing.main(new String[]{textPath, resultPath});
 
 	}
 }


Mime
View raw message