flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3591) Replace Quickstart K-Means Example by Streaming Example
Date Tue, 08 Mar 2016 15:13:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15185050#comment-15185050 ]

ASF GitHub Bot commented on FLINK-3591:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55371996
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink.
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and go from setting up a Flink project to running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project structure. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate \
    +    -DarchetypeGroupId=org.apache.flink \
    +    -DarchetypeArtifactId=flink-quickstart-java \
    +    -DarchetypeVersion=1.0.0 \
    +    -DgroupId=wiki-edits \
    +    -DartifactId=wiki-edits \
    +    -Dversion=0.1 \
    +    -Dpackage=wikiedits \
    +    -DinteractiveMode=false
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +Maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +In the root directory there is our `pom.xml` file, which already has the Flink dependencies added,
    +and several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink Wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project, or open a text editor and
    +create the file `src/main/java/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note that I won't give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead and add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
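    +
    +For comparison, a batch job would start from the batch environment instead. A minimal sketch, not
    +needed for the rest of this guide:
    +
    +{% highlight java %}
    +// Batch counterpart of the streaming environment above.
    +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}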
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this stream should take the key into account. In our case the summation of edited bytes in
    +the windows should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` elements, now with a `String` key, the user name.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play:
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specifies that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks, only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations submitted to a cluster or executed on your local
    +machine.
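    +
    +You can observe this laziness yourself: before `execute()` runs, the environment only holds the
    +graph. A small sketch, assuming `getExecutionPlan()` is available in your Flink version:
    +
    +{% highlight java %}
    +// Prints a JSON description of the operator graph without executing anything.
    +System.out.println(see.getExecutionPlan());
    +{% endhighlight %}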
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the command line, using Maven:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +
    +The first command builds our project and the second executes our main class. The output should be
    +similar to this:
    +
    +{% highlight bash %}
    +1> (Fenix down,114)
    +6> (AnomieBOT,155)
    +8> (BD2412bot,-3690)
    +7> (IgnorantArmies,49)
    +3> (Ckh3111,69)
    +5> (Slade360,0)
    +7> (Narutolovehinata5,2195)
    +6> (Vuyisa2001,79)
    +4> (Ms Sarah Welch,269)
    +4> (KasparBot,-245)
    +{% endhighlight %}
    +
    +The number in front of each line tells you on which parallel instance of the print sink the output
    +was produced.
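    +
    +If you would rather see plain output without the instance prefix, one option is to run the print
    +sink with parallelism 1. A minimal sketch, keeping the rest of the job as-is:
    +
    +{% highlight java %}
    +// With a single sink instance, Flink's print sink omits the "n>" prefix.
    +result.print().setParallelism(1);
    +{% endhighlight %}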
    +
    +This should get you started with writing your own Flink programs. You can check out our guides
    +about [basic concepts]({{ site.baseurl }}/apis/common/index.html) and the
    +[DataStream API]({{ site.baseurl }}/apis/streaming/index.html) if you want to learn more. Stick
    +around for the bonus exercise if you want to learn about setting up a Flink cluster on
    +your own machine and writing results to [Kafka](http://kafka.apache.org).
    +
    +## Bonus Exercise: Running on a Cluster and Writing to Kafka
    +
    +Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
    +on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/documentation.html#quickstart)
    +for setting up a Kafka installation before we proceed.
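    +
    +Once Kafka is running, you will also need a topic for the results and a way to inspect them later.
    +A sketch of the relevant commands, assuming a local single-broker setup as described in the Kafka
    +quickstart (script names and flags vary between Kafka versions):
    +
    +{% highlight bash %}
    +# Start ZooKeeper and the Kafka broker (each in its own terminal).
    +$ bin/zookeeper-server-start.sh config/zookeeper.properties
    +$ bin/kafka-server-start.sh config/server.properties
    +
    +# Create the topic our program will write to.
    +$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    +    --replication-factor 1 --partitions 1 --topic wiki-result
    +
    +# Later, read the results back with a console consumer.
    +$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result
    +{% endhighlight %}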
    +
    +As a first step, we have to add the Flink Kafka connector as a dependency so that we can
    +use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
    +
    +{% highlight xml %}
    +<dependency>
    +    <groupId>org.apache.flink</groupId>
    +    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    +    <version>${flink.version}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Next, we need to modify our program. We'll remove the `print()` sink and instead use a
    +Kafka sink. The new code looks like this:
    +
    +{% highlight java %}
    +result
    +    .map(new MapFunction<Tuple2<String,Long>, String>() {
    +        @Override
    +        public String map(Tuple2<String, Long> tuple) {
    +            return tuple.toString();
    +        }
    +    })
    +    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    +{% endhighlight %}
    +
    +Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
    +a `MapFunction`. We are doing this because it is easier to write plain strings to Kafka. Then,
    +we create a Kafka sink. You might have to adapt the hostname and port to your setup, `"wiki-result"`
    --- End diff --
    
    Full stop at "setup"?


> Replace Quickstart K-Means Example by Streaming Example
> -------------------------------------------------------
>
>                 Key: FLINK-3591
>                 URL: https://issues.apache.org/jira/browse/FLINK-3591
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
