flink-commits mailing list archives

From mbala...@apache.org
Subject [09/12] git commit: [doc] [streaming] Added Twitter connector & Projection operator to docs
Date Wed, 24 Sep 2014 19:51:42 GMT
[doc] [streaming] Added Twitter connector & Projection operator to docs


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2dc5437a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2dc5437a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2dc5437a

Branch: refs/heads/master
Commit: 2dc5437a288d80dab5e10571c1bed630de79e1d3
Parents: 1bca326
Author: mbalassi <balassi.marton@gmail.com>
Authored: Wed Sep 24 16:25:11 2014 +0200
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md | 365 +++++++++++++++++++++++++------------------
 1 file changed, 216 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2dc5437a/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index f1b4189..27a32ba 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -22,9 +22,9 @@ Add the following dependency to your `pom.xml` to use Flink Streaming.
 
 ~~~xml
 <dependency>
-	<groupId>org.apache.flink</groupId>
-	<artifactId>flink-streaming-core</artifactId>
-	<version>{{site.FLINK_VERSION_STABLE}}</version>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-streaming-core</artifactId>
+    <version>{{site.FLINK_VERSION_STABLE}}</version>
 </dependency>
 ~~~
 
@@ -39,32 +39,31 @@ The following program is a complete, working example of streaming WordCount. You
 ~~~java
 public class StreamingWordCount {
 
-	
-	public static void main(String[] args) {
+    public static void main(String[] args) {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-		
-		DataStream<Tuple2<String, Integer>> dataStream = env
-				.fromElements("Who's there?",
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+        
+        DataStream<Tuple2<String, Integer>> dataStream = env
+                .fromElements("Who's there?",
             "I think I hear them. Stand, ho! Who's there?")
-				.flatMap(new Splitter())
-				.groupBy(0)
-				.sum(1);
-		
-		dataStream.print();
-		
-		env.execute();
-	}
-	
-	public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		@Override
-		public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
-			for (String word: sentence.split(" ")) {
-				out.collect(new Tuple2<String, Integer>(word, 1));
-			}
-		}
-	}
-	
+                .flatMap(new Splitter())
+                .groupBy(0)
+                .sum(1);
+        
+        dataStream.print();
+        
+        env.execute();
+    }
+    
+        public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+        @Override
+        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
+            for (String word: sentence.split(" ")) {
+                out.collect(new Tuple2<String, Integer>(word, 1));
+            }
+        }
+    }
+    
 }
 ~~~
 
@@ -156,7 +155,8 @@ Besides the pre-defined solutions the user can implement their own source by imp
 
 ### Sinks
 
-`DataStreamSink` represents the different outputs of a Flink Streaming program. Every `DataStream` in a streaming program needs to be either transformed or closed down with a sink. There are several pre-defined implementations `DataStreamSink` available right away:
+`DataStreamSink` represents the different outputs of a Flink Streaming program. There are several pre-defined implementations of `DataStreamSink` available right away:
+
 * `dataStream.print()` – Writes the DataStream to the standard output, practical for testing purposes
  * `dataStream.writeAsText(parameters)` – Writes the DataStream to a text file
  * `dataStream.writeAsCsv(parameters)` – Writes the DataStream to CSV format
@@ -180,11 +180,11 @@ A map operator that doubles the values of the input stream:
 
 ~~~java
 dataStream.map(new MapFunction<Integer, Integer>() {
-			@Override
-			public Integer map(Integer value) throws Exception {
-				return 2 * value;
-			}
-		})
+            @Override
+            public Integer map(Integer value) throws Exception {
+                return 2 * value;
+            }
+        })
 ~~~
 
 #### FlatMap
@@ -193,13 +193,13 @@ A flatmap operator that splits sentences to words:
 
 ~~~java
 dataStream.flatMap(new FlatMapFunction<String, String>() {
-			@Override
-			public void flatMap(String value, Collector<String> out) throws Exception {
-				for(String word: value.split(" ")){
-					out.collect(word);
-				}
-			}
-		})
+            @Override
+            public void flatMap(String value, Collector<String> out) throws Exception {
+                for(String word: value.split(" ")){
+                    out.collect(word);
+                }
+            }
+        })
 ~~~
 
 #### Filter
@@ -207,12 +207,12 @@ The Filter transformation applies a user-defined `FilterFunction` on each elemen
 A filter that filters out zero values:
 
 ~~~java
-dataStream.filter(new FilterFunction<Integer>() {		
-			@Override
-			public boolean filter(Integer value) throws Exception {
-				return value != 0;
-			}
-		})
+dataStream.filter(new FilterFunction<Integer>() { 
+            @Override
+            public boolean filter(Integer value) throws Exception {
+                return value != 0;
+            }
+        })
 ~~~
 
 #### Reduce
@@ -221,11 +221,11 @@ A reducer that sums up the incoming stream:
 
 ~~~java
 dataStream.reduce(new ReduceFunction<Integer>() {
-			@Override
-			public Integer reduce(Integer value1, Integer value2) throws Exception {
-				return value1+value2;
-			}
-		})
+            @Override
+            public Integer reduce(Integer value1, Integer value2) throws Exception {
+                return value1+value2;
+            }
+        })
 ~~~
 
 #### Merge
@@ -286,20 +286,20 @@ A CoMap operator that outputs true if an Integer value is received and false if
 ~~~java
 DataStream<Integer> dataStream1 = ...
 DataStream<String> dataStream2 = ...
-		
+        
 dataStream1.connect(dataStream2)
-	.map(new CoMapFunction<Integer, String, Boolean>() {
-			
-			@Override
-			public Boolean map1(Integer value) {
-				return true;
-			}
-			
-			@Override
-			public Boolean map2(String value) {
-				return false;
-			}
-		})
+    .map(new CoMapFunction<Integer, String, Boolean>() {
+            
+            @Override
+            public Boolean map1(Integer value) {
+                return true;
+            }
+            
+            @Override
+            public Boolean map2(String value) {
+                return false;
+            }
+        })
 ~~~
 
 #### FlatMap on ConnectedDataStream
@@ -308,20 +308,20 @@ The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but
 ~~~java
 DataStream<Integer> dataStream1 = ...
 DataStream<String> dataStream2 = ...
-		
+        
 dataStream1.connect(dataStream2)
-	.flatMap(new CoFlatMapFunction<Integer, String, Boolean>() {
+    .flatMap(new CoFlatMapFunction<Integer, String, Boolean>() {
 
-			@Override
-			public void flatMap1(Integer value, Collector<Boolean> out) {
-				out.collect(true);
-			}
+            @Override
+            public void flatMap1(Integer value, Collector<Boolean> out) {
+                out.collect(true);
+            }
 
-			@Override
-			public void flatMap2(String value, Collector<Boolean> out) {
-				out.collect(false);
-			}
-		})
+            @Override
+            public void flatMap2(String value, Collector<Boolean> out) {
+                out.collect(false);
+            }
+        })
 ~~~
 
 #### Reduce on ConnectedDataStream
@@ -431,7 +431,7 @@ operator.setMutability(isMutable)
 ~~~
 
 [Back to top](#top)
-	
+    
 Stream connectors
 ----------------
 
@@ -452,6 +452,7 @@ This connector provides access to data streams from [Apache Kafka](https://kafka
 
 #### Kafka Source
 An abstract class providing an interface for receiving data from Kafka. When implementing it, the user must:
+
  * Write a constructor calling the constructor of the abstract class,
  * Write a deserializer function which processes the data coming from Kafka,
  * Stop the source manually when necessary with one of the close functions.
@@ -463,7 +464,7 @@ An example of an implementation of a constructor:
 
 ~~~java
 public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
-	super(zkQuorum, groupId, topicId, numThreads);
+    super(zkQuorum, groupId, topicId, numThreads);
 }
 ~~~
 
@@ -473,11 +474,11 @@ An example of an implementation of a deserializer:
 ~~~java
 @Override
 public String deserialize(byte[] msg) {
-	String s = new String(msg);
-	if(s.equals("q")){
-		closeWithoutSend();
-	}
-	return new String(s);
+    String s = new String(msg);
+    if(s.equals("q")){
+        closeWithoutSend();
+    }
+    return new String(s);
 }
 ~~~
 
@@ -490,6 +491,7 @@ In the example provided `closeWithoutSend()` is used because here the String `"q
 
 #### Kafka Sink
 An abstract class providing an interface for sending data to Kafka. When implementing it, the user must:
+
  * Write a constructor calling the constructor of the abstract class,
  * Write a serializer function to send data in the desired form to Kafka,
  * Stop the sink manually when necessary with one of the close functions.
@@ -501,7 +503,7 @@ An example of an implementation of a constructor:
 
 ~~~java
 public MyKafkaSink(String topicId, String brokerAddr) {
-	super(topicId, brokerAddr);
+    super(topicId, brokerAddr);
 }
 ~~~
 
@@ -511,10 +513,10 @@ An example of an implementation of a serializer:
 ~~~java
 @Override
 public String serialize(String tuple) {
-	if(tuple.equals("q")){
-		sendAndClose();
-	}
-	return tuple;
+    if(tuple.equals("q")){
+        sendAndClose();
+    }
+    return tuple;
 }
 ~~~
 
@@ -526,8 +528,8 @@ To use a Kafka connector as a source in Flink call the `addSource()` function wi
 
 ~~~java
 DataStream<String> stream1 = env.
-	addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
-	.print();
+    addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
+    .print();
 ~~~
 
 The following have to be provided for the `MyKafkaSource()` constructor in order:
@@ -541,8 +543,8 @@ Similarly to use a Kafka connector as a sink in Flink call the `addSink()` funct
 
 ~~~java
 DataStream<String> stream2 = env
-	.addSource(new MySource())
-	.addSink(new MyKafkaSink("test", "localhost:9092"));
+    .addSource(new MySource())
+    .addSink(new MyKafkaSink("test", "localhost:9092"));
 ~~~
 
 The following have to be provided for the `MyKafkaSink()` constructor in order:
@@ -563,6 +565,7 @@ This connector provides access to datastreams from [Apache Flume](http://flume.a
 
 #### Flume Source
 An abstract class providing an interface for receiving data from Flume. When implementing it, the user must:
+
  * Write a constructor calling the constructor of the abstract class,
  * Write a deserializer function which processes the data coming from Flume,
  * Stop the source manually when necessary with one of the close functions.
@@ -574,7 +577,7 @@ An example of an implementation of a constructor:
 
 ~~~java
 MyFlumeSource(String host, int port) {
-	super(host, port);
+    super(host, port);
 }
 ~~~
 
@@ -584,12 +587,12 @@ An example of an implementation of a deserializer:
 ~~~java
 @Override
 public String deserialize(byte[] msg) {
-	String s = (String) SerializationUtils.deserialize(msg);
-	String out = s;
-	if (s.equals("q")) {
-		closeWithoutSend();
-	}
-	return out;
+    String s = (String) SerializationUtils.deserialize(msg);
+    String out = s;
+    if (s.equals("q")) {
+        closeWithoutSend();
+    }
+    return out;
 }
 ~~~
 
@@ -602,6 +605,7 @@ In the example `closeWithoutSend()` is used because here the String `"q"` is met
 
 #### Flume Sink
 An abstract class providing an interface for sending data to Flume. When implementing it, the user must:
+
 * Write a constructor calling the constructor of the abstract class,
 * Write a serializer function to send data in the desired form to Flume,
 * Stop the sink manually when necessary with one of the close functions.
@@ -613,7 +617,7 @@ An example of an implementation of a constructor:
 
 ~~~java
 public MyFlumeSink(String host, int port) {
-	super(host, port);
+    super(host, port);
 }
 ~~~
 
@@ -623,15 +627,15 @@ An example of an implementation of a serializer.
 ~~~java
 @Override
 public byte[] serialize(String tuple) {
-	if (tuple.equals("q")) {
-		try {
-			sendAndClose();
-		} catch (Exception e) {
-			new RuntimeException("Error while closing Flume connection with " + port + " at "
-				+ host, e);
-		}
-	}
-	return SerializationUtils.serialize(tuple);
+    if (tuple.equals("q")) {
+        try {
+            sendAndClose();
+        } catch (Exception e) {
+            throw new RuntimeException("Error while closing Flume connection with " + port + " at "
+                + host, e);
+        }
+    }
+    return SerializationUtils.serialize(tuple);
 }
 ~~~
 
@@ -643,8 +647,8 @@ To use a Flume connector as a source in Flink call the `addSource()` function wi
 
 ~~~java
 DataStream<String> dataStream1 = env
-	.addSource(new MyFlumeSource("localhost", 41414))
-	.print();
+    .addSource(new MyFlumeSource("localhost", 41414))
+    .print();
 ~~~
 
 The following have to be provided for the `MyFlumeSource()` constructor in order:
@@ -656,8 +660,8 @@ Similarly to use a Flume connector as a sink in Flink call the `addSink()` funct
 
 ~~~java
 DataStream<String> dataStream2 = env
-	.fromElements("one", "two", "three", "four", "five", "q")
-	.addSink(new MyFlumeSink("localhost", 42424));
+    .fromElements("one", "two", "three", "four", "five", "q")
+    .addSink(new MyFlumeSink("localhost", 42424));
 ~~~
 
 The following have to be provided for the `MyFlumeSink()` constructor in order:
@@ -719,7 +723,7 @@ An example of an implementation of a constructor:
 
 ~~~java
 public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
-	super(HOST_NAME, QUEUE_NAME);
+    super(HOST_NAME, QUEUE_NAME);
 }
 ~~~
 
@@ -729,12 +733,12 @@ An example of an implementation of a deserializer:
 ~~~java
 @Override
 public String deserialize(byte[] t) {
-	String s = (String) SerializationUtils.deserialize(t);
-	String out = s;
-	if (s.equals("q")) {
-		closeWithoutSend();
-	}
-	return out;
+    String s = (String) SerializationUtils.deserialize(t);
+    String out = s;
+    if (s.equals("q")) {
+        closeWithoutSend();
+    }
+    return out;
 }
 ~~~
 
@@ -749,6 +753,7 @@ In the example `closeWithoutSend()` is used because here the String `"q"` is met
 
 #### RabbitMQ Sink
 An abstract class providing an interface for sending data to RabbitMQ. When implementing it, the user must:
+
 * Write a constructor calling the constructor of the abstract class
 * Write a serializer function to send data in the desired form to RabbitMQ
 * Stop the sink manually when necessary with one of the close functions
@@ -760,7 +765,7 @@ An example of an implementation of a constructor:
 
 ~~~java
 public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
-	super(HOST_NAME, QUEUE_NAME);
+    super(HOST_NAME, QUEUE_NAME);
 }
 ~~~
 
@@ -770,10 +775,10 @@ An example of an implementation of a serializer.
 ~~~java
 @Override
 public byte[] serialize(Tuple tuple) {
-	if (t.getField(0).equals("q")) {
-		sendAndClose();
-	}
-	return SerializationUtils.serialize(tuple.f0);
+    if (tuple.getField(0).equals("q")) {
+        sendAndClose();
+    }
+    return SerializationUtils.serialize(tuple.f0);
 }
 ~~~
 
@@ -784,10 +789,9 @@ The API provided is the [same](#rmq_source_close) as the one for `RabbitMQSource
 To use a RabbitMQ connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `RabbitMQSource` as parameter:
 
 ~~~java
-@SuppressWarnings("unused")
 DataStream<String> dataStream1 = env
-	.addSource(new MyRMQSource("localhost", "hello"))
-	.print();
+    .addSource(new MyRMQSource("localhost", "hello"))
+    .print();
 ~~~
 
 The following have to be provided for the `MyRMQSource()` constructor in order:
@@ -799,8 +803,8 @@ Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` fu
 
 ~~~java
 DataStream<String> dataStream2 = env
-	.fromElements("one", "two", "three", "four", "five", "q")
-	.addSink(new MyRMQSink("localhost", "hello"));
+    .fromElements("one", "two", "three", "four", "five", "q")
+    .addSink(new MyRMQSink("localhost", "hello"));
 ~~~
 
 The following have to be provided for the `MyRMQSink()` constructor in order:
@@ -812,13 +816,79 @@ More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
 
 [Back to top](#top)
 
+### Twitter Streaming API
+
+The Twitter Streaming API provides the opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream.
+
+#### Authentication
+In order to connect to the Twitter stream the user has to register their program and acquire the necessary authentication information. The process is described below.
+
+#### Acquiring the authentication information
+First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions. 
+After selecting the application, the API key and API secret (called `consumerKey` and `consumerSecret` in `TwitterSource` respectively) can be found on the "API Keys" tab. The necessary access token data (`token` and `secret`) can also be acquired here. 
+Remember to keep these pieces of information secret and do not push them to public repositories.
+
+#### Accessing the authentication information
+Create a properties file and pass its path in the constructor of `TwitterSource`. The content of the file should be similar to this:
+
+~~~batch
+#properties file for my app
+secret=***
+consumerSecret=***
+token=***-***
+consumerKey=***
+~~~
+
+#### Constructors
+The `TwitterSource` class has two constructors.
+
+1. `public TwitterSource(String authPath, int numberOfTweets);` 
+to emit a finite number of tweets
+2. `public TwitterSource(String authPath);` 
+for streaming
+
+Both constructors expect a `String authPath` argument determining the location of the properties file containing the authentication information. In the first case, `numberOfTweets` determines how many tweets the source emits.
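+
+For instance, a bounded source that reads a fixed number of tweets could be created as follows (a minimal sketch; the properties file path and the count of 100 are placeholder values):
+
+~~~java
+// Hypothetical values: adjust the properties file path and tweet count to your setup.
+TwitterSource boundedSource = new TwitterSource("/PATH/TO/myFile.properties", 100);
+~~~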
+
+#### Usage
+In contrast to other connectors `TwitterSource` depends on no additional services. For example the following code should run gracefully:
+
+~~~java
+DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
+~~~
+
+The `TwitterSource` emits strings containing JSON objects. 
+To retrieve information from them you can add a FlatMap or a Map function that handles JSON. For example, use an implementation of the `JSONParseFlatMap` abstract class found among the examples. `JSONParseFlatMap` is an extension of the `FlatMapFunction` and has a
+
+~~~java
+String getField(String jsonText, String field);
+~~~
+
+function which can be used to acquire the value of a given field. 
+
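+As an illustration, a flatMap extracting the language field of each tweet might look like the sketch below (assuming `JSONParseFlatMap` exposes the usual `FlatMapFunction<String, String>` signature; the `"lang"` field name and the null check are assumptions about the tweet JSON):
+
+~~~java
+// Minimal sketch: emit the language of every incoming tweet.
+public class LanguageExtractor extends JSONParseFlatMap<String, String> {
+    @Override
+    public void flatMap(String value, Collector<String> out) throws Exception {
+        // getField is the inherited helper described above.
+        String lang = getField(value, "lang");
+        if (lang != null) {
+            out.collect(lang);
+        }
+    }
+}
+~~~
+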
+There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type contains delete information.
+
+#### Example
+`TwitterLocal` is an example of how to use `TwitterSource`. It implements a language frequency counter program. 
+
+[Back to top](#top)
+
 ### Docker containers for connectors<a name="docker_connectors"></a>
 
 A Docker container is provided with all the required configurations for test running the connectors of Apache Flink. The servers for the message queues will be running on the Docker container while the example topology can be run on the user's computer. The only exception is Flume; more can be read about this issue in the [Flume section](#flume). 
 
 #### Installing Docker
 The official Docker installation guide can be found [here](https://docs.docker.com/installation/).
-After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set. 
+After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.
+
+#### Creating a jar with all the dependencies
+For the easiest setup, create a jar with all the dependencies of the *flink-streaming-connectors* project.
+
+~~~batch
+cd /PATH/TO/GIT/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors
+mvn assembly:assembly
+~~~
+
+This creates an assembly jar under *flink-streaming-connectors/target*. 
 
 #### RabbitMQ
 Pull the image:
@@ -844,14 +914,11 @@ sudo /etc/init.d/rabbitmq-server start
 To launch the example on the host computer execute:
 
 ~~~batch
-java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology > log.txt 2> errorlog.txt
+java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology \
+> log.txt 2> errorlog.txt
 ~~~
 
-The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-~~~batch
-/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
-~~~
-
-In the example there are to connectors. One that sends messages to RabbitMQ and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
+In the example there are two connectors: one that sends messages to RabbitMQ and one that receives messages from the same queue. The arriving messages can be observed in the log in the following format:
 
 ~~~
 <DATE> INFO rabbitmq.RMQTopology: String: <one> arrived from RMQ 
@@ -872,33 +939,34 @@ sudo docker pull flinkstreaming/flink-connectors-kafka
 To run the container type:
 
 ~~~batch
-sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i flinkstreaming/flink-connectors-kafka
+sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i \
+flinkstreaming/flink-connectors-kafka
 ~~~
 
 Now a terminal is running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost's and the Docker container's ports so Kafka can communicate with the application through these ports.
 First start a zookeeper in the background:
 
 ~~~batch
-/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties > zookeeperlog.txt &
+/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties \
+> zookeeperlog.txt &
 ~~~
 
 Then start the kafka server in the background:
 
 ~~~batch
-/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties > serverlog.txt 2> servererr.txt &
+/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties \
+> serverlog.txt 2> servererr.txt &
 ~~~
 
 To launch the example on the host computer execute:
 
 ~~~batch
-java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology > log.txt 2> errorlog.txt
+java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology \
+> log.txt 2> errorlog.txt
 ~~~
 
-The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-~~~batch
-/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
-~~~
 
-In the example there are to connectors. One that sends messages to Kafka and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
+In the example there are two connectors: one that sends messages to Kafka and one that receives messages from the same queue. The arriving messages can be observed in the log in the following format:
 
 ~~~
 <DATE> INFO kafka.KafkaTopology: String: (0) arrived from Kafka
@@ -915,7 +983,7 @@ In the example there are to connectors. One that sends messages to Kafka and one
 
 #### Apache Flume
 
-At the moment remote access for Flume connectors does not work. This example can be run only on the same machine where the Flume server is. In this case both will be in the Docker container.
+At the moment remote access for Flume connectors does not work. This example is only runnable on the same machine where the Flume server is. In this case both will be in the Docker container.
 
 Pull the image:
 
@@ -947,7 +1015,9 @@ mvn install -DskipTests
 First start the server in the background:
 
 ~~~batch
-/apache-flume-1.5.0-bin/bin/flume-ng agent --conf conf --conf-file /apache-flume-1.5.0-bin/example.conf --name a1 -Dflume.root.logger=INFO,console > /flumelog.txt 2> /flumeerr.txt &
+/apache-flume-1.5.0-bin/bin/flume-ng agent \
+--conf conf --conf-file /apache-flume-1.5.0-bin/example.conf --name a1 \
+-Dflume.root.logger=INFO,console > /flumelog.txt 2> /flumeerr.txt &
 ~~~
 
 Then press enter and launch the example with:
@@ -956,9 +1026,6 @@ Then press enter and launch the example with:
 java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.flume.FlumeTopology
 ~~~
 
-The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-~~~batch
-/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
-~~~
 In the example there are two connectors. One that sends messages to Flume and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
 
 ~~~

