flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [06/27] flink git commit: [storm-compat] Added Storm compatibility word count examples
Date Mon, 15 Jun 2015 09:32:56 GMT
[storm-compat] Added Storm compatibility word count examples


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/000b5d53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/000b5d53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/000b5d53

Branch: refs/heads/master
Commit: 000b5d53d0d4fe33f3bf9f31edc35f8c3eaa26b1
Parents: 56e013f
Author: mjsax <mjsax@informatik.hu-berlin.de>
Authored: Thu May 14 12:57:46 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Jun 14 22:59:24 2015 +0200

----------------------------------------------------------------------
 .../wordcount/BoltTokenizerWordCount.java       | 127 +++++++++++++++
 .../wordcount/SpoutSourceWordCount.java         | 158 +++++++++++++++++++
 .../wordcount/StormWordCountLocal.java          |  80 ++++++++++
 .../wordcount/StormWordCountRemoteByClient.java |  88 +++++++++++
 .../StormWordCountRemoteBySubmitter.java        |  89 +++++++++++
 .../wordcount/WordCountTopology.java            | 123 +++++++++++++++
 6 files changed, 665 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
new file mode 100644
index 0000000..8a7fc4f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import backtype.storm.topology.IRichBolt;
+
+
+
+
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}.
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm bolt within a Flink Streaming program.
+ * </ul>
+ */
+public class BoltTokenizerWordCount {
+	
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+	
+	/**
+	 * Entry point: counts the words of the given text file, or of the built-in default data if no arguments are
+	 * given.
+	 * 
+	 * @param args either empty or exactly {@code <text path> <result path>}
+	 * @throws Exception if building or executing the Flink job fails
+	 */
+	public static void main(final String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+		
+		final DataStream<Tuple2<String, Integer>> counts = text
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			// this is done by a Storm bolt that is wrapped accordingly
+			.transform("StormBoltTokenizer",
+				// the sample tuple is only used to derive the output type information
+				TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+				new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()))
+			// group by the tuple field "0" and sum up tuple field "1"
+			.groupBy(0).sum(1);
+		
+		// emit result
+		if(fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+		
+		// execute program
+		env.execute("Streaming WordCount with Storm bolt tokenizer");
+	}
+	
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+	
+	// true if both paths were given; selects file input AND file output (both paths are set together)
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	/**
+	 * Parses the program arguments into the static configuration fields.
+	 * 
+	 * @param args either empty (use built-in data and print the result) or exactly {@code <text path> <result path>}
+	 * @return {@code false} if the arguments are invalid (usage is printed to stderr), {@code true} otherwise
+	 */
+	private static boolean parseParameters(final String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	/**
+	 * Creates the input stream: the given text file if paths were provided, the built-in {@link WordCountData}
+	 * otherwise.
+	 */
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		}
+		
+		return env.fromElements(WordCountData.WORDS);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
new file mode 100644
index 0000000..caec8eb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+import backtype.storm.topology.IRichSpout;
+
+
+
+
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The used data source is a Storm {@link IRichSpout spout}.
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm spout as data source within a Flink Streaming program.
+ * </ul>
+ */
+public class SpoutSourceWordCount {
+	
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+	
+	/**
+	 * Entry point: counts the words of the given text file, or of the built-in default data if no arguments are
+	 * given. The lines are produced by a wrapped Storm spout.
+	 * 
+	 * @param args either empty or exactly {@code <text path> <result path>}
+	 * @throws Exception if building or executing the Flink job fails
+	 */
+	public static void main(final String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+		
+		final DataStream<Tuple2<String, Integer>> counts = text
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			.flatMap(new Tokenizer())
+			// group by the tuple field "0" and sum up tuple field "1"
+			.groupBy(0).sum(1);
+		
+		// emit result
+		if(fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+		
+		// execute program
+		env.execute("Streaming WordCount with Storm spout source");
+	}
+	
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+	
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined FlatMapFunction. The function
+	 * takes a line (String) and splits it into multiple pairs in the form of "(word,1)" (Tuple2&lt;String,
+	 * Integer&gt;). Words are lower-cased and split at non-word characters; empty tokens are dropped.
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void flatMap(final String value, final Collector<Tuple2<String, Integer>> out) throws Exception {
+			// normalize and split the line
+			final String[] tokens = value.toLowerCase().split("\\W+");
+			
+			// emit the pairs
+			for(final String token : tokens) {
+				// split() yields a leading empty token if the line starts with a non-word character
+				if(token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+	
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+	
+	// true if both paths were given; selects file input AND file output (both paths are set together)
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	/**
+	 * Parses the program arguments into the static configuration fields.
+	 * 
+	 * @param args either empty (use built-in data and print the result) or exactly {@code <text path> <result path>}
+	 * @return {@code false} if the arguments are invalid (usage is printed to stderr), {@code true} otherwise
+	 */
+	private static boolean parseParameters(final String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	/**
+	 * Creates the input stream from a wrapped Storm spout: a file spout if paths were provided, the in-memory spout
+	 * otherwise. Both sources run with parallelism one.
+	 */
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			// only the part after the last ':' is passed on (presumably to strip a scheme such as "file:")
+			final String[] tokens = textPath.split(":");
+			final String localFile = tokens[tokens.length - 1];
+			return env.addSource(
+				new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile), true),
+				TypeExtractor.getForClass(String.class)).setParallelism(1);
+		}
+		
+		// use a single source instance, as for the file source above, so that the built-in data is not
+		// emitted once per parallel source instance when the default parallelism is larger than one
+		return env.addSource(
+			new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(), true),
+			TypeExtractor.getForClass(String.class)).setParallelism(1);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
new file mode 100644
index 0000000..99a2c8e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.Utils;
+
+
+
+
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm {@link LocalCluster}.
+ * 
+ * <p>
+ * This example shows how to run a program directly within Java, thus it cannot be used to submit a
+ * {@link StormTopology} via Flink command line clients (i.e., bin/flink).
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink
+ * </ul>
+ */
+public class StormWordCountLocal {
+	public final static String topologyId = "Streaming WordCount";
+	
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+	
+	/**
+	 * Entry point: builds the word count topology and runs it on a {@link FlinkLocalCluster} for about five seconds.
+	 * 
+	 * @param args either empty or exactly {@code <text path> <result path>}
+	 * @throws Exception if submitting or running the topology fails
+	 */
+	public static void main(final String[] args) throws Exception {
+		
+		if(!WordCountTopology.parseParameters(args)) {
+			return;
+		}
+		
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
+		
+		// execute program locally
+		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+		cluster.submitTopology(topologyId, null, builder.createTopology());
+		
+		// let the topology run for a while before tearing the cluster down
+		Utils.sleep(5 * 1000);
+		
+		// TODO kill does not do anything so far
+		cluster.killTopology(topologyId);
+		cluster.shutdown();
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
new file mode 100644
index 0000000..7f3a496
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkClient;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+
+
+
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote.
+ * 
+ * <p>
+ * This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology}
+ * via Flink command line clients (i.e., bin/flink).
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.
+ * </ul>
+ */
+public class StormWordCountRemoteByClient {
+	public final static String topologyId = "Streaming WordCount";
+	// NOTE: the jar name contains the project version ("0.9-SNAPSHOT") and must be updated when the version changes
+	private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar";
+	
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+	
+	/**
+	 * Entry point: builds the word count topology, submits it through a {@link FlinkClient} and kills it after about
+	 * five seconds.
+	 * 
+	 * @param args either empty or exactly {@code <text path> <result path>}
+	 */
+	public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException,
+		NotAliveException {
+		
+		if(!WordCountTopology.parseParameters(args)) {
+			return;
+		}
+		
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
+		
+		// execute program on Flink cluster
+		final Config conf = new Config();
+		conf.put(Config.NIMBUS_HOST, "localhost"); // can be changed to remote address
+		conf.put(Config.NIMBUS_THRIFT_PORT, 6123); // use default flink jobmanager.rpc.port
+		
+		final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
+		cluster.submitTopology(topologyId, uploadedJarLocation, null, builder.createTopology());
+		
+		// let the topology run for a while before killing it
+		Utils.sleep(5 * 1000);
+		
+		cluster.killTopology(topologyId);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
new file mode 100644
index 0000000..dadca67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkClient;
+import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+
+
+
+
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote.
+ * 
+ * <p>
+ * This example shows how to submit the program via Java as well as Flink's command line client (i.e., bin/flink).
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.
+ * </ul>
+ */
+public class StormWordCountRemoteBySubmitter {
+	public final static String topologyId = "Streaming WordCount";
+	
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+	
+	/**
+	 * Entry point: builds the word count topology, submits it via {@link FlinkSubmitter} and kills it after about
+	 * five seconds.
+	 * 
+	 * @param args either empty or exactly {@code <text path> <result path>}
+	 * @throws Exception if submitting or killing the topology fails
+	 */
+	public static void main(final String[] args) throws Exception {
+		
+		if(!WordCountTopology.parseParameters(args)) {
+			return;
+		}
+		
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
+		
+		// execute program on Flink cluster
+		
+		final Config conf = new Config();
+		// the JobManager host/port values can be set manually or left blank;
+		// if they are not set,
+		// - when executed within Java, FlinkSubmitter uses the defaults "localhost" and "6123"
+		// - when executed via bin/flink, FlinkSubmitter uses the values from flink-conf.yaml
+		// conf.put(Config.NIMBUS_HOST, "localhost"); // can be changed to remote address
+		// conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123)); // use default flink jobmanager.rpc.port
+		
+		// the user jar file must be specified via JVM argument if executed via Java
+		// => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar
+		// if bin/flink is used, the jar file is detected automatically
+		FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());
+		
+		// let the topology run for a while before killing it
+		Thread.sleep(5 * 1000);
+		
+		FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
new file mode 100644
index 0000000..c9bf7b9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltFileSink;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltPrintSink;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormInMemorySpout;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+
+
+
+
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology}.
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>construct a regular Storm topology as Flink program
+ * </ul>
+ */
+public class WordCountTopology {
+	public final static String spoutId = "source";
+	public final static String tokenizerId = "tokenizer";
+	/**
+	 * @deprecated misspelled name, kept for backward compatibility; use {@link #tokenizerId} instead.
+	 */
+	@Deprecated
+	public final static String tokenierzerId = tokenizerId;
+	public final static String counterId = "counter";
+	public final static String sinkId = "sink";
+	
+	
+	
+	/**
+	 * Builds the word count topology: spout -> tokenizer (4 instances, shuffle grouping) -> counter (4 instances,
+	 * fields grouping on the word) -> sink. Uses file spout/sink if {@link #parseParameters(String[])} received
+	 * paths, the in-memory spout and a print sink otherwise.
+	 * 
+	 * @return the configured builder; call {@code createTopology()} on it to obtain the topology
+	 */
+	public static FlinkTopologyBuilder buildTopology() {
+		
+		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+		
+		// get input data
+		if(fileInputOutput) {
+			// read the text file from given input path
+			// only the part after the last ':' is used (presumably to strip a scheme such as "file:")
+			final String[] tokens = textPath.split(":");
+			final String inputFile = tokens[tokens.length - 1];
+			builder.setSpout(spoutId, new StormFileSpout(inputFile));
+		} else {
+			builder.setSpout(spoutId, new StormInMemorySpout());
+		}
+		
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		builder.setBolt(tokenizerId, new StormBoltTokenizer(), 4).shuffleGrouping(spoutId);
+		// group by the tuple field "0" and sum up tuple field "1"
+		builder.setBolt(counterId, new StormBoltCounter(), 4).fieldsGrouping(tokenizerId,
+			new Fields(StormBoltTokenizer.ATTRIBUTE_WORD));
+		
+		// emit result
+		if(fileInputOutput) {
+			// write the result to the given output path (same ':' handling as for the input path)
+			final String[] tokens = outputPath.split(":");
+			final String outputFile = tokens[tokens.length - 1];
+			builder.setBolt(sinkId, new StormBoltFileSink(outputFile)).shuffleGrouping(counterId);
+		} else {
+			builder.setBolt(sinkId, new StormBoltPrintSink(), 4).shuffleGrouping(counterId);
+		}
+		
+		return builder;
+	}
+	
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+	
+	// true if both paths were given; selects file input AND file output (both paths are set together)
+	private static boolean fileInputOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	/**
+	 * Parses the program arguments into the static configuration used by {@link #buildTopology()}.
+	 * 
+	 * @param args either empty (use built-in data and print the result) or exactly {@code <text path> <result path>}
+	 * @return {@code false} if the arguments are invalid (usage is printed to stderr), {@code true} otherwise
+	 */
+	static boolean parseParameters(final String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileInputOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		
+		return true;
+	}
+	
+}


Mime
View raw message