Date: Tue, 8 Mar 2016 15:06:40 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-3591) Replace Quickstart K-Means Example by Streaming Example

    [ https://issues.apache.org/jira/browse/FLINK-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15185026#comment-15185026 ]

ASF GitHub Bot commented on FLINK-3591:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55370936

    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}

    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink.
    -On the way, you will see a visualization of the program and the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and go from setting up a Flink project to running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink, but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project structure. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate \
    +    -DarchetypeGroupId=org.apache.flink \
    +    -DarchetypeArtifactId=flink-quickstart-java \
    +    -DarchetypeVersion=1.0.0 \
    +    -DgroupId=wiki-edits \
    +    -DartifactId=wiki-edits \
    +    -Dversion=0.1 \
    +    -Dpackage=wikiedits \
    +    -DinteractiveMode=false
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +Maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +In the root directory there is our `pom.xml` file, which already has the Flink dependencies added,
    +and in `src/main/java` there are several example Flink programs. We can delete the example programs,
    +since we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink Wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that we added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project, or open a text editor and
    +create the file `src/main/java/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now, but we will fill it in as we go. Note that I won't give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). It can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead and add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this stream should take the key into account. In our case the summation of edited bytes in
    +the windows should be per unique user.
    +For keying a stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same stream of `WikipediaEditEvent` elements, now with a `String` key, the user name.
    +We can now specify that we want windows imposed on this stream and compute a result based
    +on elements in these windows. We need to specify a window here because we are dealing with
    +an infinite stream of events. If you want to compute an aggregation on such an infinite
    +stream you never know when you are finished. That's where windows come into play: they
    +specify a time slice over which we should perform our computation. In our example we will say
    --- End diff --

    I would move the explanation of why we need windows in streaming (and maybe a link to the windows docs) to the beginning.


> Replace Quickstart K-Means Example by Streaming Example
> --------------------------------------------------------
>
>                 Key: FLINK-3591
>                 URL: https://issues.apache.org/jira/browse/FLINK-3591
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
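
The quoted diff breaks off just as the windowing step is introduced ("In our example we will say..."), so the text above explains windows without showing code for them. The sketch below illustrates one way a keyed stream could be windowed and aggregated with the Flink 1.0-era DataStream API. It is not taken from the pull request under review; the five-second tumbling window, the `fold` aggregation, the `Tuple2<String, Long>` result type, and the class and method names are assumptions made purely for illustration.

{% highlight java %}
package wikiedits;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

public class WindowingSketch {

    // Hypothetical helper: sums the byte diff per user over five-second tumbling windows.
    public static DataStream<Tuple2<String, Long>> sumBytesPerUser(
            KeyedStream<WikipediaEditEvent, String> keyedEdits) {

        return keyedEdits
            // group the already-keyed events into five-second tumbling time windows
            .timeWindow(Time.seconds(5))
            // fold each window down to a (user, total byte diff) pair
            .fold(new Tuple2<>("", 0L),
                new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
                        acc.f0 = event.getUser();
                        acc.f1 += event.getByteDiff();
                        return acc;
                    }
                });
    }
}
{% endhighlight %}

Using `fold` with a mutable `Tuple2` accumulator keeps the sketch short; the same per-window aggregation could also be expressed with a `WindowFunction` passed to `apply()`.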