Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 50A8318E0B for ; Mon, 15 Jun 2015 09:32:57 +0000 (UTC) Received: (qmail 90320 invoked by uid 500); 15 Jun 2015 09:32:52 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 90200 invoked by uid 500); 15 Jun 2015 09:32:52 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 89949 invoked by uid 99); 15 Jun 2015 09:32:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jun 2015 09:32:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DBC69E045F; Mon, 15 Jun 2015 09:32:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.apache.org Date: Mon, 15 Jun 2015 09:32:56 -0000 Message-Id: <8427a27ad31940438ceb08d4e2c54dce@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/27] flink git commit: [storm-compat] Added Storm compatibility word count examples [storm-compat] Added Storm compatibility word count examples Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/000b5d53 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/000b5d53 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/000b5d53 Branch: refs/heads/master Commit: 000b5d53d0d4fe33f3bf9f31edc35f8c3eaa26b1 Parents: 56e013f Author: mjsax Authored: Thu May 14 12:57:46 2015 +0200 Committer: mbalassi 
Committed: Sun Jun 14 22:59:24 2015 +0200 ---------------------------------------------------------------------- .../wordcount/BoltTokenizerWordCount.java | 127 +++++++++++++++ .../wordcount/SpoutSourceWordCount.java | 158 +++++++++++++++++++ .../wordcount/StormWordCountLocal.java | 80 ++++++++++ .../wordcount/StormWordCountRemoteByClient.java | 88 +++++++++++ .../StormWordCountRemoteBySubmitter.java | 89 +++++++++++ .../wordcount/WordCountTopology.java | 123 +++++++++++++++ 6 files changed, 665 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java new file mode 100644 index 0000000..8a7fc4f --- /dev/null +++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.stormcompatibility.wordcount; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer; +import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import backtype.storm.topology.IRichBolt; + + + + + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. + * + *

+ * The input is a plain text file with lines separated by newline characters. + * + *

+ * Usage: WordCount <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + * + *

+ * This example shows how to: + *

    + *
  • use a Storm bolt within a Flink Streaming program. + *
