From: mbalassi@apache.org
To: commits@flink.incubator.apache.org
Date: Wed, 24 Sep 2014 19:51:42 -0000
Subject: [09/12] git commit: [doc] [streaming] Added Twitter connector & Projection operator to docs

[doc] [streaming] Added Twitter connector & Projection operator to docs

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2dc5437a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2dc5437a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2dc5437a

Branch: refs/heads/master
Commit: 2dc5437a288d80dab5e10571c1bed630de79e1d3
Parents: 1bca326
Author: mbalassi
Authored: Wed Sep 24 16:25:11 2014 +0200
Committer: mbalassi
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md | 365 +++++++++++++++++++++++++------------------
 1 file changed, 216 insertions(+), 149 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2dc5437a/docs/streaming_guide.md

diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index f1b4189..27a32ba 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md

@@ -22,9 +22,9 @@ Add the following dependency to your `pom.xml` to use Flink Streaming.
~~~xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-core</artifactId>
  <version>{{site.FLINK_VERSION_STABLE}}</version>
</dependency>
~~~

@@ -39,32 +39,31 @@ The following program is a complete, working example of streaming WordCount. You

~~~java
public class StreamingWordCount {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .fromElements("Who's there?",
                        "I think I hear them. Stand, ho! Who's there?")
                .flatMap(new Splitter())
                .groupBy(0)
                .sum(1);

        dataStream.print();

        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
~~~

@@ -156,7 +155,8 @@ Besides the pre-defined solutions the user can implement their own source by imp

### Sinks

`DataStreamSink` represents the different outputs of a Flink Streaming program. Every `DataStream` in a streaming program needs to be either transformed or closed down with a sink. There are several pre-defined implementations of `DataStreamSink` available right away:

* `dataStream.print()` – Writes the DataStream to the standard output, practical for testing purposes
* `dataStream.writeAsText(parameters)` – Writes the DataStream to a text file
* `dataStream.writeAsCsv(parameters)` – Writes the DataStream to CSV format
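For instance, a stream can be inspected and persisted in the same program — a minimal sketch, assuming `writeAsText` accepts the output path as its parameter:

~~~java
DataStream<String> names = env.fromElements("one", "two", "three");

names.print();                              // inspect the stream on standard output
names.writeAsText("/tmp/streaming-names");  // assumed parameter: the output path

env.execute();
~~~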
@@ -180,11 +180,11 @@ A map operator that doubles the values of the input stream:

~~~java
dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    })
~~~

#### FlatMap

@@ -193,13 +193,13 @@ A flatmap operator that splits sentences to words:

~~~java
dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    })
~~~

#### Filter

@@ -207,12 +207,12 @@ The Filter transformation applies a user-defined `FilterFunction` on each elemen

A filter that filters out zero values:

~~~java
dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    })
~~~

#### Reduce

@@ -221,11 +221,11 @@ A reducer that sums up the incoming stream:

~~~java
dataStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    })
~~~
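Since every transformation again returns a `DataStream`, the operators above can be chained. A sketch that keeps the non-zero values, doubles them and sums the result, using only the operators already shown (assuming an input `DataStream<Integer> dataStream`):

~~~java
DataStream<Integer> result = dataStream
        .filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value != 0; // drop zeroes
            }
        })
        .map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return 2 * value; // double every value
            }
        })
        .reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2; // running sum
            }
        });
~~~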
#### Merge

@@ -286,20 +286,20 @@ A CoMap operator that outputs true if an Integer value is received and false if

~~~java
DataStream<Integer> dataStream1 = ...
DataStream<String> dataStream2 = ...

dataStream1.connect(dataStream2)
    .map(new CoMapFunction<Integer, String, Boolean>() {

        @Override
        public Boolean map1(Integer value) {
            return true;
        }

        @Override
        public Boolean map2(String value) {
            return false;
        }
    })
~~~

#### FlatMap on ConnectedDataStream

@@ -308,20 +308,20 @@ The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but

~~~java
DataStream<Integer> dataStream1 = ...
DataStream<String> dataStream2 = ...

dataStream1.connect(dataStream2)
    .flatMap(new CoFlatMapFunction<Integer, String, Boolean>() {

        @Override
        public void flatMap1(Integer value, Collector<Boolean> out) {
            out.collect(true);
        }

        @Override
        public void flatMap2(String value, Collector<Boolean> out) {
            out.collect(false);
        }
    })
~~~

#### Reduce on ConnectedDataStream

@@ -431,7 +431,7 @@ operator.setMutability(isMutable)

[Back to top](#top)

Stream connectors
----------------

@@ -452,6 +452,7 @@ This connector provides access to data streams from [Apache Kafka](https://kafka

#### Kafka Source

An abstract class providing an interface for receiving data from Kafka. To implement it, the user must:

* Write a constructor calling the constructor of the abstract class,
* Write a deserializer function which processes the data coming from Kafka,
* Stop the source manually when necessary with one of the close functions.

@@ -463,7 +464,7 @@ An example of an implementation of a constructor:

~~~java
public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
    super(zkQuorum, groupId, topicId, numThreads);
}
~~~

@@ -473,11 +474,11 @@ An example of an implementation of a deserializer:

~~~java
@Override
public String deserialize(byte[] msg) {
    String s = new String(msg);
    if (s.equals("q")) {
        closeWithoutSend();
    }
    return s;
}
~~~

@@ -490,6 +491,7 @@ In the example provided `closeWithoutSend()` is used because here the String `"q

#### Kafka Sink

An abstract class providing an interface for sending data to Kafka. To implement it, the user must:

* Write a constructor calling the constructor of the abstract class,
* Write a serializer function to send data in the desired form to Kafka,
* Stop the sink manually when necessary with one of the close functions.

@@ -501,7 +503,7 @@ An example of an implementation of a constructor:

~~~java
public MyKafkaSink(String topicId, String brokerAddr) {
    super(topicId, brokerAddr);
}
~~~

@@ -511,10 +513,10 @@ An example of an implementation of a serializer:

~~~java
@Override
public String serialize(String tuple) {
    if (tuple.equals("q")) {
        sendAndClose();
    }
    return tuple;
}
~~~

@@ -526,8 +528,8 @@ To use a Kafka connector as a source in Flink call the `addSource()` function wi

~~~java
DataStream<String> stream1 = env
    .addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALLELISM)
    .print();
~~~

The following have to be provided for the `MyKafkaSource()` constructor, in order:

* The Zookeeper quorum address (e.g. `localhost:2181`)
* The consumer group id
* The topic id
* The number of consumer threads

@@ -541,8 +543,8 @@ Similarly to use a Kafka connector as a sink in Flink call the `addSink()` funct

~~~java
DataStream<String> stream2 = env
    .addSource(new MySource())
    .addSink(new MyKafkaSink("test", "localhost:9092"));
~~~

The following have to be provided for the `MyKafkaSink()` constructor, in order:

* The topic id
* The broker address (e.g. `localhost:9092`)
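Putting the constructor and the deserializer from above into one class, a complete source could look like the following sketch — assuming the abstract `KafkaSource` base class is parameterized with the type the source emits:

~~~java
// Sketch only: combines the constructor and deserializer fragments shown above.
public class MyKafkaSource extends KafkaSource<String> { // generic parameter is an assumption

    public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
        super(zkQuorum, groupId, topicId, numThreads);
    }

    @Override
    public String deserialize(byte[] msg) {
        String s = new String(msg);
        if (s.equals("q")) {
            closeWithoutSend(); // stop the source without emitting this message
        }
        return s;
    }
}
~~~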
@@ -563,6 +565,7 @@ This connector provides access to data streams from [Apache Flume](http://flume.a

#### Flume Source

An abstract class providing an interface for receiving data from Flume. To implement it, the user must:

* Write a constructor calling the constructor of the abstract class,
* Write a deserializer function which processes the data coming from Flume,
* Stop the source manually when necessary with one of the close functions.

@@ -574,7 +577,7 @@ An example of an implementation of a constructor:

~~~java
MyFlumeSource(String host, int port) {
    super(host, port);
}
~~~

@@ -584,12 +587,12 @@ An example of an implementation of a deserializer:

~~~java
@Override
public String deserialize(byte[] msg) {
    String s = (String) SerializationUtils.deserialize(msg);
    if (s.equals("q")) {
        closeWithoutSend();
    }
    return s;
}
~~~

@@ -602,6 +605,7 @@ In the example `closeWithoutSend()` is used because here the String `"q"` is met

#### Flume Sink

An abstract class providing an interface for sending data to Flume. To implement it, the user must:

* Write a constructor calling the constructor of the abstract class,
* Write a serializer function to send data in the desired form to Flume,
* Stop the sink manually when necessary with one of the close functions.

@@ -613,7 +617,7 @@ An example of an implementation of a constructor:

~~~java
public MyFlumeSink(String host, int port) {
    super(host, port);
}
~~~

@@ -623,15 +627,15 @@ An example of an implementation of a serializer:

~~~java
@Override
public byte[] serialize(String tuple) {
    if (tuple.equals("q")) {
        try {
            sendAndClose();
        } catch (Exception e) {
            throw new RuntimeException("Error while closing Flume connection with " + port
                    + " at " + host, e);
        }
    }
    return SerializationUtils.serialize(tuple);
}
~~~

@@ -643,8 +647,8 @@ To use a Flume connector as a source in Flink call the `addSource()` function wi

~~~java
DataStream<String> dataStream1 = env
    .addSource(new MyFlumeSource("localhost", 41414))
    .print();
~~~

The following have to be provided for the `MyFlumeSource()` constructor, in order:

* The hostname
* The port number

@@ -656,8 +660,8 @@ Similarly to use a Flume connector as a sink in Flink call the `addSink()` funct

~~~java
DataStream<String> dataStream2 = env
    .fromElements("one", "two", "three", "four", "five", "q")
    .addSink(new MyFlumeSink("localhost", 42424));
~~~

The following have to be provided for the `MyFlumeSink()` constructor, in order:

* The hostname
* The port number
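Analogously to the Kafka source above, the sink fragments combine into one class — a sketch, assuming the abstract base class is named `FlumeSink`, is parameterized with the input type, and exposes the `host` and `port` fields used in the error message:

~~~java
// Sketch only: combines the constructor and serializer fragments shown above.
public class MyFlumeSink extends FlumeSink<String> { // base class name and generics are assumptions

    public MyFlumeSink(String host, int port) {
        super(host, port);
    }

    @Override
    public byte[] serialize(String tuple) {
        if (tuple.equals("q")) {
            try {
                sendAndClose(); // emit the current message, then close the connection
            } catch (Exception e) {
                throw new RuntimeException("Error while closing Flume connection with " + port
                        + " at " + host, e);
            }
        }
        return SerializationUtils.serialize(tuple);
    }
}
~~~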
@@ -719,7 +723,7 @@ An example of an implementation of a constructor:

~~~java
public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
    super(HOST_NAME, QUEUE_NAME);
}
~~~

@@ -729,12 +733,12 @@ An example of an implementation of a deserializer:

~~~java
@Override
public String deserialize(byte[] t) {
    String s = (String) SerializationUtils.deserialize(t);
    if (s.equals("q")) {
        closeWithoutSend();
    }
    return s;
}
~~~

@@ -749,6 +753,7 @@ In the example `closeWithoutSend()` is used because here the String `"q"` is met

#### RabbitMQ Sink

An abstract class providing an interface for sending data to RabbitMQ. To implement it, the user must:

* Write a constructor calling the constructor of the abstract class,
* Write a serializer function to send data in the desired form to RabbitMQ,
* Stop the sink manually when necessary with one of the close functions.

@@ -760,7 +765,7 @@ An example of an implementation of a constructor:

~~~java
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
    super(HOST_NAME, QUEUE_NAME);
}
~~~

@@ -770,10 +775,10 @@ An example of an implementation of a serializer:

~~~java
@Override
public byte[] serialize(Tuple tuple) {
    if (tuple.getField(0).equals("q")) {
        sendAndClose();
    }
    return SerializationUtils.serialize((Serializable) tuple.getField(0));
}
~~~

@@ -784,10 +789,9 @@ The API provided is the [same](#rmq_source_close) as the one for `RabbitMQSource

To use a RabbitMQ connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `RabbitMQSource` as parameter:

~~~java
DataStream<String> dataStream1 = env
    .addSource(new MyRMQSource("localhost", "hello"))
    .print();
~~~

The following have to be provided for the `MyRMQSource()` constructor, in order:

* The hostname
* The queue name

@@ -799,8 +803,8 @@ Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` fu

~~~java
DataStream<String> dataStream2 = env
    .fromElements("one", "two", "three", "four", "five", "q")
    .addSink(new MyRMQSink("localhost", "hello"));
~~~

The following have to be provided for the `MyRMQSink()` constructor, in order:

* The hostname
* The queue name

@@ -812,13 +816,79 @@ More about RabbitMQ can be found [here](http://www.rabbitmq.com/).

[Back to top](#top)

### Twitter Streaming API

The Twitter Streaming API provides access to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream.

#### Authentication

In order to connect to the Twitter stream the user has to register their program and acquire the necessary information for the authentication. The process is described below.

#### Acquiring the authentication information

First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
After selecting the application, the API key and API secret (called `consumerKey` and `consumerSecret` in `TwitterSource`, respectively) are located on the "API Keys" tab. The necessary access token data (`token` and `secret`) can be acquired here as well.
Remember to keep these pieces of information secret and do not push them to public repositories.

#### Accessing the authentication information

Create a properties file and pass its path in the constructor of `TwitterSource`. The content of the file should be similar to this:

~~~batch
#properties file for my app
secret=***
consumerSecret=***
token=***-***
consumerKey=***
~~~
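A missing or incomplete file makes authentication fail at runtime, so it can be worth validating it up front — a sketch using plain `java.util.Properties` (the key names mirror the file above; the path is a placeholder):

~~~java
Properties props = new Properties();
try (FileInputStream in = new FileInputStream("/PATH/TO/myFile.properties")) {
    props.load(in); // read the credential file
}
for (String key : new String[] { "secret", "consumerSecret", "token", "consumerKey" }) {
    if (props.getProperty(key) == null) {
        throw new IllegalStateException("Missing Twitter credential: " + key);
    }
}
~~~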
#### Constructors

The `TwitterSource` class has two constructors:

1. `public TwitterSource(String authPath, int numberOfTweets);` to emit a finite number of tweets
2. `public TwitterSource(String authPath);` for streaming

Both constructors expect a `String authPath` argument determining the location of the properties file containing the authentication information. In the first case, `numberOfTweets` determines how many tweets the source emits.

#### Usage

In contrast to other connectors, `TwitterSource` depends on no additional services. For example the following code should run gracefully:

~~~java
DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
~~~

The `TwitterSource` emits strings containing JSON.
To retrieve information from the JSON you can add a FlatMap or a Map function handling it. For example, use an implementation of the `JSONParseFlatMap` abstract class found among the examples. `JSONParseFlatMap` is an extension of the `FlatMapFunction` and has a

~~~java
String getField(String jsonText, String field);
~~~

function which can be used to acquire the value of a given field.

There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.

#### Example

`TwitterLocal` is an example of how to use `TwitterSource`. It implements a language frequency counter program.
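A condensed sketch of that idea, using the `getField` helper shown above (the `"lang"` field name is an assumption about the tweet JSON, and the generic signature of `JSONParseFlatMap` is assumed to match `FlatMapFunction`):

~~~java
DataStream<Tuple2<String, Integer>> languageFrequency = env
        .addSource(new TwitterSource("/PATH/TO/myFile.properties"))
        .flatMap(new JSONParseFlatMap<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String tweet, Collector<Tuple2<String, Integer>> out) throws Exception {
                String lang = getField(tweet, "lang"); // assumed field holding the language code
                if (lang != null) {
                    out.collect(new Tuple2<String, Integer>(lang, 1));
                }
            }
        })
        .groupBy(0) // group by language
        .sum(1);    // count occurrences per language
~~~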
[Back to top](#top)

### Docker containers for connectors

A Docker container is provided with all the required configurations for test running the connectors of Apache Flink. The servers for the message queues will be running in the Docker container while the example topology can be run on the user's computer. The only exception is Flume, more can be read about this issue in the [Flume section](#flume).

#### Installing Docker

The official Docker installation guide can be found [here](https://docs.docker.com/installation/).
After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.

#### Creating a jar with all the dependencies

For the easiest setup create a jar with all the dependencies of the *flink-streaming-connectors* project.

~~~batch
cd /PATH/TO/GIT/incubator-flink/flink-addons/flink-streaming-connectors
mvn assembly:assembly
~~~

This creates an assembly jar under *flink-streaming-connectors/target*.

#### RabbitMQ

Pull the image:

~~~batch
sudo docker pull flinkstreaming/flink-connectors-rabbitmq
~~~

Inside the running container start the RabbitMQ server:

~~~batch
sudo /etc/init.d/rabbitmq-server start
~~~

@@ -844,14 +914,11 @@ To launch the example on the host computer execute:

~~~batch
java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology \
> log.txt 2> errorlog.txt
~~~

In the example there are two connectors. One that sends messages to RabbitMQ and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:

~~~
INFO rabbitmq.RMQTopology: String: arrived from RMQ
~~~

#### Apache Kafka

Pull the image:

~~~batch
sudo docker pull flinkstreaming/flink-connectors-kafka
~~~

@@ -872,33 +939,34 @@ To run the container type:

~~~batch
sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i \
flinkstreaming/flink-connectors-kafka
~~~

Now a terminal is running from the image with all the necessary configurations to test run the Kafka connector. The `-p` flag binds the localhost's and the Docker container's ports so Kafka can communicate with the application through these.
First start a ZooKeeper instance in the background:

~~~batch
/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties \
> zookeeperlog.txt &
~~~

Then start the Kafka server in the background:

~~~batch
/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties \
> serverlog.txt 2> servererr.txt &
~~~

To launch the example on the host computer execute:

~~~batch
java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology \
> log.txt 2> errorlog.txt
~~~

In the example there are two connectors. One that sends messages to Kafka and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:

~~~
INFO kafka.KafkaTopology: String: (0) arrived from Kafka
~~~

@@ -915,7 +983,7 @@

#### Apache Flume

At the moment remote access for Flume connectors does not work. This example is only runnable on the same machine where the Flume server is. In this case both will be in the Docker container.
Pull the image:

~~~batch
sudo docker pull flinkstreaming/flink-connectors-flume
~~~

To run the container type:

~~~batch
sudo docker run -t -i flinkstreaming/flink-connectors-flume
~~~

@@ -947,7 +1015,9 @@ mvn install -DskipTests

First start the server in the background:

~~~batch
/apache-flume-1.5.0-bin/bin/flume-ng agent \
--conf conf --conf-file /apache-flume-1.5.0-bin/example.conf --name a1 \
-Dflume.root.logger=INFO,console > /flumelog.txt 2> /flumeerr.txt &
~~~

Then press enter and launch the example with:

~~~batch
java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.flume.FlumeTopology
~~~

In the example there are two connectors. One that sends messages to Flume and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:

~~~
INFO flume.FlumeTopology: String: arrived from Flume
~~~