flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [07/20] flink git commit: [FLINK-1231] [streaming] More sophisticated streaming classloader ITCase
Date Mon, 16 Feb 2015 14:25:33 GMT
[FLINK-1231] [streaming] More sophisticated streaming classloader ITCase

This closes #303


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26ae9793
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26ae9793
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26ae9793

Branch: refs/heads/master
Commit: 26ae9793cf6dbccdfb525637ea6caffa0f1712e9
Parents: 055f6dc
Author: mbalassi <mbalassi@apache.org>
Authored: Fri Feb 13 15:57:39 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 16 13:06:07 2015 +0100

----------------------------------------------------------------------
 .../test/classloading/jar/StreamingProgram.java | 73 +++++++++++++++-----
 1 file changed, 57 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/26ae9793/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 6799d9b..f7f9eae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.test.recordJobs.wordcount.WordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.util.Collector;
 
@@ -41,23 +42,63 @@ public class StreamingProgram {
 
 		DataStream<String> text = env.fromElements(WordCountData.TEXT);
 
-		DataStream<Tuple2<String, Integer>> counts =
-				text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
-					@Override
-					public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
throws Exception {
-						StringTokenizer tokenizer = new StringTokenizer(value);
-						while (tokenizer.hasMoreTokens()){
-							out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
-						}
-					}
-				}).groupBy(0).sum(1);
-
-		counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
-			@Override
-			public void invoke(Tuple2<String, Integer> value) throws Exception {
-			}
-		});
+		DataStream<Word> counts =
+				text.flatMap(new Tokenizer()).groupBy("word").sum("frequency");
+
+		counts.addSink(new NoOpSink());
 
 		env.execute();
 	}
+	// --------------------------------------------------------------------------------------------
+
+	public static class Word {
+
+		private String word;
+		private Integer frequency;
+
+		public Word() {
+		}
+
+		public Word(String word, int i) {
+			this.word = word;
+			this.frequency = i;
+		}
+
+		public String getWord() {
+			return word;
+		}
+
+		public void setWord(String word) {
+			this.word = word;
+		}
+
+		public Integer getFrequency() {
+			return frequency;
+		}
+
+		public void setFrequency(Integer frequency) {
+			this.frequency = frequency;
+		}
+
+		@Override
+		public String toString() {
+			return "(" + word + ", " + frequency + ")";
+		}
+	}
+
+	public static class Tokenizer implements FlatMapFunction<String, Word>{
+		@Override
+		public void flatMap(String value, Collector<Word> out) throws Exception {
+			StringTokenizer tokenizer = new StringTokenizer(value);
+			while (tokenizer.hasMoreTokens()){
+				out.collect(new Word(tokenizer.nextToken(), 1));
+			}
+		}
+	}
+
+	public static class NoOpSink implements SinkFunction<Word>{
+		@Override
+		public void invoke(Word value) throws Exception {
+		}
+	}
 }


Mime
View raw message