+ */ +public class BoltTokenizerWordCount { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // get input data + final DataStream text = getTextDataStream(env); + + final DataStream> counts = text + // split up the lines in pairs (2-tuples) containing: (word,1) + // this is done by a Storm bolt that is wrapped accordingly + .transform("StormBoltTokenizer", + TypeExtractor.getForObject(new Tuple2(new String(), new Integer(0))), + new StormBoltWrapper>(new StormBoltTokenizer())) + // split up the lines in pairs (2-tuples) containing: (word,1) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0).sum(1); + + // emit result + if(fileOutput) { + counts.writeAsText(outputPath); + } else { + counts.print(); + } + + // execute program + env.execute("Streaming WordCount with Storm bolt tokenizer"); + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String textPath; + private static String outputPath; + + private static boolean parseParameters(final String[] args) { + + if(args.length > 0) { + // parse input arguments + fileOutput = true; + if(args.length == 2) { + textPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: WordCount "); + return false; + } + } else { + System.out.println("Executing WordCount example with built-in default data."); + System.out.println(" Provide parameters to read input data from a file."); + System.out.println(" Usage: WordCount "); + 
} + return true; + } + + private static DataStream getTextDataStream(final StreamExecutionEnvironment env) { + if(fileOutput) { + // read the text file from given input path + return env.readTextFile(textPath); + } + + return env.fromElements(WordCountData.WORDS); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java new file mode 100644 index 0000000..caec8eb --- /dev/null +++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.stormcompatibility.wordcount; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormInMemorySpout; +import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +import backtype.storm.topology.IRichSpout; + + + + + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The used data source is a Storm {@link IRichSpout bolt}. + * + *

+ * The input is a plain text file with lines separated by newline characters. + * + *

+ * Usage: WordCount <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + * + *

+ * This example shows how to: + *

    + *
  • use a Storm spout as the data source within a Flink Streaming program. + *
+ */ +public class SpoutSourceWordCount { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // get input data + final DataStream text = getTextDataStream(env); + + final DataStream> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0).sum(1); + + // emit result + if(fileOutput) { + counts.writeAsText(outputPath); + } else { + counts.print(); + } + + // execute program + env.execute("Streaming WordCount with Storm spout source"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a user-defined FlatMapFunction. The function + * takes a line (String) and splits it into multiple pairs in the form of "(word,1)" (Tuple2). 
+ */ + public static final class Tokenizer implements FlatMapFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(final String value, final Collector> out) throws Exception { + // normalize and split the line + final String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for(final String token : tokens) { + if(token.length() > 0) { + out.collect(new Tuple2(token, new Integer(1))); + } + } + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String textPath; + private static String outputPath; + + private static boolean parseParameters(final String[] args) { + + if(args.length > 0) { + // parse input arguments + fileOutput = true; + if(args.length == 2) { + textPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: WordCount "); + return false; + } + } else { + System.out.println("Executing WordCount example with built-in default data."); + System.out.println(" Provide parameters to read input data from a file."); + System.out.println(" Usage: WordCount "); + } + return true; + } + + private static DataStream getTextDataStream(final StreamExecutionEnvironment env) { + if(fileOutput) { + // read the text file from given input path + final String[] tokens = textPath.split(":"); + final String localFile = tokens[tokens.length - 1]; + final DataStream stream = env.addSource( + new StormFiniteSpoutWrapper(new StormFileSpout(localFile), true), + TypeExtractor.getForClass(String.class)).setParallelism(1); + return stream; + } + + return env.addSource(new StormFiniteSpoutWrapper(new StormInMemorySpout(), true), + TypeExtractor.getForClass(String.class)); + + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java new file mode 100644 index 0000000..99a2c8e --- /dev/null +++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.stormcompatibility.wordcount; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.stormcompatibility.api.FlinkLocalCluster; +import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; + +import backtype.storm.LocalCluster; +import backtype.storm.generated.StormTopology; +import backtype.storm.utils.Utils; + + + + + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the + * same way as to a Storm {@link LocalCluster}. + * + * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology} + * via Flink command line clients (ie, bin/flink). + * + *

+ * The input is a plain text file with lines separated by newline characters. + * + *

+ * Usage: WordCount <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + * + *

+ * This example shows how to: + *

    + *
  • run a regular Storm program locally on Flink + *
+ */ +public class StormWordCountLocal { + public final static String topologyId = "Streaming WordCount"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if(!WordCountTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way + final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + + // execute program locally + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + cluster.submitTopology(topologyId, null, builder.createTopology()); + + Utils.sleep(5 * 1000); + + // TODO kill does no do anything so far + cluster.killTopology(topologyId); + cluster.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java new file mode 100644 index 0000000..7f3a496 --- /dev/null +++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.stormcompatibility.wordcount; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.stormcompatibility.api.FlinkClient; +import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; + +import backtype.storm.Config; +import backtype.storm.generated.AlreadyAliveException; +import backtype.storm.generated.InvalidTopologyException; +import backtype.storm.generated.NotAliveException; +import backtype.storm.generated.StormTopology; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; + + + + + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the + * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote. + * + * This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via + * Flink command line clients (ie, bin/flink). + * + *

+ * The input is a plain text file with lines separated by newline characters. + * + *

+ * Usage: WordCount <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + * + *

+ * This example shows how to: + *

    + *
  • submit a regular Storm program to a local or remote Flink cluster. + *
+ */ +public class StormWordCountRemoteByClient { + public final static String topologyId = "Streaming WordCount"; + private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException, + NotAliveException { + + if(!WordCountTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way + final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + + // execute program on Flink cluster + final Config conf = new Config(); + conf.put(Config.NIMBUS_HOST, "localhost"); // can be changed to remote address + conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123)); // use default flink jobmanger.rpc.port + + final FlinkClient cluster = FlinkClient.getConfiguredClient(conf); + cluster.submitTopology(topologyId, uploadedJarLocation, null, builder.createTopology()); + + Utils.sleep(5 * 1000); + + cluster.killTopology(topologyId); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java new file mode 100644 index 0000000..dadca67 --- /dev/null +++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java @@ 
-0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.stormcompatibility.wordcount; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.stormcompatibility.api.FlinkClient; +import org.apache.flink.stormcompatibility.api.FlinkSubmitter; +import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; + + + + + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the + * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote. + * + * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink). + * + *

+ * The input is a plain text file with lines separated by newline characters. + * + *

+ * Usage: WordCount <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + * + *

+ * This example shows how to: + *

    + *
  • submit a regular Storm program to a local or remote Flink cluster. + *
+ */ +public class StormWordCountRemoteBySubmitter { + public final static String topologyId = "Streaming WordCount"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if(!WordCountTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way + final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + + // execute program on Flink cluster + + final Config conf = new Config(); + // we can set Jobmanager host/port values manually or leave them blank + // if not set and + // - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter + // - executed via bin/flink values from flink-conf.yaml are set by FlinkSubmitter + // conf.put(Config.NIMBUS_HOST, "localhost"); // can be changed to remote address + // conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123)); // use default flink jobmanger.rpc.port + + // the user jar file must be specified via JVM argument if executed via Java + // => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar + // if bin/flink is used, the jar file is detected automatically + FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology()); + + Thread.sleep(5 * 1000); + + FlinkClient.getConfiguredClient(conf).killTopology(topologyId); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/000b5d53/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java 
b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java new file mode 100644 index 0000000..c9bf7b9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.stormcompatibility.wordcount; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltFileSink; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltPrintSink; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout; +import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormInMemorySpout; + +import backtype.storm.generated.StormTopology; +import backtype.storm.tuple.Fields; + + + + + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The program is constructed as a regular {@link StormTopology}. + * + *

+ * The input is a plain text file with lines separated by newline characters. + * + *

+ * Usage: WordCount <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + * + *

+ * This example shows how to: + *

    + *
  • construct a regular Storm topology as a Flink program + *
+ */ +public class WordCountTopology { + public final static String spoutId = "source"; + public final static String tokenierzerId = "tokenizer"; + public final static String counterId = "counter"; + public final static String sinkId = "sink"; + + + + public static FlinkTopologyBuilder buildTopology() { + + final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + + // get input data + if(fileInputOutput) { + // read the text file from given input path + final String[] tokens = textPath.split(":"); + final String inputFile = tokens[tokens.length - 1]; + builder.setSpout(spoutId, new StormFileSpout(inputFile)); + } else { + builder.setSpout(spoutId, new StormInMemorySpout()); + } + + // split up the lines in pairs (2-tuples) containing: (word,1) + builder.setBolt(tokenierzerId, new StormBoltTokenizer(), new Integer(4)).shuffleGrouping(spoutId); + // group by the tuple field "0" and sum up tuple field "1" + builder.setBolt(counterId, new StormBoltCounter(), new Integer(4)).fieldsGrouping(tokenierzerId, + new Fields(StormBoltTokenizer.ATTRIBUTE_WORD)); + + // emit result + if(fileInputOutput) { + // read the text file from given input path + final String[] tokens = outputPath.split(":"); + final String outputFile = tokens[tokens.length - 1]; + builder.setBolt(sinkId, new StormBoltFileSink(outputFile)).shuffleGrouping(counterId); + } else { + builder.setBolt(sinkId, new StormBoltPrintSink(), new Integer(4)).shuffleGrouping(counterId); + } + + return builder; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileInputOutput = false; + private static String textPath; + private static String outputPath; + + static boolean parseParameters(final String[] args) { + + if(args.length > 0) { + // parse input arguments + fileInputOutput = true; + if(args.length == 2) { + textPath = args[0]; + outputPath = args[1]; 
+ } else { + System.err.println("Usage: WordCount "); + return false; + } + } else { + System.out.println("Executing WordCount example with built-in default data."); + System.out.println(" Provide parameters to read input data from a file."); + System.out.println(" Usage: WordCount "); + } + + return true; + } + +}