From: mbalassi@apache.org
To: commits@flink.incubator.apache.org
Date: Wed, 24 Sep 2014 19:51:41 -0000
In-Reply-To: <0830c32020a9488cabe4ea045e720c5b@git.apache.org>
References: <0830c32020a9488cabe4ea045e720c5b@git.apache.org>
Subject: [08/12] git commit: [FLINK-1103] [streaming] Updated WordCount example to become self-contained and removed old TestDataUtil

[FLINK-1103] [streaming] Updated WordCount example to become self-contained and removed old TestDataUtil

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/30ac9fe6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/30ac9fe6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/30ac9fe6

Branch: refs/heads/master
Commit: 30ac9fe650b833bea2a9ee61b7b2f34f6181eb6d
Parents: 2dc5437
Author: mbalassi
Authored: Wed Sep 24 18:08:49 2014 +0200
Committer: mbalassi
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/util/TestDataUtil.java      | 118 ---------------
 .../flink/streaming/util/TestDataUtilTest.java  |  44 ------
 .../flink-streaming-examples/pom.xml            |   6 +
 .../streaming/examples/wordcount/WordCount.java | 149 +++++++++++++++++++
 .../examples/wordcount/WordCountLocal.java      |  59 --------
 .../testdata_checksum/ASTopology.data.md5       |   1 -
 .../testdata_checksum/MovieLens100k.data.md5    |   1 -
 .../resources/testdata_checksum/hamlet.txt.md5  |   1 -
 .../testdata_checksum/terainput.txt.md5         |   1 -
 9 files changed, 155 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
deleted file mode 100644
index ad42f1f..0000000
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.MalformedURLException; -import java.net.URL; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestDataUtil { - - // TODO: Exception handling - // TODO: check checksum after download - private static final Logger LOG = LoggerFactory.getLogger(TestDataUtil.class); - public static final String testDataDir = "src/test/resources/testdata/"; - public static final String testRepoUrl = "http://info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/"; - public static final String testChekSumDir = "src/test/resources/testdata_checksum/"; - - public static void downloadIfNotExists(String fileName) { - - File file 
= new File(testDataDir + fileName); - File checkFile = new File(testChekSumDir + fileName + ".md5"); - String checkSumDesired = new String(); - String checkSumActaul = new String(); - - File testDataDirectory = new File(testDataDir); - testDataDirectory.mkdirs(); - - try { - FileReader fileReader = new FileReader(checkFile); - BufferedReader bufferedReader = new BufferedReader(fileReader); - checkSumDesired = bufferedReader.readLine(); - bufferedReader.close(); - fileReader.close(); - } catch (FileNotFoundException e) { - throw new RuntimeException("File not found: " + file.getAbsolutePath(), e); - } catch (IOException e) { - throw new RuntimeException("Cannot read file: " + file.getAbsolutePath(), e); - } - - if (file.exists()) { - if (LOG.isInfoEnabled()) { - LOG.info("{} already exists.", fileName); - } - - try { - checkSumActaul = DigestUtils.md5Hex(FileUtils.readFileToByteArray(file)); - } catch (IOException e) { - throw new RuntimeException("Cannot read file to byte array: " - + file.getAbsolutePath(), e); - } - if (!checkSumActaul.equals(checkSumDesired)) { - if (LOG.isInfoEnabled()) { - LOG.info("Checksum is incorrect."); - LOG.info("Downloading file."); - } - download(fileName); - } - } else { - if (LOG.isInfoEnabled()) { - LOG.info("File does not exist."); - LOG.info("Downloading file."); - } - download(fileName); - } - - } - - public static void download(String fileName) { - if (LOG.isInfoEnabled()) { - LOG.info("downloading {}", fileName); - } - - try { - URL website = new URL(testRepoUrl + fileName); - BufferedReader bReader = new BufferedReader(new InputStreamReader(website.openStream())); - File outFile = new File(testDataDir + fileName); - BufferedWriter bWriter = new BufferedWriter(new FileWriter(outFile)); - - String line; - while ((line = bReader.readLine()) != null) { - bWriter.write(line); - bWriter.newLine(); - } - bWriter.close(); - } catch (MalformedURLException e) { - throw new RuntimeException("URL is malformed: ", e); - } catch 
(IOException e) { - throw new RuntimeException("Unexpected problem while downloading file " + fileName, e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java deleted file mode 100644 index fc11bcc..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.util; - -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; - -public class TestDataUtilTest { - - @SuppressWarnings("resource") - public boolean compareFile(String file1, String file2) throws FileNotFoundException, - IOException { - - BufferedReader myInput1 = new BufferedReader(new InputStreamReader(new FileInputStream( - file1))); - BufferedReader myInput2 = new BufferedReader(new InputStreamReader(new FileInputStream( - file2))); - - String line1, line2; - while ((line1 = myInput1.readLine()) != null && (line2 = myInput2.readLine()) != null) { - if (!line1.equals(line2)) - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml index 9c9f00d..7222879 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml +++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml @@ -44,6 +44,12 @@ under the License. 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-connectors</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
new file mode 100644
index 0000000..3be0c89
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over text files in a streaming fashion.
+ *
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink Streaming program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ */
+public class WordCount {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Tuple2<String, Integer>> counts =
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new Tokenizer())
+		// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath, 1);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute();
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			// tokenize the line
+			StringTokenizer tokenizer = new StringTokenizer(inTuple);
+
+			// emit the pairs
+			while (tokenizer.hasMoreTokens()) {
+				out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java deleted file mode 100644 index 9ffeeb1..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.wordcount; - -import java.util.StringTokenizer; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.TestDataUtil; -import org.apache.flink.util.Collector; - -// This example will count the occurrence of each word in the input file. 
-public class WordCountLocal {
-
-	public static class WordCountSplitter implements
-			FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			StringTokenizer tokenizer = new StringTokenizer(inTuple);
-			while (tokenizer.hasMoreTokens()) {
-				out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
-			}
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		TestDataUtil.downloadIfNotExists("hamlet.txt");
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		DataStream<Tuple2<String, Integer>> dataStream = env
-				.readTextFile("src/test/resources/testdata/hamlet.txt")
-				.flatMap(new WordCountSplitter()).groupBy(0).sum(1);
-
-		dataStream.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
deleted file mode 100644
index 2c386b7..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
+++ /dev/null
@@ -1 +0,0 @@
-f1b947a26b33b32f1de2cdd841f7b4c8

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5
b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5 deleted file mode 100644 index 6499b43..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5 +++ /dev/null @@ -1 +0,0 @@ -6e47046882bad158b0efbb84cd5cb987 http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5 ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5 deleted file mode 100644 index 6526a51..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5 +++ /dev/null @@ -1 +0,0 @@ -4bb8c10cdde12a4953250423266465cc http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5 ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5 deleted file mode 100644 index 365f210..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5 +++ /dev/null @@ -1 +0,0 @@ -7002e15fe547614160a0df6f22a5b8d0