flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [3/4] incubator-flink git commit: [streaming] Pojo wordcount example added to streaming
Date Sun, 09 Nov 2014 19:08:47 GMT
[streaming] Pojo wordcount example added to streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/97d465b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/97d465b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/97d465b4

Branch: refs/heads/master
Commit: 97d465b496dc3f502a989f6df5c3362084c41a12
Parents: 7ae5804
Author: Gyula Fora <gyfora@apache.org>
Authored: Wed Nov 5 17:49:34 2014 +0100
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Sun Nov 9 13:16:46 2014 +0100

----------------------------------------------------------------------
 .../examples/wordcount/PojoWordCount.java       | 169 +++++++++++++++++++
 .../examples/java/wordcount/PojoExample.java    |   4 +-
 2 files changed, 171 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/97d465b4/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
new file mode 100644
index 0000000..e95f042
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows an implementation of Wordcount without using the Tuple2
+ * type, but a custom class.
+ *
+ */
+public class PojoWordCount {
+
+	/**
+	 * This is the POJO (Plain Old Java Object) that is being used for all the
+	 * operations. As long as all fields are public or have a getter/setter, the
+	 * system can handle them
+	 */
+	public static class Word {
+		// fields
+		private String word;
+		private Integer frequency;
+
+		// constructors
+		public Word() {
+		}
+
+		public Word(String word, int i) {
+			this.word = word;
+			this.frequency = i;
+		}
+
+		// getters setters
+		public String getWord() {
+			return word;
+		}
+
+		public void setWord(String word) {
+			this.word = word;
+		}
+
+		public Integer getFrequency() {
+			return frequency;
+		}
+
+		public void setFrequency(Integer frequency) {
+			this.frequency = frequency;
+		}
+
+		// to String
+		@Override
+		public String toString() {
+			return "Word=" + word + " freq=" + frequency;
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Word> counts =
+		// split up the lines into Word objects
+		text.flatMap(new Tokenizer())
+		// group by the field word and sum up the frequency
+		.groupBy("word")
+		.sum("frequency");
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount-Pojo Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Word> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Word> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Word(token, 1));
+				}
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env)
{
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/97d465b4/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
index 363c7a3..f3364fd 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
@@ -84,9 +84,9 @@ public class PojoExample {
 		DataSet<String> text = getTextDataSet(env);
 		
 		DataSet<Word> counts = 
-			// split up the lines in pairs (2-tuples) containing: (word,1)
+			// split up the lines into Word objects (with frequency = 1)
 			text.flatMap(new Tokenizer())
-			// group by the tuple field "0" and sum up tuple field "1"
+			// group by the field word and sum up the frequency
 			.groupBy("word")
 			.reduce(new ReduceFunction<Word>() {
 				@Override


Mime
View raw message