Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 84EBC1194D for ; Mon, 22 Sep 2014 12:29:33 +0000 (UTC) Received: (qmail 33889 invoked by uid 500); 22 Sep 2014 12:29:33 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 33813 invoked by uid 500); 22 Sep 2014 12:29:33 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 33745 invoked by uid 99); 22 Sep 2014 12:29:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Sep 2014 12:29:33 +0000 X-ASF-Spam-Status: No, hits=-1998.9 required=5.0 tests=ALL_TRUSTED,LONGWORDS,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 22 Sep 2014 12:29:11 +0000 Received: (qmail 31460 invoked by uid 99); 22 Sep 2014 12:28:44 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Sep 2014 12:28:44 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 65B3F948A13; Mon, 22 Sep 2014 12:28:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.incubator.apache.org Date: Mon, 22 Sep 2014 12:29:07 -0000 Message-Id: <9289675ab4954999be7ed64390908308@git.apache.org> In-Reply-To: <008722037d7a42ee892e64a61277915a@git.apache.org> References: <008722037d7a42ee892e64a61277915a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: 
[25/60] Renamed java examples package X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/util/WebLogData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/util/WebLogData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/util/WebLogData.java deleted file mode 100644 index 3bd6c18..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/util/WebLogData.java +++ /dev/null @@ -1,428 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.example.java.relational.util; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -/** - * Provides the default data sets used for the Weblog Analysis example program. 
- * The default data sets are used, if no parameters are given to the program. - * - */ -public class WebLogData { - - public static final Object [][] DOCUMENTS = { - new Object [] {"url_0","dolor ad amet enim laoreet nostrud veniam aliquip ex nonummy diam dolore tincidunt tation exerci exerci wisi dolor nostrud "}, - new Object [] {"url_1","wisi minim adipiscing nibh adipiscing ut nibh Lorem Ut nonummy euismod nibh wisi sit consectetuer exerci sed aliquip aliquip dolore aliquam enim dolore veniam aliquam euismod suscipit ad adipiscing exerci aliquip consectetuer euismod aliquip ad exerci ex nibh ex erat exerci laoreet lobortis quis "}, - new Object [] {"url_2","diam sed convection aliquip amet commodo nonummy sed sed commodo commodo diam commodo adipiscing ad exerci magna exerci tation quis lobortis "}, - new Object [] {"url_3","exerci suscipit sed lobortis amet lobortis aliquip nibh nostrud ad convection commodo ad nibh sed minim amet ad ea ea "}, - new Object [] {"url_4","sit enim dolor quis laoreet ullamcorper veniam adipiscing ex quis commodo "}, - new Object [] {"url_5","elit aliquip ea nisl oscillations sit dolor ipsum tincidunt ullamcorper dolore enim adipiscing laoreet elit ea volutpat adipiscing ea nibh nostrud Ut aliquam veniam Lorem laoreet veniam aliquip "}, - new Object [] {"url_6","consectetuer ad sed suscipit euismod aliquip quis ullamcorper oscillations tation consectetuer tation amet suscipit nibh enim nonummy veniam commodo commodo diam euismod dolor Ut aliquip diam ex ad nonummy ad tincidunt minim exerci consectetuer veniam convection aliquam ut ut Lorem euismod sed ipsum volutpat "}, - new Object [] {"url_7","Ut volutpat veniam ut consectetuer diam ut aliquam dolor nostrud erat consectetuer adipiscing exerci consectetuer Ut ullamcorper suscipit aliquam sed dolor nisl "}, - new Object [] {"url_8","suscipit amet wisi nisl veniam lobortis sit Lorem aliquam nostrud aliquam ipsum ut laoreet suscipit Lorem laoreet editors adipiscing ullamcorper 
veniam erat consectetuer ut lobortis dolore elit sed tincidunt ipsum tation ullamcorper nonummy adipiscing ex ad laoreet ipsum suscipit lobortis lobortis Ut nonummy adipiscing erat volutpat aliquam "}, - new Object [] {"url_9","nonummy commodo tation editors ut quis sit quis lobortis ea dolore oscillations diam ad dolor lobortis nisl ad veniam ullamcorper quis magna volutpat sit ipsum consectetuer dolore exerci commodo magna erat enim ut suscipit "}, - new Object [] {"url_10","amet erat magna consectetuer tation tation aliquip nibh aliquam sed adipiscing ut commodo ex erat tincidunt aliquam ipsum Ut Ut sit tincidunt adipiscing suscipit minim sed erat dolor consectetuer Lorem consectetuer Lorem amet nibh diam ea ex enim suscipit wisi dolor nonummy magna enim euismod ullamcorper ut suscipit adipiscing "}, - new Object [] {"url_11","ex quis exerci tation diam elit nostrud nostrud ut ipsum elit amet diam laoreet amet consectetuer volutpat sed lobortis "}, - new Object [] {"url_12","elit suscipit sit ullamcorper ut ad erat ut dolor nostrud quis nisl enim erat dolor convection ad minim ut veniam nostrud sed editors adipiscing volutpat Ut aliquip commodo sed euismod adipiscing erat adipiscing dolore nostrud minim sed lobortis ea diam "}, - new Object [] {"url_13","enim ut quis commodo veniam minim erat lobortis ad diam ex dolor tincidunt exerci ut aliquip tincidunt minim ut magna sed enim wisi veniam oscillations Lorem consectetuer "}, - new Object [] {"url_14","nibh ipsum ullamcorper volutpat ut wisi dolor quis amet euismod quis ipsum ipsum minim tation volutpat sit exerci volutpat amet nonummy euismod veniam consectetuer sit consectetuer tincidunt nibh aliquam lobortis tation veniam ut ullamcorper wisi magna Ut volutpat consectetuer erat quis dolore ea tation "}, - new Object [] {"url_15","ad wisi sed enim aliquam oscillations nibh Lorem lobortis veniam nibh laoreet nonummy sed nibh Lorem adipiscing diam magna nostrud magna oscillations ut oscillations elit nostrud diam 
editors Lorem "}, - new Object [] {"url_16","nostrud volutpat veniam exerci tincidunt nostrud quis elit ipsum ea nonummy volutpat dolor elit lobortis magna nisl ut ullamcorper magna Lorem exerci nibh nisl magna editors erat aliquam aliquam ullamcorper sit aliquam sit nostrud oscillations consectetuer adipiscing suscipit convection exerci ea ullamcorper ex nisl "}, - new Object [] {"url_17","ad ex aliquam erat aliquam elit veniam laoreet ut amet amet nostrud ut adipiscing Ut Lorem suscipit ex magna ullamcorper aliquam ullamcorper ullamcorper amet amet commodo aliquam volutpat nonummy nonummy tincidunt amet tation tincidunt volutpat ut veniam nisl erat dolor enim nonummy nostrud adipiscing laoreet adipiscing "}, - new Object [] {"url_18","lobortis ipsum ex tincidunt tincidunt editors euismod consectetuer ipsum adipiscing lobortis exerci adipiscing nonummy nisl dolore nonummy erat exerci nisl ut dolore wisi volutpat lobortis magna "}, - new Object [] {"url_19","ipsum tation laoreet tation adipiscing wisi nibh diam Ut suscipit ad wisi "}, - new Object [] {"url_20","diam Lorem enim wisi ad lobortis dolor Ut ipsum amet dolore consectetuer nisl exerci nisl nonummy minim Ut erat oscillations ut Lorem nostrud dolore Ut dolore exerci ad ipsum dolore ex dolore aliquip sed aliquam ex aliquip magna amet ex dolore oscillations aliquip tation magna Ut "}, - new Object [] {"url_21","lobortis ut amet ex nisl ullamcorper tincidunt ut elit diam quis suscipit ad amet ipsum magna Ut ex tincidunt "}, - new Object [] {"url_22","amet commodo nisl ad quis lobortis ut commodo sit ut erat exerci lobortis suscipit nibh ut nostrud ut adipiscing commodo commodo quis quis nostrud nisl ipsum nostrud laoreet Lorem nostrud erat nostrud amet consectetuer laoreet oscillations wisi sit magna nibh amet "}, - new Object [] {"url_23","adipiscing suscipit suscipit aliquip suscipit consectetuer minim magna ea erat nibh sit suscipit sed dolor oscillations nonummy volutpat ut tincidunt "}, - new Object [] 
{"url_24","commodo sed tincidunt aliquip aliquip dolore commodo nonummy sed erat ut ex exerci dolore adipiscing tincidunt ex diam amet aliquam "}, - new Object [] {"url_25","consectetuer consectetuer exerci quis ea veniam aliquam laoreet minim ex "}, - new Object [] {"url_26","dolor exerci euismod minim magna quis erat consectetuer sed ex erat dolore quis ut oscillations ullamcorper Lorem exerci ex nibh ut exerci ullamcorper veniam nibh ut commodo ut Ut nostrud tincidunt tincidunt ad dolore Lorem ea tation enim erat nibh ut ea nonummy sed sed wisi nisl dolore "}, - new Object [] {"url_27","amet elit ea ea nostrud editors Ut nostrud amet laoreet adipiscing ut nisl nonummy tincidunt ea ipsum ex dolore dolore oscillations sit minim Ut wisi ut laoreet minim elit "}, - new Object [] {"url_28","wisi exerci volutpat Ut nostrud euismod minim Ut sit euismod ut ea magna consectetuer nisl ad minim tation nisl adipiscing Lorem aliquam quis exerci erat minim aliquip sit Lorem wisi wisi ut "}, - new Object [] {"url_29","amet sed laoreet amet aliquam minim enim tincidunt Lorem sit aliquip amet suscipit ut laoreet elit suscipit erat ut tincidunt suscipit ipsum sed euismod elit dolore euismod dolore ut dolor nostrud ipsum tincidunt commodo adipiscing aliquam ut wisi dolor dolor suscipit "}, - new Object [] {"url_30","euismod Lorem ex tincidunt amet enim minim suscipit exerci diam veniam amet nostrud ea ea "}, - new Object [] {"url_31","ex ipsum sit euismod euismod ullamcorper tincidunt ut wisi ea adipiscing sed diam tation ipsum dolor aliquam veniam nonummy aliquip aliquip Lorem ut minim nisl tation sit exerci ullamcorper Ut dolor euismod aliquam consectetuer ad nonummy commodo exerci "}, - new Object [] {"url_32","volutpat ipsum lobortis nisl veniam minim adipiscing dolor editors quis nostrud amet nostrud "}, - new Object [] {"url_33","commodo wisi aliquip ut aliquam sed nostrud ex diam ad nostrud enim ut amet enim ea ad sed tation nostrud suscipit ea magna magna Lorem amet 
lobortis ut quis nibh aliquam aliquam exerci aliquip lobortis consectetuer enim wisi ea nisl laoreet erat dolore "}, - new Object [] {"url_34","tincidunt adipiscing enim tation nibh Ut dolore tincidunt tation laoreet suscipit minim aliquam volutpat laoreet suscipit tincidunt nibh ut ut sit nostrud nonummy tincidunt exerci sit ad sed consectetuer minim dolor dolore laoreet nostrud nibh laoreet ea adipiscing exerci dolore ipsum "}, - new Object [] {"url_35","tation ut erat ut tation dolor Lorem laoreet Lorem elit adipiscing wisi aliquip nostrud elit Ut volutpat ea aliquam aliquip "}, - new Object [] {"url_36","lobortis enim ullamcorper adipiscing consectetuer aliquip wisi enim minim Ut minim elit elit aliquam exerci ullamcorper amet lobortis adipiscing diam laoreet consectetuer nostrud diam diam amet ut enim ullamcorper aliquip diam ut nostrud diam magna amet nonummy commodo wisi enim ullamcorper suscipit euismod dolore tincidunt magna suscipit elit "}, - new Object [] {"url_37","elit adipiscing nisl nisl ex aliquip nibh sed ut ad Lorem elit consectetuer ad volutpat lobortis amet veniam ipsum nibh ut consectetuer editors ad aliquam "}, - new Object [] {"url_38","elit quis nibh adipiscing sit consectetuer ut euismod quis tincidunt quis nisl consectetuer dolor diam suscipit quis dolore Lorem suscipit nonummy sed ex "}, - new Object [] {"url_39","nisl sit consectetuer elit oscillations enim ipsum enim nostrud adipiscing nostrud editors aliquam "}, - new Object [] {"url_40","sed wisi dolor diam commodo ullamcorper commodo nostrud ullamcorper laoreet minim dolore suscipit laoreet tation aliquip "}, - new Object [] {"url_41","ad consectetuer exerci nisl exerci amet enim diam lobortis Lorem ex volutpat volutpat nibh aliquam ut ullamcorper volutpat nostrud ut adipiscing ullamcorper "}, - new Object [] {"url_42","minim laoreet tation magna veniam ut ea sit ipsum tincidunt Ut amet ex aliquip ex euismod exerci wisi elit editors ad amet veniam ad editors "}, - new Object [] 
{"url_43","ut nisl ad ullamcorper nibh Ut editors exerci enim exerci ea laoreet veniam ea amet exerci volutpat amet ad "}, - new Object [] {"url_44","volutpat tincidunt enim amet sed tincidunt consectetuer ullamcorper nisl Ut adipiscing tation ad ad amet nonummy elit erat nibh Lorem erat elit laoreet consectetuer sed aliquip nostrud "}, - new Object [] {"url_45","sed aliquam ut ut consectetuer wisi euismod enim erat euismod quis exerci amet tation sit "}, - new Object [] {"url_46","lobortis oscillations tation aliquam dolore Lorem aliquip tation exerci ullamcorper aliquam aliquip lobortis ex tation dolor ut ut sed suscipit nisl ullamcorper sed editors laoreet aliquip enim dolor veniam tincidunt sed euismod tation "}, - new Object [] {"url_47","Lorem Lorem ut wisi ad ut tation consectetuer exerci convection tation ullamcorper sed dolore quis aliquam ipsum lobortis commodo nonummy "}, - new Object [] {"url_48","laoreet minim veniam nisl elit sit amet commodo ex ullamcorper suscipit aliquip laoreet convection Ut ex minim aliquam "}, - new Object [] {"url_49","lobortis nonummy minim amet sit veniam quis consectetuer tincidunt laoreet quis "}, - new Object [] {"url_50","lobortis nisl commodo dolor amet nibh editors enim magna minim elit euismod diam laoreet laoreet ad minim sed ut Ut lobortis adipiscing quis sed ut aliquam oscillations exerci tation consectetuer lobortis elit tincidunt consectetuer minim amet dolore quis aliquam Ut exerci sed aliquam quis quis ullamcorper Ut ex tincidunt "}, - new Object [] {"url_51","nostrud nisl ea erat ut suscipit Ut sit oscillations ullamcorper nonummy magna lobortis dolore editors tincidunt nostrud suscipit ex quis tation ut sit amet nostrud laoreet ex tincidunt "}, - new Object [] {"url_52","ea tation commodo elit sed ex sed quis enim nisl magna laoreet adipiscing amet sit nostrud consectetuer nibh tincidunt veniam ex veniam euismod exerci sed dolore suscipit nisl tincidunt euismod quis Ut enim euismod dolor diam exerci magna 
exerci ut exerci nisl "}, - new Object [] {"url_53","volutpat amet Ut lobortis dolor tation minim nonummy lobortis convection nostrud "}, - new Object [] {"url_54","ullamcorper commodo Ut amet sit nostrud aliquam ad amet wisi enim nostrud ipsum nisl veniam erat aliquam ex aliquam dolor dolor ut consectetuer euismod exerci elit exerci Ut ea minim enim consectetuer ad consectetuer nonummy convection adipiscing ad ullamcorper lobortis nonummy laoreet nonummy aliquam ullamcorper ad nostrud amet "}, - new Object [] {"url_55","wisi magna editors amet aliquam diam amet aliquip nisl consectetuer laoreet nonummy suscipit euismod diam enim tation elit ut lobortis quis euismod suscipit nostrud ea ea commodo lobortis dolore Ut nisl nostrud dolor laoreet euismod ea dolore aliquam ut Lorem exerci ex sit "}, - new Object [] {"url_56","ex dolor veniam wisi laoreet ut exerci diam ad ex ut ut laoreet ut nisl ullamcorper nisl "}, - new Object [] {"url_57","diam adipiscing Ut ut Lorem amet erat elit erat magna adipiscing euismod elit ullamcorper nostrud aliquam dolor ullamcorper sit tation tation "}, - new Object [] {"url_58","laoreet convection veniam lobortis dolore ut nonummy commodo erat lobortis veniam nostrud dolore minim commodo ut consectetuer magna erat ea dolore Lorem suscipit ex ipsum exerci sed enim ea tation suscipit enim adipiscing "}, - new Object [] {"url_59","amet ut ut Ut ad dolor quis ad magna exerci suscipit magna nibh commodo euismod amet euismod wisi diam suscipit dolore Lorem dolor ex amet exerci aliquip ut ut lobortis quis elit minim sed Lorem "}, - new Object [] {"url_60","ut ut amet ullamcorper amet euismod dolor amet elit exerci adipiscing sed suscipit sed exerci wisi diam veniam wisi suscipit ut quis nibh ullamcorper ex quis magna dolore volutpat editors minim ut sit aliquip oscillations nisl ipsum "}, - new Object [] {"url_61","nibh nostrud tincidunt lobortis adipiscing adipiscing ullamcorper ullamcorper ipsum nisl ullamcorper aliquip laoreet commodo ut 
tation wisi diam commodo aliquip commodo suscipit tincidunt volutpat elit enim laoreet ut nostrud ad nonummy ipsum "}, - new Object [] {"url_62","Ut ut minim enim amet euismod erat elit commodo consectetuer Ut quis dolor ex diam quis wisi tation tincidunt laoreet volutpat "}, - new Object [] {"url_63","ut erat volutpat euismod amet ea nonummy lobortis ut Ut ea veniam sed veniam nostrud "}, - new Object [] {"url_64","tation dolor suscipit minim nisl wisi consectetuer aliquip tation Ut commodo ut dolore consectetuer elit wisi nisl ipsum "}, - new Object [] {"url_65","ullamcorper nisl Lorem magna tation veniam aliquam diam amet euismod "}, - new Object [] {"url_66","euismod aliquam tincidunt Ut volutpat ea lobortis sit ut volutpat ut lobortis ut lobortis ut nisl amet dolor sed ipsum enim ullamcorper diam euismod nostrud wisi erat quis diam nibh Ut dolore sed amet tation enim diam "}, - new Object [] {"url_67","amet minim minim amet laoreet Lorem aliquam veniam elit volutpat magna adipiscing enim enim euismod laoreet sed ex sed aliquam ad ea ut adipiscing suscipit ex minim dolore minim ea laoreet nisl "}, - new Object [] {"url_68","aliquam ea volutpat ut wisi tation tation nibh nisl erat laoreet ea volutpat dolor dolor aliquam exerci quis ullamcorper aliquam ut quis suscipit "}, - new Object [] {"url_69","quis exerci ut aliquip wisi dolore magna nibh consectetuer magna tation ullamcorper lobortis sed amet adipiscing minim suscipit nibh nibh nostrud euismod enim "}, - new Object [] {"url_70","tation enim consectetuer adipiscing wisi laoreet diam aliquip nostrud elit nostrud aliquip ea minim amet diam dolore "}, - new Object [] {"url_71","consectetuer tincidunt nibh amet tation nonummy sit tation diam sed diam tation "}, - new Object [] {"url_72","Lorem ut nostrud nonummy minim quis euismod lobortis nostrud nonummy adipiscing tincidunt consectetuer ut nibh ad suscipit dolor ut elit dolore amet ut quis tation ullamcorper nonummy laoreet ullamcorper aliquam dolore 
convection dolor tincidunt ut ullamcorper ex dolor suscipit erat oscillations ad "}, - new Object [] {"url_73","elit Ut commodo ut ullamcorper ullamcorper ut euismod commodo diam aliquip suscipit consectetuer exerci tation nostrud ut wisi exerci sed ut elit sed volutpat Lorem nibh laoreet consectetuer ex Lorem elit aliquam commodo lobortis ad "}, - new Object [] {"url_74","quis magna laoreet commodo aliquam nisl ullamcorper veniam tation wisi consectetuer commodo consectetuer ad dolore aliquam dolor elit amet sit amet nibh commodo erat veniam aliquip dolore ad magna ad ipsum Ut exerci ea volutpat nisl amet nostrud sit "}, - new Object [] {"url_75","tincidunt suscipit sit aliquip aliquam adipiscing dolore exerci Ut suscipit ut sit laoreet suscipit wisi sit enim nonummy consectetuer dolore editors "}, - new Object [] {"url_76","veniam ullamcorper tation sit suscipit dolor suscipit veniam sit Lorem quis sed nostrud ad tincidunt elit adipiscing "}, - new Object [] {"url_77","volutpat sit amet veniam quis ipsum nibh elit enim commodo magna veniam magna convection "}, - new Object [] {"url_78","tation dolore minim elit nisl volutpat tation laoreet enim nostrud exerci dolore tincidunt aliquip Lorem ipsum nostrud quis adipiscing ullamcorper erat lobortis tation commodo Ut ipsum commodo magna ad ipsum ut enim "}, - new Object [] {"url_79","lobortis amet elit Lorem amet nonummy commodo tation ex ea amet Lorem ea nonummy commodo veniam volutpat nibh wisi ad ipsum euismod ea convection nostrud nisl erat veniam Ut aliquip ad aliquip editors wisi magna tation nostrud nonummy adipiscing ullamcorper aliquip "}, - new Object [] {"url_80","tincidunt nostrud nostrud magna ea euismod ea consectetuer nisl exerci ea dolor nisl commodo ex erat ipsum exerci suscipit ad nisl ea nonummy suscipit adipiscing laoreet sit euismod nibh adipiscing sed minim commodo amet "}, - new Object [] {"url_81","nostrud erat ut sed editors erat amet magna lobortis diam laoreet dolor amet nibh ut ipsum ipsum 
amet ut sed ut exerci elit suscipit wisi magna ut veniam nisl commodo enim adipiscing laoreet ad Lorem oscillations "}, - new Object [] {"url_82","quis commodo nibh nibh volutpat suscipit dolore magna tincidunt nibh ut ad ullamcorper ullamcorper quis enim ad ut tation minim laoreet veniam dolor sed tincidunt exerci exerci nostrud ullamcorper amet ut ut ullamcorper "}, - new Object [] {"url_83","sit suscipit volutpat elit tation elit sed sed dolor ex ex ipsum euismod laoreet magna lobortis ad "}, - new Object [] {"url_84","lobortis ipsum euismod enim ea tation veniam tation oscillations aliquip consectetuer euismod ut sed lobortis tation oscillations commodo euismod laoreet suscipit amet elit ullamcorper volutpat aliquam ea enim ullamcorper consectetuer laoreet tation quis ut commodo erat euismod dolor laoreet ullamcorper laoreet "}, - new Object [] {"url_85","adipiscing sit quis commodo consectetuer quis enim euismod exerci nonummy ea nostrud Ut veniam sit aliquip nisl enim "}, - new Object [] {"url_86","nostrud dolore veniam veniam wisi aliquip adipiscing diam sed quis ullamcorper "}, - new Object [] {"url_87","quis Lorem suscipit Ut nibh diam euismod consectetuer lobortis ipsum sed suscipit consectetuer euismod laoreet ut wisi nisl elit quis commodo adipiscing adipiscing suscipit aliquam nisl quis magna ipsum enim ad quis ea magna Lorem nibh ea "}, - new Object [] {"url_88","euismod commodo sed tincidunt Ut veniam consectetuer quis erat ex ea erat laoreet commodo nibh minim "}, - new Object [] {"url_89","tation diam editors Ut enim nibh Lorem volutpat quis diam suscipit exerci wisi ad "}, - new Object [] {"url_90","volutpat editors ea nibh wisi ad amet volutpat nisl ullamcorper nibh volutpat minim ex ut sit veniam Lorem consectetuer quis ad sit suscipit volutpat wisi diam sed tincidunt ipsum minim convection ea diam oscillations quis lobortis "}, - new Object [] {"url_91","enim minim nonummy ea minim euismod adipiscing editors volutpat magna sit magna ut ipsum ut 
"}, - new Object [] {"url_92","nisl Ut commodo amet euismod lobortis ea ea wisi commodo Lorem sit ipsum volutpat nonummy exerci erat elit exerci magna ad erat enim laoreet quis nostrud wisi ut veniam amet ullamcorper lobortis ad suscipit volutpat veniam nostrud nibh quis ipsum dolore consectetuer veniam ipsum aliquip dolore sed laoreet ipsum "}, - new Object [] {"url_93","nonummy aliquam ad lobortis Lorem erat ad tation Lorem exerci ex "}, - new Object [] {"url_94","nonummy dolore commodo exerci ex quis ut suscipit elit laoreet sit tation magna veniam ea sit nonummy veniam Lorem quis nibh aliquip exerci amet ullamcorper adipiscing erat nisl editors diam commodo ad euismod adipiscing ea suscipit exerci aliquip volutpat tation enim volutpat sit "}, - new Object [] {"url_95","sit suscipit oscillations ipsum nibh dolor ea dolore ea elit ipsum minim editors magna consectetuer ullamcorper commodo nonummy sit nostrud aliquip sit erat ullamcorper ullamcorper nibh veniam erat quis dolore nonummy "}, - new Object [] {"url_96","nostrud quis ut volutpat magna ad quis adipiscing Lorem commodo exerci laoreet magna adipiscing erat quis wisi ea ea laoreet enim convection ad dolor nisl amet nibh aliquam adipiscing tincidunt minim diam Lorem commodo adipiscing volutpat "}, - new Object [] {"url_97","laoreet laoreet suscipit nostrud dolore adipiscing volutpat Ut sed nisl diam ullamcorper ex ut ut dolor amet nostrud euismod dolore veniam veniam enim tation veniam ea minim minim volutpat tincidunt "}, - new Object [] {"url_98","quis lobortis amet wisi nostrud ipsum aliquam convection tincidunt dolore ullamcorper nibh lobortis volutpat ea nostrud oscillations minim nonummy enim ad lobortis exerci ipsum ullamcorper nibh nonummy diam amet enim veniam ut nostrud "}, - new Object [] {"url_99","aliquam wisi suscipit commodo diam amet amet magna nisl enim nostrud tation nisl nostrud nibh ut "} - }; - - public static final Object [][] RANKS = { - new Object [] {30,"url_0",43}, - new Object [] 
{82,"url_1",39}, - new Object [] {56,"url_2",31}, - new Object [] {96,"url_3",36}, - new Object [] {31,"url_4",36}, - new Object [] {29,"url_5",6}, - new Object [] {33,"url_6",48}, - new Object [] {66,"url_7",40}, - new Object [] {28,"url_8",51}, - new Object [] {9,"url_9",4}, - new Object [] {49,"url_10",24}, - new Object [] {26,"url_11",12}, - new Object [] {39,"url_12",46}, - new Object [] {84,"url_13",53}, - new Object [] {29,"url_14",50}, - new Object [] {21,"url_15",12}, - new Object [] {69,"url_16",34}, - new Object [] {11,"url_17",38}, - new Object [] {96,"url_18",13}, - new Object [] {56,"url_19",48}, - new Object [] {18,"url_20",36}, - new Object [] {31,"url_21",21}, - new Object [] {29,"url_22",11}, - new Object [] {71,"url_23",30}, - new Object [] {85,"url_24",48}, - new Object [] {19,"url_25",45}, - new Object [] {69,"url_26",9}, - new Object [] {20,"url_27",51}, - new Object [] {33,"url_28",46}, - new Object [] {75,"url_29",38}, - new Object [] {96,"url_30",51}, - new Object [] {73,"url_31",40}, - new Object [] {67,"url_32",16}, - new Object [] {24,"url_33",24}, - new Object [] {27,"url_34",35}, - new Object [] {33,"url_35",35}, - new Object [] {7,"url_36",22}, - new Object [] {83,"url_37",41}, - new Object [] {23,"url_38",49}, - new Object [] {41,"url_39",33}, - new Object [] {66,"url_40",38}, - new Object [] {4,"url_41",52}, - new Object [] {34,"url_42",4}, - new Object [] {28,"url_43",12}, - new Object [] {14,"url_44",14}, - new Object [] {41,"url_45",11}, - new Object [] {48,"url_46",37}, - new Object [] {75,"url_47",41}, - new Object [] {78,"url_48",3}, - new Object [] {63,"url_49",28} - }; - - - public static final Object [][] VISITS = { - new Object [] {"url_2","2003-12-17"}, - new Object [] {"url_9","2008-11-11"}, - new Object [] {"url_14","2003-11-5"}, - new Object [] {"url_46","2009-2-16"}, - new Object [] {"url_14","2004-11-9"}, - new Object [] {"url_36","2001-3-9"}, - new Object [] {"url_35","2006-8-13"}, - new Object [] 
{"url_22","2008-1-18"}, - new Object [] {"url_36","2002-3-9"}, - new Object [] {"url_13","2007-7-17"}, - new Object [] {"url_23","2009-6-16"}, - new Object [] {"url_16","2000-7-15"}, - new Object [] {"url_41","2002-5-10"}, - new Object [] {"url_6","2004-11-9"}, - new Object [] {"url_5","2003-6-7"}, - new Object [] {"url_22","2002-11-5"}, - new Object [] {"url_11","2007-7-21"}, - new Object [] {"url_38","2009-12-2"}, - new Object [] {"url_6","2004-11-2"}, - new Object [] {"url_46","2000-6-4"}, - new Object [] {"url_34","2003-9-2"}, - new Object [] {"url_31","2008-2-24"}, - new Object [] {"url_0","2003-2-2"}, - new Object [] {"url_47","2003-7-8"}, - new Object [] {"url_49","2009-9-13"}, - new Object [] {"url_11","2003-4-2"}, - new Object [] {"url_20","2000-6-18"}, - new Object [] {"url_38","2000-2-22"}, - new Object [] {"url_44","2009-2-17"}, - new Object [] {"url_26","2000-6-21"}, - new Object [] {"url_13","2000-11-25"}, - new Object [] {"url_47","2005-4-19"}, - new Object [] {"url_46","2008-1-7"}, - new Object [] {"url_33","2004-12-24"}, - new Object [] {"url_32","2009-2-8"}, - new Object [] {"url_26","2000-9-21"}, - new Object [] {"url_9","2002-8-18"}, - new Object [] {"url_38","2002-11-27"}, - new Object [] {"url_37","2008-2-26"}, - new Object [] {"url_1","2007-3-22"}, - new Object [] {"url_37","2002-3-20"}, - new Object [] {"url_27","2008-11-12"}, - new Object [] {"url_30","2000-12-16"}, - new Object [] {"url_48","2000-12-17"}, - new Object [] {"url_46","2008-4-16"}, - new Object [] {"url_29","2006-3-9"}, - new Object [] {"url_0","2007-7-26"}, - new Object [] {"url_46","2009-12-15"}, - new Object [] {"url_34","2002-2-13"}, - new Object [] {"url_24","2009-3-1"}, - new Object [] {"url_43","2007-11-4"}, - new Object [] {"url_3","2004-2-16"}, - new Object [] {"url_26","2000-10-26"}, - new Object [] {"url_42","2004-7-14"}, - new Object [] {"url_13","2004-9-10"}, - new Object [] {"url_21","2000-2-21"}, - new Object [] {"url_9","2006-6-5"}, - new Object [] 
{"url_46","2001-12-17"}, - new Object [] {"url_24","2006-12-8"}, - new Object [] {"url_25","2006-9-2"}, - new Object [] {"url_37","2002-6-26"}, - new Object [] {"url_18","2006-6-2"}, - new Object [] {"url_46","2003-5-24"}, - new Object [] {"url_32","2000-10-17"}, - new Object [] {"url_45","2002-1-12"}, - new Object [] {"url_12","2005-12-13"}, - new Object [] {"url_49","2009-3-9"}, - new Object [] {"url_31","2001-9-19"}, - new Object [] {"url_22","2002-7-9"}, - new Object [] {"url_27","2005-2-3"}, - new Object [] {"url_43","2008-7-15"}, - new Object [] {"url_20","2000-3-23"}, - new Object [] {"url_25","2002-5-8"}, - new Object [] {"url_41","2004-4-27"}, - new Object [] {"url_17","2008-7-17"}, - new Object [] {"url_26","2009-12-16"}, - new Object [] {"url_34","2006-2-10"}, - new Object [] {"url_8","2009-4-14"}, - new Object [] {"url_16","2000-2-24"}, - new Object [] {"url_2","2009-2-10"}, - new Object [] {"url_35","2003-2-24"}, - new Object [] {"url_34","2008-3-16"}, - new Object [] {"url_27","2005-1-5"}, - new Object [] {"url_8","2008-12-10"}, - new Object [] {"url_38","2009-2-11"}, - new Object [] {"url_38","2006-11-3"}, - new Object [] {"url_47","2003-2-13"}, - new Object [] {"url_8","2008-11-17"}, - new Object [] {"url_26","2009-5-11"}, - new Object [] {"url_12","2007-11-26"}, - new Object [] {"url_10","2003-1-13"}, - new Object [] {"url_8","2005-9-23"}, - new Object [] {"url_42","2001-4-5"}, - new Object [] {"url_30","2009-12-10"}, - new Object [] {"url_2","2003-1-3"}, - new Object [] {"url_2","2009-2-19"}, - new Object [] {"url_7","2000-6-25"}, - new Object [] {"url_15","2004-9-26"}, - new Object [] {"url_25","2009-10-5"}, - new Object [] {"url_23","2009-8-9"}, - new Object [] {"url_27","2004-4-3"}, - new Object [] {"url_37","2008-6-9"}, - new Object [] {"url_9","2002-5-25"}, - new Object [] {"url_43","2009-5-18"}, - new Object [] {"url_21","2008-4-19"}, - new Object [] {"url_12","2001-12-25"}, - new Object [] {"url_16","2006-9-25"}, - new Object [] 
{"url_27","2002-1-2"}, - new Object [] {"url_2","2009-1-21"}, - new Object [] {"url_31","2009-3-20"}, - new Object [] {"url_42","2002-3-1"}, - new Object [] {"url_31","2001-11-26"}, - new Object [] {"url_20","2003-5-15"}, - new Object [] {"url_32","2004-1-22"}, - new Object [] {"url_28","2008-9-16"}, - new Object [] {"url_27","2006-7-3"}, - new Object [] {"url_11","2008-12-26"}, - new Object [] {"url_15","2004-8-16"}, - new Object [] {"url_34","2002-10-5"}, - new Object [] {"url_44","2000-2-15"}, - new Object [] {"url_9","2000-10-23"}, - new Object [] {"url_45","2005-4-24"}, - new Object [] {"url_0","2006-8-7"}, - new Object [] {"url_48","2003-8-7"}, - new Object [] {"url_8","2007-12-13"}, - new Object [] {"url_42","2003-8-2"}, - new Object [] {"url_25","2008-3-5"}, - new Object [] {"url_3","2007-3-9"}, - new Object [] {"url_49","2003-10-7"}, - new Object [] {"url_18","2007-12-6"}, - new Object [] {"url_3","2006-7-5"}, - new Object [] {"url_27","2000-9-14"}, - new Object [] {"url_42","2002-10-20"}, - new Object [] {"url_44","2007-1-13"}, - new Object [] {"url_6","2003-1-21"}, - new Object [] {"url_40","2009-10-20"}, - new Object [] {"url_28","2009-6-17"}, - new Object [] {"url_22","2000-2-17"}, - new Object [] {"url_3","2005-1-15"}, - new Object [] {"url_9","2008-12-9"}, - new Object [] {"url_9","2005-2-19"}, - new Object [] {"url_28","2000-4-22"}, - new Object [] {"url_44","2001-9-9"}, - new Object [] {"url_43","2008-6-21"}, - new Object [] {"url_39","2008-5-9"}, - new Object [] {"url_15","2006-9-15"}, - new Object [] {"url_23","2001-12-18"}, - new Object [] {"url_14","2002-5-23"}, - new Object [] {"url_11","2007-7-11"}, - new Object [] {"url_34","2000-12-8"}, - new Object [] {"url_47","2005-7-3"}, - new Object [] {"url_38","2004-3-26"}, - new Object [] {"url_19","2003-9-14"}, - new Object [] {"url_24","2007-7-16"}, - new Object [] {"url_40","2008-8-21"}, - new Object [] {"url_17","2007-12-4"}, - new Object [] {"url_25","2006-6-24"}, - new Object [] 
{"url_2","2000-10-8"}, - new Object [] {"url_12","2008-6-10"}, - new Object [] {"url_11","2004-11-24"}, - new Object [] {"url_13","2005-11-3"}, - new Object [] {"url_43","2005-1-2"}, - new Object [] {"url_14","2008-6-12"}, - new Object [] {"url_43","2001-8-27"}, - new Object [] {"url_45","2000-3-3"}, - new Object [] {"url_0","2006-9-27"}, - new Object [] {"url_22","2007-12-18"}, - new Object [] {"url_25","2006-4-4"}, - new Object [] {"url_32","2001-6-25"}, - new Object [] {"url_6","2007-6-9"}, - new Object [] {"url_8","2009-10-3"}, - new Object [] {"url_15","2003-2-23"}, - new Object [] {"url_37","2000-5-6"}, - new Object [] {"url_27","2004-3-21"}, - new Object [] {"url_17","2005-6-20"}, - new Object [] {"url_2","2004-2-27"}, - new Object [] {"url_36","2005-3-16"}, - new Object [] {"url_1","2009-12-3"}, - new Object [] {"url_9","2004-4-27"}, - new Object [] {"url_18","2009-5-26"}, - new Object [] {"url_31","2000-9-21"}, - new Object [] {"url_12","2008-9-25"}, - new Object [] {"url_2","2004-2-16"}, - new Object [] {"url_28","2008-11-12"}, - new Object [] {"url_28","2001-6-26"}, - new Object [] {"url_12","2006-3-15"}, - new Object [] {"url_0","2009-3-1"}, - new Object [] {"url_36","2006-10-13"}, - new Object [] {"url_15","2004-11-5"}, - new Object [] {"url_32","2008-2-11"}, - new Object [] {"url_19","2009-8-3"}, - new Object [] {"url_2","2006-8-6"}, - new Object [] {"url_11","2009-10-13"}, - new Object [] {"url_21","2002-9-14"}, - new Object [] {"url_18","2000-11-2"}, - new Object [] {"url_35","2006-5-15"}, - new Object [] {"url_11","2006-2-18"}, - new Object [] {"url_0","2001-4-25"}, - new Object [] {"url_14","2009-4-8"}, - new Object [] {"url_16","2009-4-7"} - }; - - public static DataSet> getDocumentDataSet(ExecutionEnvironment env) { - - List> data = new ArrayList>(100); - for (Object [] document : DOCUMENTS) { - data.add(new Tuple2((String) document[0], (String) document[1])); - } - - return env.fromCollection(data); - } - - public static DataSet> 
getRankDataSet(ExecutionEnvironment env) { - - List> data = new ArrayList>(100); - for (Object [] rank : RANKS) { - data.add(new Tuple3((Integer) rank[0], (String) rank[1], (Integer) rank[2])); - } - return env.fromCollection(data); - } - - public static DataSet> getVisitDataSet(ExecutionEnvironment env) { - - List> data = new ArrayList>(100); - - for (Object [] visit : VISITS) { - data.add(new Tuple2((String) visit[0], (String) visit[1])); - } - return env.fromCollection(data); - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/util/WebLogDataGenerator.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/util/WebLogDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/util/WebLogDataGenerator.java deleted file mode 100644 index 5a8f0ac..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/util/WebLogDataGenerator.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.example.java.relational.util; - -import java.io.FileWriter; -import java.io.IOException; -import java.util.Calendar; -import java.util.Random; - -import org.apache.flink.example.java.relational.WebLogAnalysis; - -/** - * Data generator for the {@link WebLogAnalysis} example program. - * - */ -public class WebLogDataGenerator { - - /** - * Main method to generate data for the {@link WebLogAnalysis} example program. - *

- * The generator creates two files: - *

    - *
  • {tmp.dir}/documents for the web documents - *
  • {tmp.dir}/ranks for the ranks of the web documents - *
  • {tmp.dir}/visits for the logged visits of web documents - *
- * - * @param args - *
    - *
  1. Int: Number of web documents - *
  2. Int: Number of visits - *
- */ - public static void main(String[] args) { - - // parse parameters - if (args.length < 2) { - System.out.println("WebLogDataGenerator "); - System.exit(1); - } - - int noDocs = Integer.parseInt(args[0]); - int noVisits = Integer.parseInt(args[1]); - - String[] filterKWs = { "editors", "oscillations", "convection" }; - - String[] words = { "Lorem", "ipsum", "dolor", "sit", "amet", - "consectetuer", "adipiscing", "elit", "sed", "diam", "nonummy", - "nibh", "euismod", "tincidunt", "ut", "laoreet", "dolore", - "magna", "aliquam", "erat", "volutpat", "Ut", "wisi", "enim", - "ad", "minim", "veniam", "quis", "nostrud", "exerci", "tation", - "ullamcorper", "suscipit", "lobortis", "nisl", "ut", "aliquip", - "ex", "ea", "commodo" }; - - - final String outPath = System.getProperty("java.io.tmpdir"); - - System.out.println("Generating documents files..."); - genDocs(noDocs, filterKWs, words, outPath + "/documents"); - System.out.println("Generating ranks files..."); - genRanks(noDocs, outPath + "/ranks"); - System.out.println("Generating visits files..."); - genVisits(noVisits, noDocs, outPath + "/visits"); - - System.out.println("Done!"); - } - - /** - * Generates the files for the documents relation. The entries apply the - * following format:
- * URL | Content - * - * @param noDocs - * Number of entries for the documents relation - * @param filterKeyWords - * A list of keywords that should be contained - * @param words - * A list of words to fill the entries - * @param path - * Output path for the documents relation - */ - private static void genDocs(int noDocs, String[] filterKeyWords, String[] words, String path) { - - Random rand = new Random(Calendar.getInstance().getTimeInMillis()); - - try { - FileWriter fw = new FileWriter(path); - - for (int i = 0; i < noDocs; i++) { - - int wordsInDoc = rand.nextInt(40) + 10; - // URL - StringBuilder doc = new StringBuilder("url_" + i + "|"); - for (int j = 0; j < wordsInDoc; j++) { - if (rand.nextDouble() > 0.9) { - // Approx. every 10th word is a keyword - doc.append(filterKeyWords[rand - .nextInt(filterKeyWords.length)] + " "); - } else { - // Fills up the docs file(s) with random words - doc.append(words[rand.nextInt(words.length)] + " "); - } - } - doc.append("|\n"); - - fw.write(doc.toString()); - } - fw.close(); - - } catch (IOException e) { - e.printStackTrace(); - } - } - - /** - * Generates the files for the ranks relation. The ranks entries apply the - * following format:
- * Rank | URL | Average Duration |\n - * - * @param noDocs - * Number of entries in the documents relation - * @param path - * Output path for the ranks relation - */ - private static void genRanks(int noDocs, String path) { - - Random rand = new Random(Calendar.getInstance().getTimeInMillis()); - - try { - FileWriter fw = new FileWriter(path); - - for (int i = 0; i < noDocs; i++) { - // Rank - StringBuilder rank = new StringBuilder(rand.nextInt(100) + "|"); - // URL - rank.append("url_" + i + "|"); - // Average duration - rank.append(rand.nextInt(10) + rand.nextInt(50) + "|\n"); - - fw.write(rank.toString()); - } - fw.close(); - - } catch (IOException e) { - e.printStackTrace(); - } - } - - /** - * Generates the files for the visits relation. The visits entries apply the - * following format:
- * IP Address | URL | Date (YYYY-MM-DD) | Misc. Data (e.g. User-Agent) |\n - * - * @param noVisits - * Number of entries for the visits relation - * @param noDocs - * Number of entries in the documents relation - * @param path - * Output path for the visits relation - */ - private static void genVisits(int noVisits, int noDocs, String path) { - - Random rand = new Random(Calendar.getInstance().getTimeInMillis()); - - try { - FileWriter fw = new FileWriter(path); - - for (int i = 0; i < noVisits; i++) { - - int year = 2000 + rand.nextInt(10); // yearFilter 3 - int month = rand.nextInt(12) + 1; // month between 1 and 12 - int day = rand.nextInt(27) + 1; // day between 1 and 28 - - // IP address - StringBuilder visit = new StringBuilder(rand.nextInt(256) + "." - + rand.nextInt(256) + "." + rand.nextInt(256) + "." - + rand.nextInt(256) + "|"); - // URL - visit.append("url_" + rand.nextInt(noDocs) + "|"); - // Date (format: YYYY-MM-DD) - visit.append(year + "-" + month + "-" + day + "|"); - // Miscellaneous data, e.g. User-Agent - visit.append("0.12|Mozilla Firefox 3.1|de|de|Nothing special|124|\n"); - - fw.write(visit.toString()); - } - fw.close(); - - } catch (IOException e) { - e.printStackTrace(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java deleted file mode 100644 index 3e95ccd..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.example.java.wordcount; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.example.java.wordcount.util.WordCountData; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram - * over text files. - * - *

- * The input is a plain text file with lines separated by newline characters. - * - *

- * Usage: WordCount <text path> <result path>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * - *

- * This example shows how to: - *

    - *
  • write a simple Flink program. - *
  • use Tuple data types. - *
  • write and use user-defined functions. - *
- * - */ -@SuppressWarnings("serial") -public class WordCount { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get input data - DataSet text = getTextDataSet(env); - - DataSet> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0) - .sum(1); - - // emit result - if(fileOutput) { - counts.writeAsCsv(outputPath, "\n", " "); - } else { - counts.print(); - } - - // execute program - env.execute("WordCount Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Implements the string tokenizer that splits sentences into words as a user-defined - * FlatMapFunction. The function takes a line (String) and splits it into - * multiple pairs in the form of "(word,1)" (Tuple2). 
- */ - public static final class Tokenizer implements FlatMapFunction> { - - @Override - public void flatMap(String value, Collector> out) { - // normalize and split the line - String[] tokens = value.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2(token, 1)); - } - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - // parse input arguments - fileOutput = true; - if(args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: WordCount "); - return false; - } - } else { - System.out.println("Executing WordCount example with built-in default data."); - System.out.println(" Provide parameters to read input data from a file."); - System.out.println(" Usage: WordCount "); - } - return true; - } - - private static DataSet getTextDataSet(ExecutionEnvironment env) { - if(fileOutput) { - // read the text file from given input path - return env.readTextFile(textPath); - } else { - // get default test text data - return WordCountData.getDefaultTextLineDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/util/WordCountData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/util/WordCountData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/util/WordCountData.java deleted file mode 100644 index b7ee4b5..0000000 
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/util/WordCountData.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.example.java.wordcount.util; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -/** - * Provides the default data sets used for the WordCount example program. - * The default data sets are used, if no parameters are given to the program. - * - */ -public class WordCountData { - - public static final String[] WORDS = new String[] { - "To be, or not to be,--that is the question:--", - "Whether 'tis nobler in the mind to suffer", - "The slings and arrows of outrageous fortune", - "Or to take arms against a sea of troubles,", - "And by opposing end them?--To die,--to sleep,--", - "No more; and by a sleep to say we end", - "The heartache, and the thousand natural shocks", - "That flesh is heir to,--'tis a consummation", - "Devoutly to be wish'd. To die,--to sleep;--", - "To sleep! 
perchance to dream:--ay, there's the rub;", - "For in that sleep of death what dreams may come,", - "When we have shuffled off this mortal coil,", - "Must give us pause: there's the respect", - "That makes calamity of so long life;", - "For who would bear the whips and scorns of time,", - "The oppressor's wrong, the proud man's contumely,", - "The pangs of despis'd love, the law's delay,", - "The insolence of office, and the spurns", - "That patient merit of the unworthy takes,", - "When he himself might his quietus make", - "With a bare bodkin? who would these fardels bear,", - "To grunt and sweat under a weary life,", - "But that the dread of something after death,--", - "The undiscover'd country, from whose bourn", - "No traveller returns,--puzzles the will,", - "And makes us rather bear those ills we have", - "Than fly to others that we know not of?", - "Thus conscience does make cowards of us all;", - "And thus the native hue of resolution", - "Is sicklied o'er with the pale cast of thought;", - "And enterprises of great pith and moment,", - "With this regard, their currents turn awry,", - "And lose the name of action.--Soft you now!", - "The fair Ophelia!--Nymph, in thy orisons", - "Be all my sins remember'd." 
- }; - - public static DataSet getDefaultTextLineDataSet(ExecutionEnvironment env) { - return env.fromElements(WORDS); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java new file mode 100644 index 0000000..1ba05a4 --- /dev/null +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.examples.java.clustering; + +import java.io.Serializable; +import java.util.Collection; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.examples.java.clustering.util.KMeansData; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.IterativeDataSet; + +/** + * This example implements a basic K-Means clustering algorithm. + * + *

+ * K-Means is an iterative clustering algorithm and works as follows:
+ * K-Means is given a set of data points to be clustered and an initial set of K cluster centers. + * In each iteration, the algorithm computes the distance of each data point to each cluster center. + * Each point is assigned to the cluster center which is closest to it. + * Subsequently, each cluster center is moved to the center (mean) of all points that have been assigned to it. + * The moved cluster centers are fed into the next iteration. + * The algorithm terminates after a fixed number of iterations (as in this implementation) + * or if cluster centers do not (significantly) move in an iteration.
+ * This is the Wikipedia entry for the K-Means Clustering algorithm. + * + *

+ * This implementation works on two-dimensional data points.
+ * It computes an assignment of data points to cluster centers, i.e., + * each data point is annotated with the id of the final cluster (center) it belongs to. + * + *

+ * Input files are plain text files and must be formatted as follows: + *

    + *
  • Data points are represented as two double values separated by a blank character. + * Data points are separated by newline characters.
    + * For example "1.2 2.3\n5.3 7.2\n" gives two data points (x=1.2, y=2.3) and (x=5.3, y=7.2). + *
  • Cluster centers are represented by an integer id and a point value.
    + * For example "1 6.2 3.2\n2 2.9 5.7\n" gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7). + *
+ * + *

+ * Usage: KMeans <points path> <centers path> <result path> <num iterations>
+ * If no parameters are provided, the program is run with default data from {@link KMeansData} and 10 iterations. + * + *

+ * This example shows how to use: + *

    + *
  • Bulk iterations + *
  • Broadcast variables in bulk iterations + *
  • Custom Java objects (PoJos) + *
+ */ +@SuppressWarnings("serial") +public class KMeans { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet points = getPointDataSet(env); + DataSet centroids = getCentroidDataSet(env); + + // set number of bulk iterations for KMeans algorithm + IterativeDataSet loop = centroids.iterate(numIterations); + + DataSet newCentroids = points + // compute closest centroid for each point + .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids") + // count and sum point coordinates for each centroid + .map(new CountAppender()) + .groupBy(0).reduce(new CentroidAccumulator()) + // compute new centroids from point counts and coordinate sums + .map(new CentroidAverager()); + + // feed new centroids back into next iteration + DataSet finalCentroids = loop.closeWith(newCentroids); + + DataSet> clusteredPoints = points + // assign points to final clusters + .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids"); + + // emit result + if(fileOutput) { + clusteredPoints.writeAsCsv(outputPath, "\n", " "); + } else { + clusteredPoints.print(); + } + + // execute program + env.execute("KMeans Example"); + + } + + // ************************************************************************* + // DATA TYPES + // ************************************************************************* + + /** + * A simple two-dimensional point. 
+ */ + public static class Point implements Serializable { + + public double x, y; + + public Point() {} + + public Point(double x, double y) { + this.x = x; + this.y = y; + } + + public Point add(Point other) { + x += other.x; + y += other.y; + return this; + } + + public Point div(long val) { + x /= val; + y /= val; + return this; + } + + public double euclideanDistance(Point other) { + return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y)); + } + + public void clear() { + x = y = 0.0; + } + + @Override + public String toString() { + return x + " " + y; + } + } + + /** + * A simple two-dimensional centroid, basically a point with an ID. + */ + public static class Centroid extends Point { + + public int id; + + public Centroid() {} + + public Centroid(int id, double x, double y) { + super(x,y); + this.id = id; + } + + public Centroid(int id, Point p) { + super(p.x, p.y); + this.id = id; + } + + @Override + public String toString() { + return id + " " + super.toString(); + } + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** Converts a Tuple2 into a Point. */ + public static final class TuplePointConverter implements MapFunction, Point> { + + @Override + public Point map(Tuple2 t) throws Exception { + return new Point(t.f0, t.f1); + } + } + + /** Converts a Tuple3 into a Centroid. */ + public static final class TupleCentroidConverter implements MapFunction, Centroid> { + + @Override + public Centroid map(Tuple3 t) throws Exception { + return new Centroid(t.f0, t.f1, t.f2); + } + } + + /** Determines the closest cluster center for a data point. */ + public static final class SelectNearestCenter extends RichMapFunction> { + private Collection centroids; + + /** Reads the centroid values from a broadcast variable into a collection. 
*/ + @Override + public void open(Configuration parameters) throws Exception { + this.centroids = getRuntimeContext().getBroadcastVariable("centroids"); + } + + @Override + public Tuple2 map(Point p) throws Exception { + + double minDistance = Double.MAX_VALUE; + int closestCentroidId = -1; + + // check all cluster centers + for (Centroid centroid : centroids) { + // compute distance + double distance = p.euclideanDistance(centroid); + + // update nearest cluster if necessary + if (distance < minDistance) { + minDistance = distance; + closestCentroidId = centroid.id; + } + } + + // emit a new record with the center id and the data point. + return new Tuple2(closestCentroidId, p); + } + } + + /** Appends a count variable to the tuple. */ + public static final class CountAppender implements MapFunction, Tuple3> { + + @Override + public Tuple3 map(Tuple2 t) { + return new Tuple3(t.f0, t.f1, 1L); + } + } + + /** Sums and counts point coordinates. */ + public static final class CentroidAccumulator implements ReduceFunction> { + + @Override + public Tuple3 reduce(Tuple3 val1, Tuple3 val2) { + return new Tuple3(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2); + } + } + + /** Computes new centroid from coordinate sum and count of points. 
*/ + public static final class CentroidAverager implements MapFunction, Centroid> { + + @Override + public Centroid map(Tuple3 value) { + return new Centroid(value.f0, value.f1.div(value.f2)); + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String pointsPath = null; + private static String centersPath = null; + private static String outputPath = null; + private static int numIterations = 10; + + private static boolean parseParameters(String[] programArguments) { + + if(programArguments.length > 0) { + // parse input arguments + fileOutput = true; + if(programArguments.length == 4) { + pointsPath = programArguments[0]; + centersPath = programArguments[1]; + outputPath = programArguments[2]; + numIterations = Integer.parseInt(programArguments[3]); + } else { + System.err.println("Usage: KMeans "); + return false; + } + } else { + System.out.println("Executing K-Means example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" We provide a data generator to create synthetic input files for this program."); + System.out.println(" Usage: KMeans "); + } + return true; + } + + private static DataSet getPointDataSet(ExecutionEnvironment env) { + if(fileOutput) { + // read points from CSV file + return env.readCsvFile(pointsPath) + .fieldDelimiter(' ') + .includeFields(true, true) + .types(Double.class, Double.class) + .map(new TuplePointConverter()); + } else { + return KMeansData.getDefaultPointDataSet(env); + } + } + + private static DataSet getCentroidDataSet(ExecutionEnvironment env) { + if(fileOutput) { + return env.readCsvFile(centersPath) + .fieldDelimiter(' ') + .includeFields(true, 
true, true) + .types(Integer.class, Double.class, Double.class) + .map(new TupleCentroidConverter()); + } else { + return KMeansData.getDefaultCentroidDataSet(env); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java new file mode 100644 index 0000000..233408b --- /dev/null +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+
+package org.apache.flink.examples.java.clustering.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.clustering.KMeans.Centroid;
+import org.apache.flink.examples.java.clustering.KMeans.Point;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the K-Means example program.
+ * The default data sets are used if no parameters are given to the program.
+ *
+ */
+public class KMeansData {
+
+    // We have the data as object arrays so that we can also generate Scala Data Sources from it.
+    // Each centroid row is {Integer id, Double x, Double y}.
+    public static final Object[][] CENTROIDS = new Object[][] {
+        new Object[] {1, -31.85, -44.77},
+        new Object[]{2, 35.16, 17.46},
+        new Object[]{3, -5.16, 21.93},
+        new Object[]{4, -24.06, 6.81}
+    };
+
+    // Each point row is {Double x, Double y}.
+    public static final Object[][] POINTS = new Object[][] {
+        new Object[] {-14.22, -48.01},
+        new Object[] {-22.78, 37.10},
+        new Object[] {56.18, -42.99},
+        new Object[] {35.04, 50.29},
+        new Object[] {-9.53, -46.26},
+        new Object[] {-34.35, 48.25},
+        new Object[] {55.82, -57.49},
+        new Object[] {21.03, 54.64},
+        new Object[] {-13.63, -42.26},
+        new Object[] {-36.57, 32.63},
+        new Object[] {50.65, -52.40},
+        new Object[] {24.48, 34.04},
+        new Object[] {-2.69, -36.02},
+        new Object[] {-38.80, 36.58},
+        new Object[] {24.00, -53.74},
+        new Object[] {32.41, 24.96},
+        new Object[] {-4.32, -56.92},
+        new Object[] {-22.68, 29.42},
+        new Object[] {59.02, -39.56},
+        new Object[] {24.47, 45.07},
+        new Object[] {5.23, -41.20},
+        new Object[] {-23.00, 38.15},
+        new Object[] {44.55, -51.50},
+        new Object[] {14.62, 59.06},
+        new Object[] {7.41, -56.05},
+        new Object[] {-26.63, 28.97},
+        new Object[] {47.37, -44.72},
+        new Object[] {29.07, 51.06},
+        new Object[] {0.59, -31.89},
+        new Object[] {-39.09, 20.78},
+        new Object[] {42.97, -48.98},
+        new Object[] {34.36, 49.08},
+        new Object[] {-21.91, -49.01},
+        new Object[] {-46.68, 46.04},
+        new Object[] {48.52, -43.67},
+        new Object[] {30.05, 49.25},
+        new Object[] {4.03, -43.56},
+        new Object[] {-37.85, 41.72},
+        new Object[] {38.24, -48.32},
+        new Object[] {20.83, 57.85}
+    };
+
+    /**
+     * Builds a data set holding one {@link Centroid} per row of {@link #CENTROIDS}.
+     *
+     * @param env the execution environment used to create the data set
+     * @return a data set created from the default centroids
+     */
+    public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env) {
+        List<Centroid> centroidList = new LinkedList<Centroid>();
+        for (Object[] centroid : CENTROIDS) {
+            centroidList.add(
+                    new Centroid((Integer) centroid[0], (Double) centroid[1], (Double) centroid[2]));
+        }
+        return env.fromCollection(centroidList);
+    }
+
+    /**
+     * Builds a data set holding one {@link Point} per row of {@link #POINTS}.
+     *
+     * @param env the execution environment used to create the data set
+     * @return a data set created from the default points
+     */
+    public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env) {
+        List<Point> pointList = new LinkedList<Point>();
+        for (Object[] point : POINTS) {
+            pointList.add(new Point((Double) point[0], (Double) point[1]));
+        }
+        return env.fromCollection(pointList);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
new file mode 100644
index 0000000..0d94e77
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.clustering.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Locale;
+import java.util.Random;
+
+import org.apache.flink.examples.java.clustering.KMeans;
+
+/**
+ * Generates data for the {@link KMeans} example program.
+ */
+public class KMeansDataGenerator {
+
+    static {
+        // NOTE(review): presumably set so that FORMAT emits '.' as the decimal separator
+        // regardless of the platform locale; this changes the JVM-wide default locale.
+        Locale.setDefault(Locale.US);
+    }
+
+    private static final String CENTERS_FILE = "centers";
+    private static final String POINTS_FILE = "points";
+    private static final long DEFAULT_SEED = 4650285087650871364L;
+    private static final double DEFAULT_VALUE_RANGE = 100.0;
+    private static final double RELATIVE_STDDEV = 0.08;
+    private static final int DIMENSIONALITY = 2;
+    private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
+    private static final char DELIMITER = ' ';
+
+    /**
+     * Main method to generate data for the {@link KMeans} example program.
+     * <p>
+     * The generator creates two files:
+     * <ul>
+     * <li><code>{tmp.dir}/points</code> for the data points
+     * <li><code>{tmp.dir}/centers</code> for the cluster centers
+     * </ul>
+     * The cluster centers written to the centers file are drawn separately from the
+     * means around which the data points are generated.
+     *
+     * @param args
+     * <ol>
+     * <li>Int: Number of data points
+     * <li>Int: Number of cluster centers
+     * <li><b>Optional</b> Double: Standard deviation of data points, relative to the value range
+     * <li><b>Optional</b> Double: Value range of cluster centers
+     * <li><b>Optional</b> Long: Random seed
+     * </ol>
+     * @throws IOException if one of the output files cannot be written
+     */
+    public static void main(String[] args) throws IOException {
+
+        // check parameter count
+        if (args.length < 2) {
+            System.out.println("KMeansDataGenerator <numberOfDataPoints> <numberOfClusterCenters> [<relative stddev>] [<centroid range>] [<seed>]");
+            System.exit(1);
+        }
+
+        // parse parameters
+        final int numDataPoints = Integer.parseInt(args[0]);
+        final int k = Integer.parseInt(args[1]);
+        final double stddev = args.length > 2 ? Double.parseDouble(args[2]) : RELATIVE_STDDEV;
+        // the value range is the fourth argument (index 3); the seed is the fifth (index 4)
+        final double range = args.length > 3 ? Double.parseDouble(args[3]) : DEFAULT_VALUE_RANGE;
+        final long firstSeed = args.length > 4 ? Long.parseLong(args[4]) : DEFAULT_SEED;
+
+        // the standard deviation is given relative to the value range
+        final double absoluteStdDev = stddev * range;
+        final Random random = new Random(firstSeed);
+        final String tmpDir = System.getProperty("java.io.tmpdir");
+
+        // the means around which data points are distributed
+        final double[][] means = uniformRandomCenters(random, k, DIMENSIONALITY, range);
+
+        // write the points out
+        BufferedWriter pointsOut = null;
+        try {
+            pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+POINTS_FILE)));
+            StringBuilder buffer = new StringBuilder();
+
+            double[] point = new double[DIMENSIONALITY];
+            int nextCentroid = 0;
+
+            for (int i = 1; i <= numDataPoints; i++) {
+                // generate a point for the current centroid
+                double[] centroid = means[nextCentroid];
+                for (int d = 0; d < DIMENSIONALITY; d++) {
+                    point[d] = (random.nextGaussian() * absoluteStdDev) + centroid[d];
+                }
+                writePoint(point, buffer, pointsOut);
+                // cycle through the means so that points are spread evenly over them
+                nextCentroid = (nextCentroid + 1) % k;
+            }
+        }
+        finally {
+            if (pointsOut != null) {
+                pointsOut.close();
+            }
+        }
+
+        // write the uniformly distributed centers to a file
+        BufferedWriter centersOut = null;
+        try {
+            centersOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+CENTERS_FILE)));
+            StringBuilder buffer = new StringBuilder();
+
+            double[][] centers = uniformRandomCenters(random, k, DIMENSIONALITY, range);
+
+            for (int i = 0; i < k; i++) {
+                // center ids are 1-based
+                writeCenter(i + 1, centers[i], buffer, centersOut);
+            }
+        }
+        finally {
+            if (centersOut != null) {
+                centersOut.close();
+            }
+        }
+
+        System.out.println("Wrote "+numDataPoints+" data points to "+tmpDir+"/"+POINTS_FILE);
+        System.out.println("Wrote "+k+" cluster centers to "+tmpDir+"/"+CENTERS_FILE);
+    }
+
+    /**
+     * Draws <code>num</code> points whose coordinates are uniformly distributed in
+     * <code>[-range/2, range/2)</code>.
+     */
+    private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
+        final double halfRange = range / 2;
+        final double[][] points = new double[num][dimensionality];
+
+        for (int i = 0; i < num; i++) {
+            for (int dim = 0; dim < dimensionality; dim ++) {
+                points[i][dim] = (rnd.nextDouble() * range) - halfRange;
+            }
+        }
+        return points;
+    }
+
+    /** Writes one line holding the formatted coordinates separated by {@link #DELIMITER}. */
+    private static void writePoint(double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
+        buffer.setLength(0);
+
+        // write coordinates
+        appendCoordinates(coordinates, buffer);
+
+        out.write(buffer.toString());
+        out.newLine();
+    }
+
+    /** Writes one line holding the id followed by the formatted coordinates, separated by {@link #DELIMITER}. */
+    private static void writeCenter(long id, double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
+        buffer.setLength(0);
+
+        // write id
+        buffer.append(id);
+        buffer.append(DELIMITER);
+
+        // write coordinates
+        appendCoordinates(coordinates, buffer);
+
+        out.write(buffer.toString());
+        out.newLine();
+    }
+
+    /** Appends the coordinates, formatted with {@link #FORMAT} and separated by {@link #DELIMITER}, to the buffer. */
+    private static void appendCoordinates(double[] coordinates, StringBuilder buffer) {
+        for (int j = 0; j < coordinates.length; j++) {
+            buffer.append(FORMAT.format(coordinates[j]));
+            if(j < coordinates.length - 1) {
+                buffer.append(DELIMITER);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
new file mode 100644
index 
0000000..7312dd8 --- /dev/null +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.examples.java.graph; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.aggregation.Aggregations; +import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.ExecutionEnvironment; +import 
org.apache.flink.examples.java.graph.util.ConnectedComponentsData; + +/** + * An implementation of the connected components algorithm, using a delta iteration. + * + *

+ * Initially, the algorithm assigns each vertex a unique ID. In each step, a vertex picks the minimum of its own ID and its + * neighbors' IDs, as its new ID and tells its neighbors about its new ID. After the algorithm has completed, all vertices in the + * same component will have the same ID. + * + *

+ * A vertex whose component ID did not change need not propagate its information in the next step. Because of that, + * the algorithm is easily expressible via a delta iteration. We here model the solution set as the vertices with + * their current component ids, and the workset as the changed vertices. Because we see all vertices initially as + * changed, the initial workset and the initial solution set are identical. Also, the delta to the solution set + * is consequently also the next workset.
+ * + *

+ * Input files are plain text files and must be formatted as follows: + *

    + *
  • Vertices represented as IDs and separated by new-line characters.
    + * For example "1\n2\n12\n42\n63\n" gives five vertices (1), (2), (12), (42), and (63). + *
  • Edges are represented as pairs of vertex IDs which are separated by space + * characters. Edges are separated by new-line characters.
    + * For example "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63). + *
+ * + *

+ * Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>
+ * If no parameters are provided, the program is run with default data from {@link ConnectedComponentsData} and 10 iterations. + * + *

+ * This example shows how to use: + *

    + *
  • Delta Iterations + *
  • Generic-typed Functions + *
+ */
+@SuppressWarnings("serial")
+public class ConnectedComponents implements ProgramDescription {
+
+    // *************************************************************************
+    //     PROGRAM
+    // *************************************************************************
+
+    /**
+     * Runs the connected components example, either on the files given as arguments or,
+     * when no arguments are given, on the built-in default data.
+     *
+     * @param args either empty, or: vertices path, edges path, result path, max number of iterations
+     */
+    public static void main(String... args) throws Exception {
+
+        if(!parseParameters(args)) {
+            return;
+        }
+
+        // set up execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // read vertex and edge data
+        DataSet<Long> vertices = getVertexDataSet(env);
+        DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
+
+        // assign the initial components (equal to the vertex id)
+        DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+
+        // open a delta iteration
+        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+                verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
+
+        // apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
+        DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
+                .groupBy(0).aggregate(Aggregations.MIN, 1)
+                .join(iteration.getSolutionSet()).where(0).equalTo(0)
+                .with(new ComponentIdFilter());
+
+        // close the delta iteration (delta and new workset are identical)
+        DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
+
+        // emit result
+        if(fileOutput) {
+            result.writeAsCsv(outputPath, "\n", " ");
+        } else {
+            result.print();
+        }
+
+        // execute program
+        env.execute("Connected Components Example");
+    }
+
+    // *************************************************************************
+    //     USER FUNCTIONS
+    // *************************************************************************
+
+    /**
+     * Function that turns a value into a 2-tuple where both fields are that value.
+     */
+    @ConstantFields("0 -> 0,1")
+    public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
+
+        @Override
+        public Tuple2<T, T> map(T vertex) {
+            return new Tuple2<T, T>(vertex, vertex);
+        }
+    }
+
+    /**
+     * Undirects edges by emitting, for each input edge, the input edge itself and an inverted version.
+     */
+    public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+        // a single tuple instance is reused for every inverted edge that is emitted
+        Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+
+        @Override
+        public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+            invertedEdge.f0 = edge.f1;
+            invertedEdge.f1 = edge.f0;
+            out.collect(edge);
+            out.collect(invertedEdge);
+        }
+    }
+
+    /**
+     * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
+     * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
+     * produces a (Target-vertex-ID, Component-ID) pair.
+     */
+    @ConstantFieldsFirst("1 -> 0")
+    @ConstantFieldsSecond("1 -> 1")
+    public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+        @Override
+        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+            return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+        }
+    }
+
+
+
+    /**
+     * Emits the candidate (Vertex-ID, Component-ID) pair only if its component ID is smaller
+     * than the component ID of the old pair it is joined with.
+     */
+    @ConstantFieldsFirst("0")
+    public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+        @Override
+        public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
+            if (candidate.f1 < old.f1) {
+                out.collect(candidate);
+            }
+        }
+    }
+
+
+
+    @Override
+    public String getDescription() {
+        return "Parameters: <vertices path> <edges path> <result path> <max number of iterations>";
+    }
+
+    // *************************************************************************
+    //     UTIL METHODS
+    // *************************************************************************
+
+    // set to true as soon as any program argument is given; it then selects both
+    // file input (vertices and edges) and file output (result)
+    private static boolean fileOutput = false;
+    private static String verticesPath = null;
+    private static String edgesPath = null;
+    private static String outputPath = null;
+    private static int maxIterations = 10;
+
+    /**
+     * Reads the program arguments into the static configuration fields.
+     *
+     * @return false if arguments were given but their number is not exactly four, true otherwise
+     */
+    private static boolean parseParameters(String[] programArguments) {
+
+        if(programArguments.length > 0) {
+            // parse input arguments
+            fileOutput = true;
+            if(programArguments.length == 4) {
+                verticesPath = programArguments[0];
+                edgesPath = programArguments[1];
+                outputPath = programArguments[2];
+                maxIterations = Integer.parseInt(programArguments[3]);
+            } else {
+                System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
+                return false;
+            }
+        } else {
+            System.out.println("Executing Connected Components example with default parameters and built-in default data.");
+            System.out.println(" Provide parameters to read input data from files.");
+            System.out.println(" See the documentation for the correct format of input files.");
+            System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
+        }
+        return true;
+    }
+
+    /** Returns the vertex IDs, read from {@code verticesPath} if arguments were given, otherwise the default data. */
+    private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) {
+
+        if(fileOutput) {
+            return env.readCsvFile(verticesPath).types(Long.class)
+                    .map(
+                        new MapFunction<Tuple1<Long>, Long>() {
+                            public Long map(Tuple1<Long> value) { return value.f0; }
+                        });
+        } else {
+            return ConnectedComponentsData.getDefaultVertexDataSet(env);
+        }
+    }
+
+    /** Returns the edges, read from {@code edgesPath} (space-delimited) if arguments were given, otherwise the default data. */
+    private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+        if(fileOutput) {
+            return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+        } else {
+            return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+        }
+    }
+
+
+}