flink-commits mailing list archives

From aljos...@apache.org
Subject git commit: [doc] Re-add streaming guide that was lost in rebase
Date Mon, 22 Sep 2014 13:34:11 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 63e30ff5d -> 582cd0304


[doc] Re-add streaming guide that was lost in rebase

Also convert to kramdown syntax


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/582cd030
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/582cd030
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/582cd030

Branch: refs/heads/master
Commit: 582cd03047c01186a2147e76153ebc871de125db
Parents: 63e30ff
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Sep 22 15:33:12 2014 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Sep 22 15:33:12 2014 +0200

----------------------------------------------------------------------
 docs/_includes/sidenav.html   |   1 +
 docs/_plugins/build_apidoc.rb |   2 +-
 docs/streaming_guide.md       | 301 ++++++++++++++++++-------------------
 3 files changed, 148 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/582cd030/docs/_includes/sidenav.html
----------------------------------------------------------------------
diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 783c5c4..40cea81 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -23,6 +23,7 @@
     <ul>
       <li><a href="programming_guide.html">Programming Guide</a></li>
       <li><a href="dataset_transformations.html">DataSet Transformations</a></li>
+      <li><a href="streaming_guide.html">Streaming Guide</a></li>
       <li><a href="iterations.html">Iterations</a></li>
       <li><a href="spargel_guide.html">Spargel Graph API</a></li>
     </ul>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/582cd030/docs/_plugins/build_apidoc.rb
----------------------------------------------------------------------
diff --git a/docs/_plugins/build_apidoc.rb b/docs/_plugins/build_apidoc.rb
index 5f17197..420d101 100644
--- a/docs/_plugins/build_apidoc.rb
+++ b/docs/_plugins/build_apidoc.rb
@@ -36,7 +36,7 @@ if ENV['BUILD_API'] == '1' then
   puts `mvn clean install -DskipTests`
 
   puts "Generating Javadoc"
-  puts `mvn javadoc:aggregate -Pdocs-and-source $JAVA8JAVADOCFIX -Dmaven.javadoc.failOnError=false
-Dquiet=true -Dheader="<a href=\"/\" target=\"_top\"><h1>Back to Flink Documentation</h1></a>"`
+  puts `mvn javadoc:aggregate -Pdocs-and-source $JAVA8JAVADOCFIX -Dmaven.javadoc.failOnError=false
-Dquiet=true -Dheader="<a href=\"/docs/0.7-incubating/\" target=\"_top\"><h1>Back
to Flink Documentation</h1></a>"`
   
   cd("docs")
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/582cd030/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 87d851d..e2f5854 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -2,9 +2,14 @@
 title: "Flink Stream Processing API"
 ---
 
-<section id="top">
-Flink Streaming
-=======
+* This will be replaced by the TOC
+{:toc}
+
+<a href="#top"></a>
+
+Introduction
+------------
+
 
 Flink Streaming is an extension of the core Flink API for high-throughput, low-latency data
stream processing. The system can connect to and process data streams from many data sources
like Flume, Twitter, ZeroMQ and also from any user defined data source. Data streams can be
transformed and modified using high-level functions similar to the ones provided by the batch
processing API. Flink Streaming provides native support for iterative stream processing. The
processed data can be pushed to different output types.
 
@@ -15,32 +20,23 @@ The Streaming API is part of the *addons* Maven project. All relevant
classes ar
 
 Add the following dependency to your `pom.xml` to use Flink Streaming.
 
-```xml
+~~~xml
 <dependency>
 	<groupId>org.apache.flink</groupId>
 	<artifactId>flink-streaming-core</artifactId>
 	<version>{{site.FLINK_VERSION_STABLE}}</version>
 </dependency>
-```
+~~~
 
-Create a data stream flow with our Java API as described below. In order to create your own
Flink Streaming program, we encourage you to start with the [skeleton](#skeleton) and gradually
add your own [operations](#operations). The remaining sections act as references for additional
operations and advanced features.
+Create a data stream flow with our Java API as described below. In order to create your own
Flink Streaming program, we encourage you to start with the [skeleton](#program-skeleton)
and gradually add your own [operations](#operations). The remaining sections act as references
for additional operations and advanced features.
 
-<section id="toc">
-<div id="docs_05_toc">
-  <div class="list-group">
-{% for sublink in page.toc %}
-   <a href="#{{ sublink.anchor }}" class="list-group-item">{{forloop.index}}. <strong>{{
sublink.title }}</strong></a>
-{% endfor %}
-  </div>
-</div>
 
-<section id="example">
 Example Program
 ---------------
 
 The following program is a complete, working example of streaming WordCount. You can copy
&amp; paste the code to run it locally.
 
-```java
+~~~java
 public class StreamingWordCount {
 
 	
@@ -70,15 +66,14 @@ public class StreamingWordCount {
 	}
 	
 }
-```
+~~~
 
 [Back to top](#top)
 
-<section id="skeleton">
 Program Skeleton
 ----------------
 
-As presented in the [example](#example), a Flink Streaming program looks almost identical
to a regular Flink program. Each stream processing program consists of the following parts:
+As presented in the [example](#example-program), a Flink Streaming program looks almost identical
to a regular Flink program. Each stream processing program consists of the following parts:
 
 1. Creating a `StreamExecutionEnvironment`,
 2. Connecting to data stream sources,
@@ -89,38 +84,37 @@ As presented in the [example](#example), a Flink Streaming program looks
almost
 As these steps are basically the same as in the core API we will only note the important
differences.
 For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast
with the batch API where one would need an `ExecutionEnvironment`. The process otherwise is
essentially the same:
 
-```java 
+~~~java 
 StreamExecutionEnvironment.createLocalEnvironment(params…)
 StreamExecutionEnvironment.createRemoteEnvironment(params…)
-```
+~~~
 
 For connecting to data streams the `StreamExecutionEnvironment` has many different methods,
from basic file sources to completely general user defined data sources. We will go into details
in the [basics](#basics) section.
 
-```java
+~~~java
 env.readTextFile(filePath)
-```
+~~~
 
 After defining the data stream sources, the user can specify transformations on the data
streams to create a new data stream. Different data streams can also be combined together
for joint transformations, which are showcased in the [operations](#operations) section.
 
-```java
+~~~java
 dataStream.map(new Mapper()).reduce(new Reducer())
-```
+~~~
 
 The processed data can be pushed to different outputs called sinks. The user can define their
own sinks or use any predefined filesystem or database sink.
 
-```java
+~~~java
 dataStream.writeAsCsv(path)
-```
+~~~
 
 Once the complete program is specified `execute()` needs to be called on the `StreamExecutionEnvironment`.
This will either execute on the local machine or submit the program for execution on a cluster,
depending on the chosen execution environment.
 
-```java
+~~~java
 env.execute()
-```
+~~~
 
 [Back to top](#top)
 
-<section id="basics">
 Basics
 ----------------
 
@@ -157,7 +151,7 @@ The user can connect to data streams by the different implementations of
`DataStr
  * `env.readTextFile(filepath)`
 
 These can be used to easily test and debug streaming programs. There are also some streaming-specific
sources, for example `env.readTextStream(filepath)`, which iterates over the same file
infinitely, providing yet another useful testing tool.
-There are implemented connectors for a number of the most popular message queue services,
please refer to the section on [connectors](#connectors) for more detail.
+There are implemented connectors for a number of the most popular message queue services,
please refer to the section on [connectors](#stream-connectors) for more detail.
 Besides the pre-defined solutions the user can implement their own source by implementing
the `SourceFunction` interface and using the `env.addSource(sourceFunction)` method of the
`StreamExecutionEnvironment`.
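
As a rough sketch (the `invoke(Collector)` method shown here is assumed; check the `SourceFunction` javadoc of your Flink version for the exact contract), a user-defined source could look like this:

~~~java
// Hypothetical custom source that emits the numbers 0..9 and then finishes.
public class MySource implements SourceFunction<Integer> {
	@Override
	public void invoke(Collector<Integer> collector) throws Exception {
		for (int i = 0; i < 10; i++) {
			collector.collect(i);
		}
	}
}

DataStream<Integer> stream = env.addSource(new MySource());
~~~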
 
 ### Sinks
@@ -171,7 +165,6 @@ The user can also implement arbitrary sink functionality by implementing
the `Si
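
As a rough sketch (assuming a `SinkFunction` interface with a single `invoke(value)` method; check the javadoc of your version for the exact contract), a custom sink might look like this:

~~~java
// Hypothetical custom sink that simply prints every incoming element.
public class MyPrintSink implements SinkFunction<Integer> {
	@Override
	public void invoke(Integer value) {
		System.out.println(value);
	}
}

dataStream.addSink(new MyPrintSink());
~~~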
 
 [Back to top](#top)
 
-<section id="operations">
 Operations
 ----------------
 
@@ -185,20 +178,20 @@ Basic operators can be seen as functions that transform each data element
in the
 The Map transformation applies a user-defined `MapFunction` on each element of a `DataStream`.
It implements a one-to-one mapping, that is, exactly one element must be returned by the function.
 A map operator that doubles the values of the input stream:
 
-```java
+~~~java
 dataStream.map(new MapFunction<Integer, Integer>() {
 			@Override
 			public Integer map(Integer value) throws Exception {
 				return 2 * value;
 			}
 		})
-```
+~~~
 
 #### FlatMap
 The FlatMap transformation applies a user-defined `FlatMapFunction` on each element of a
`DataStream`. This variant of a map function can return arbitrary many result elements (including
none) for each input element.
 A flatmap operator that splits sentences to words:
 
-```java
+~~~java
 dataStream.flatMap(new FlatMapFunction<String, String>() {
 			@Override
 			public void flatMap(String value, Collector<String> out) throws Exception {
@@ -207,40 +200,40 @@ dataStream.flatMap(new FlatMapFunction<String, String>() {
 				}
 			}
 		})
-```
+~~~
 
 #### Filter
 The Filter transformation applies a user-defined `FilterFunction` on each element of a `DataStream`
and retains only those elements for which the function returns true.
 A filter that filters out zero values:
 
-```java
+~~~java
 dataStream.filter(new FilterFunction<Integer>() {		
 			@Override
 			public boolean filter(Integer value) throws Exception {
 				return value != 0;
 			}
 		})
-```
+~~~
 
 #### Reduce
 The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a `DataStream`.
The `ReduceFunction` subsequently combines pairs of elements into one element and outputs
the current reduced value as a `DataStream`.
 A reducer that sums up the incoming stream:
 
-```java
+~~~java
 dataStream.reduce(new ReduceFunction<Integer>() {
 			@Override
 			public Integer reduce(Integer value1, Integer value2) throws Exception {
 				return value1+value2;
 			}
 		})
-```
+~~~
 
 #### Merge
 Merges two or more `DataStream` instances creating a new DataStream containing all the elements
from all the streams.
 
-```java
+~~~java
 dataStream.merge(otherStream1, otherStream2…)
-```
+~~~
 
 ### Grouped operators
 
@@ -270,16 +263,16 @@ The transformation calls a user-defined `ReduceFunction` on records
received in
 
 A window reduce that sums the elements in the last minute with 10 seconds slide interval:
 
-```java
+~~~java
 dataStream.window(60000, 10000).sum();
-```
+~~~
 
 #### ReduceGroup on windowed/batched data streams
 The transformation calls a `GroupReduceFunction` for each data batch or data window. The
batch/window slides by the predefined number of elements/time after each call.
 
-```java
+~~~java
 dataStream.batch(1000, 100).reduceGroup(reducer);
-```
+~~~
 
 ### Co operators
 
@@ -290,7 +283,7 @@ Co operators can be applied to `ConnectedDataStreams` which represent
two `DataS
 Applies a CoMap transformation on two separate DataStreams, mapping them to a common output
type. The transformation calls a `CoMapFunction.map1()` for each element of the first input
and `CoMapFunction.map2()` for each element of the second input. Each CoMapFunction call returns
exactly one element.
 A CoMap operator that outputs true if an Integer value is received and false if a String
value is received:
 
-```java
+~~~java
 DataStream<Integer> dataStream1 = ...
 DataStream<String> dataStream2 = ...
 		
@@ -307,12 +300,12 @@ dataStream1.connect(dataStream2)
 				return false;
 			}
 		})
-```
+~~~
 
 #### FlatMap on ConnectedDataStream
 The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead
of returning exactly one element after each map call the user can output arbitrarily many
values using the Collector interface. 
 
-```java
+~~~java
 DataStream<Integer> dataStream1 = ...
 DataStream<String> dataStream2 = ...
 		
@@ -329,31 +322,30 @@ dataStream1.connect(dataStream2)
 				out.collect(false);
 			}
 		})
-```
+~~~
 
 #### Reduce on ConnectedDataStream
 The Reduce operator for the `ConnectedDataStream` applies a simple reduce transformation
on the joined data streams and then maps the reduced elements to a common output type.
 
-<section id="output-splitting">
 ### Output splitting
 
 Most data stream operators support directed outputs, meaning that different data elements
are received by only given outputs. The outputs are referenced by their name given at the
point of receiving:
 
-```java
+~~~java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
 DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
-```
+~~~
 
 Data streams only receive the elements directed to selected output names. These outputs are
directed by implementing a selector function (extending `OutputSelector`):
 
-```java
+~~~java
 void select(OUT value, Collection<String> outputs);
-```
+~~~
 
 The data is sent to all the outputs added to the collection outputs (referenced by their
name). This way the direction of the outputs can be determined by the value of the data sent.
For example:
 
-```java
+~~~java
 @Override
 void select(Integer value, Collection<String> outputs) {
     if (value % 2 == 0) {
@@ -362,7 +354,7 @@ void select(Integer value, Collection<String> outputs) {
         outputs.add("odd");
     }
 }
-```
+~~~
 
 This output selection allows data streams to listen to multiple outputs, and data points
to be sent to multiple outputs. A value is sent to all the outputs specified in the `OutputSelector`
and a data stream will receive a value if it has selected any of the outputs the value is
sent to. The stream will receive the data at most once.
 It is common that a stream listens to all the outputs, so `split.selectAll()` is provided
as an alias for explicitly selecting all output names.
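
For example (a minimal sketch building on the `split` stream above):

~~~java
// Receives every element of the split stream, regardless of which output name it was directed to.
DataStream<Integer> all = split.selectAll();
~~~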
@@ -373,26 +365,26 @@ The Flink Streaming API supports implementing iterative stream processing
datafl
 Unlike in the core API the user does not define the maximum number of iterations, but at
the tail of each iteration the output is both streamed forward to the next operator and also
streamed back to the iteration head. The user controls the output of the iteration tail using
[output splitting](#output-splitting).
 To start an iterative part of the program the user defines the iteration starting point:
 
-```java
+~~~java
 IterativeDataStream<Integer> iteration = source.iterate();
-```
+~~~
 The operator applied on the iteration starting point is the head of the iteration, where
data is fed back from the iteration tail.
 
-```java
+~~~java
 DataStream<Integer> head = iteration.map(new IterationHead());
-```
+~~~
 
 To close an iteration and define the iteration tail, the user calls `.closeWith(tail)` method
of the `IterativeDataStream`:
 
-```java
+~~~java
 DataStream<Integer> tail = head.map(new IterationTail());
 iteration.closeWith(tail);
-```
+~~~
 Or to use with output splitting:
-```java
+~~~java
 SplitDataStream<Integer> tail = head.map(new IterationTail()).split(outputSelector);
 iteration.closeWith(tail.select("iterate"));
-``` 
+~~~ 
 
 Because iterative streaming programs do not have a set number of iterations for each data
element, the streaming program has no information on the end of its input. From this it follows
that iterative streaming programs run until the user manually stops the program. While this
is acceptable under normal circumstances, a method is provided to allow iterative programs
to shut down automatically if no input is received by the iteration head for a predefined number
of milliseconds.
 To use this feature the user needs to call the `iteration.setMaxWaitTime(millis)` method to control
the maximum wait time.
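
For example (the timeout value here is arbitrary):

~~~java
// Shut the iteration down automatically if its head receives no input for 5 seconds.
iteration.setMaxWaitTime(5000);
~~~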
@@ -400,17 +392,17 @@ To use this function the user needs to call, the `iteration.setMaxWaitTime(milli
 ### Rich functions
 The usage of rich functions is essentially the same as in the core Flink API. All transformations
that take as argument a user-defined function can instead take a rich function as argument:
 
-```java
+~~~java
 dataStream.map(new RichMapFunction<String, Integer>() {
   public Integer map(String value) { return Integer.parseInt(value); }
 });
-```
+~~~
 
 Rich functions provide, in addition to the user-defined function (`map()`, `reduce()`, etc),
the `open()` and `close()` methods for initialization and finalization. (In contrast to the
core API, the streaming API currently does not support the  `getRuntimeContext()` and `setRuntimeContext()`
methods.)
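
A short sketch of how `open()` and `close()` might be used (assuming the usual rich function signatures, with a `Configuration` parameter for `open()`):

~~~java
dataStream.map(new RichMapFunction<String, Integer>() {
	private int counter;

	@Override
	public void open(Configuration parameters) throws Exception {
		// Called once before the first map() call; initialize resources or state here.
		counter = 0;
	}

	@Override
	public Integer map(String value) {
		counter++;
		return value.length();
	}

	@Override
	public void close() throws Exception {
		// Called once after the last map() call; release resources here.
	}
});
~~~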
 
 [Back to top](#top)
 
-<section id="Operator-settings">
+
 Operator Settings
 ----------------
 
@@ -423,24 +415,23 @@ Setting parallelism for operators works exactly the same way as in the
core Flin
 By default data points are not transferred on the network one-by-one, which would cause unnecessary
network traffic, but are buffered in the output buffers. The size of the output buffers can
be set in the Flink config files. While this method is good for optimizing throughput, it
can cause latency issues when the incoming stream is not fast enough.
 To tackle this issue the user can call `env.setBufferTimeout(timeoutMillis)` on the execution
environment (or on individual operators) to set a maximum wait time for the buffers to fill
up. After this time the buffers are flushed automatically even if they are not full. Usage:
 
-```java
+~~~java
 LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 env.setBufferTimeout(timeoutMillis);
 
 env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
-```
+~~~
 
 ### Mutability
 
 Most operators allow setting mutability for reading input data. If the operator is set mutable
then the variable used to store input data for operators will be reused in a mutable fashion
to avoid excessive object creation. By default, all operators are set to immutable.
 Usage:
-```java
+~~~java
 operator.setMutability(isMutable)
-```
+~~~
 
 [Back to top](#top)
 	
-<section id="connectors">
 Stream connectors
 ----------------
 
@@ -470,16 +461,16 @@ The implemented class must extend `KafkaSource`, for example: `KafkaSource<Tuple
 ##### Constructor
 An example of an implementation of a constructor:
 
-```java
+~~~java
 public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
 	super(zkQuorum, groupId, topicId, numThreads);
 }
-```
+~~~
 
 ##### Deserializer
 An example of an implementation of a deserializer:
 
-```java
+~~~java
 @Override
 public Tuple1<String> deserialize(byte[] msg) {
 	String s = new String(msg);
@@ -488,7 +479,7 @@ public Tuple1<String> deserialize(byte[] msg) {
 	}
 	return new Tuple1<String>(s);
 }
-```
+~~~
 
 The source closes when it receives the String `"q"`.
 
@@ -508,16 +499,16 @@ The implemented class must extend `KafkaSink`, for example `KafkaSink<Tuple1<Str
 ##### Constructor
 An example of an implementation of a constructor:
 
-```java
+~~~java
 public MyKafkaSink(String topicId, String brokerAddr) {
 	super(topicId, brokerAddr);
 }
-```
+~~~
 
 ##### Serializer
 An example of an implementation of a serializer:
 
-```java
+~~~java
 @Override
 public String serialize(Tuple1<String> tuple) {
 	if(tuple.f0.equals("q")){
@@ -525,7 +516,7 @@ public String serialize(Tuple1<String> tuple) {
 	}
 	return tuple.f0;
 }
-```
+~~~
 
 ##### Close
 The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`.
@@ -533,11 +524,11 @@ The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`
 #### Building A Topology
 To use a Kafka connector as a source in Flink call the `addSource()` function with a new
instance of the class which extends `KafkaSource` as parameter:
 
-```java
+~~~java
 DataStream<Tuple1<String>> stream1 = env.
 	addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
 	.print();
-```
+~~~
 
 The following arguments have to be provided for the `MyKafkaSource()` constructor, in order:
 
@@ -548,11 +539,11 @@ The followings have to be provided for the `MyKafkaSource()` constructor
in orde
 
 Similarly to use a Kafka connector as a sink in Flink call the `addSink()` function with
a new instance of the class which extends `KafkaSink`:
 
-```java
+~~~java
 DataStream<Tuple1<String>> stream2 = env
 	.addSource(new MySource())
 	.addSink(new MyKafkaSink("test", "localhost:9092"));
-```
+~~~
 
 The following arguments have to be provided for the `MyKafkaSink()` constructor, in order:
 
@@ -581,16 +572,16 @@ The implemented class must extend `FlumeSource` for example: `FlumeSource<Tuple1
 ##### Constructor
 An example of an implementation of a constructor:
 
-```java
+~~~java
 MyFlumeSource(String host, int port) {
 	super(host, port);
 }
-```
+~~~
 
 ##### Deserializer
 An example of an implementation of a deserializer:
 
-```java
+~~~java
 @Override
 public Tuple1<String> deserialize(byte[] msg) {
 	String s = (String) SerializationUtils.deserialize(msg);
@@ -601,7 +592,7 @@ public Tuple1<String> deserialize(byte[] msg) {
 	}
 	return out;
 }
-```
+~~~
 
 The source closes when it receives the String `"q"`.
 
@@ -621,16 +612,16 @@ The implemented class must extend `FlumeSink`, for example `FlumeSink<Tuple1<Str
 ##### Constructor
 An example of an implementation of a constructor:
 
-```java
+~~~java
 public MyFlumeSink(String host, int port) {
 	super(host, port);
 }
-```
+~~~
 
 ##### Serializer
 An example of an implementation of a serializer.
 
-```java
+~~~java
 @Override
 public byte[] serialize(Tuple1<String> tuple) {
 	if (tuple.f0.equals("q")) {
@@ -643,7 +634,7 @@ public byte[] serialize(Tuple1<String> tuple) {
 	}
 	return SerializationUtils.serialize(tuple.f0);
 }
-```
+~~~
 
 ##### Close
 The API provided is the [same](#flume_source_close) as the one for `FlumeSource`.
@@ -651,11 +642,11 @@ The API provided is the [same](#flume_source_close) as the one for `FlumeSource`
 #### Building A Topology
 To use a Flume connector as a source in Flink call the `addSource()` function with a new
instance of the class which extends `FlumeSource` as parameter:
 
-```java
+~~~java
 DataStream<Tuple1<String>> dataStream1 = env
 	.addSource(new MyFlumeSource("localhost", 41414))
 	.print();
-```
+~~~
 
 The following arguments have to be provided for the `MyFlumeSource()` constructor, in order:
 
@@ -664,11 +655,11 @@ The followings have to be provided for the `MyFlumeSource()` constructor
in orde
 
 Similarly to use a Flume connector as a sink in Flink call the `addSink()` function with
a new instance of the class which extends `FlumeSink`
 
-```java
+~~~java
 DataStream<Tuple1<String>> dataStream2 = env
 	.fromElements("one", "two", "three", "four", "five", "q")
 	.addSink(new MyFlumeSink("localhost", 42424));
-```
+~~~
 
 The following arguments have to be provided for the `MyFlumeSink()` constructor, in order:
 
@@ -678,7 +669,7 @@ The followings have to be provided for the `MyFlumeSink()` constructor
in order:
 ##### Configuration file<a name="config_file"></a>
 An example of a configuration file:
 
-```
+~~~
 a1.channels = c1
 a1.sources = r1
 a1.sinks = k1
@@ -694,13 +685,13 @@ a1.sinks.k1.channel = c1
 a1.sinks.k1.type = avro
 a1.sinks.k1.hostname = localhost
 a1.sinks.k1.port = 41414
-```
+~~~
 
 To run the `FlumeTopology` example the previous configuration file must be located in the Flume
directory and named `example.conf`. The agent can then be started with the following command:
 
-```
+~~~
 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
-```
+~~~
 
 If the agent is not started before the application starts a `FlumeSink`, then the sink will
retry to build the connection for 90 seconds; if unsuccessful, it throws a `RuntimeException`.
 
@@ -727,16 +718,16 @@ The implemented class must extend `RabbitMQSource` for example: `RabbitMQSource<
 ##### Constructor
 An example of an implementation of a constructor:
 
-```java
+~~~java
 public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
 	super(HOST_NAME, QUEUE_NAME);
 }
-```
+~~~
 
 ##### Deserializer
 An example of an implementation of a deserializer:
 
-```java
+~~~java
 @Override
 public Tuple1<String> deserialize(byte[] t) {
 	String s = (String) SerializationUtils.deserialize(t);
@@ -747,7 +738,7 @@ public Tuple1<String> deserialize(byte[] t) {
 	}
 	return out;
 }
-```
+~~~
 
 The source closes when it receives the String `"q"`.
 
@@ -769,16 +760,16 @@ The implemented class must extend `RabbitMQSink` for example: `RabbitMQSink<Tupl
 ##### Constructor
 An example of an implementation of a constructor:
 
-```java
+~~~java
 public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
 	super(HOST_NAME, QUEUE_NAME);
 }
-```
+~~~
 
 ##### Serializer
 An example of an implementation of a serializer.
 
-```java
+~~~java
 @Override
 public byte[] serialize(Tuple tuple) {
 	if (tuple.getField(0).equals("q")) {
@@ -786,7 +777,7 @@ public byte[] serialize(Tuple tuple) {
 	}
 	return SerializationUtils.serialize(tuple.f0);
 }
-```
+~~~
 
 ##### Close
 The API provided is the [same](#rmq_source_close) as the one for `RabbitMQSource`.
@@ -794,12 +785,12 @@ The API provided is the [same](#rmq_source_close) as the one for `RabbitMQSource
 #### Building A Topology
 To use a RabbitMQ connector as a source in Flink call the `addSource()` function with a new
instance of the class which extends `RabbitMQSource` as parameter:
 
-```java
+~~~java
 @SuppressWarnings("unused")
 DataStream<Tuple1<String>> dataStream1 = env
 	.addSource(new MyRMQSource("localhost", "hello"))
 	.print();
-```
+~~~
 
 The following arguments have to be provided for the `MyRabbitMQSource()` constructor, in order:
 
@@ -808,11 +799,11 @@ The followings have to be provided for the `MyRabbitMQSource()` constructor
in o
 
 Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` function with
a new instance of the class which extends `RabbitMQSink`
 
-```java
+~~~java
 DataStream<Tuple1<String>> dataStream2 = env
 	.fromElements("one", "two", "three", "four", "five", "q")
 	.addSink(new MyRMQSink("localhost", "hello"));
-```
+~~~
 
 The following arguments have to be provided for the `MyRabbitMQSink()` constructor, in order:
 
@@ -834,84 +825,84 @@ After installing Docker an image can be pulled for each connector. Containers
ca
 #### RabbitMQ
 Pull the image:
 
-```batch
+~~~batch
 sudo docker pull flinkstreaming/flink-connectors-rabbitmq 
-```
+~~~
 
 To run the container type:
 
-```batch
+~~~batch
 sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq
-```
+~~~
 
 Now a terminal is running from the image with all the necessary configurations to test
run the RabbitMQ connector. The -p flag binds the localhost's and the Docker container's ports
so RabbitMQ can communicate with the application through them.
 
 To start the RabbitMQ server:
 
-```batch
+~~~batch
 sudo /etc/init.d/rabbitmq-server start
-```
+~~~
 
 To launch the example on the host computer execute:
 
-```batch
+~~~batch
 java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology
> log.txt 2> errorlog.txt
-```
+~~~
 
 The Maven assembly plugin creates one jar with all the required dependencies. If the project
is in a directory called git then the jar can be found here (the version number may change
later):
-```batch
-/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
```
+~~~batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
~~~
 
 In the example there are two connectors: one that sends messages to RabbitMQ and one that
receives messages from the same queue. In the log the arriving messages can be
observed in the following format:
 
-```
+~~~
 <DATE> INFO rabbitmq.RMQTopology: String: <one> arrived from RMQ 
 <DATE> INFO rabbitmq.RMQTopology: String: <two> arrived from RMQ
 <DATE> INFO rabbitmq.RMQTopology: String: <three> arrived from RMQ
 <DATE> INFO rabbitmq.RMQTopology: String: <four> arrived from RMQ
 <DATE> INFO rabbitmq.RMQTopology: String: <five> arrived from RMQ
-```
+~~~
 
 #### Apache Kafka
 
 Pull the image:
 
-```batch
+~~~batch
 sudo docker pull flinkstreaming/flink-connectors-kafka 
-```
+~~~
 
 To run the container type:
 
-```batch
+~~~batch
 sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i flinkstreaming/flink-connectors-kafka
-```
+~~~
 
 Now a terminal is running from the image with all the necessary configurations to test
run the Kafka connector. The -p flag binds the localhost's and the Docker container's ports
so Kafka can communicate with the application through them.
 First start a zookeeper in the background:
 
-```batch
+~~~batch
 /kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties
> zookeeperlog.txt &
-```
+~~~
 
 Then start the kafka server in the background:
 
-```batch
+~~~batch
 /kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties
> serverlog.txt 2> servererr.txt &
-```
+~~~
 
 To launch the example on the host computer execute:
 
-```batch
+~~~batch
 java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology
> log.txt 2> errorlog.txt
-```
+~~~
 
 The Maven assembly plugin creates one jar with all the required dependencies. If the project
is in a directory called git then the jar can be found here (the version number may change
later):
-```batch
-/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
```
+~~~batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
~~~
 
 In the example there are two connectors: one that sends messages to Kafka and one that receives
messages from the same queue. In the log the arriving messages can be observed
in the following format:
 
-```
+~~~
 <DATE> INFO kafka.KafkaTopology: String: (0) arrived from Kafka
 <DATE> INFO kafka.KafkaTopology: String: (1) arrived from Kafka
 <DATE> INFO kafka.KafkaTopology: String: (2) arrived from Kafka
@@ -922,7 +913,7 @@ In the example there are to connectors. One that sends messages to Kafka
and one
 <DATE> INFO kafka.KafkaTopology: String: (7) arrived from Kafka
 <DATE> INFO kafka.KafkaTopology: String: (8) arrived from Kafka
 <DATE> INFO kafka.KafkaTopology: String: (9) arrived from Kafka
-```
+~~~
 
 #### Apache Flume
 
@@ -930,54 +921,54 @@ At the moment remote access for Flume connectors does not work. This
example can
 
 Pull the image:
 
-```batch
+~~~batch
 sudo docker pull flinkstreaming/flink-connectors-flume
-```
+~~~
 
 To run the container type:
 
-```batch
+~~~batch
 sudo docker run -t -i flinkstreaming/flink-connectors-flume
-```
+~~~
 
 Now a terminal is running from the image with all the necessary configurations to test
run the Flume connector. The -p flag binds the localhost's and the Docker container's ports
so Flume can communicate with the application through them.
 
 To get the latest version of Flink, type:
-```batch
+~~~batch
 cd /git/incubator-flink/
 git pull
-```
+~~~
 
 Then build the code with:
 
-```batch
+~~~batch
 cd /git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/
 mvn install -DskipTests
-```
+~~~
 
 First start the server in the background:
 
-```batch
+~~~batch
 /apache-flume-1.5.0-bin/bin/flume-ng agent --conf conf --conf-file /apache-flume-1.5.0-bin/example.conf
--name a1 -Dflume.root.logger=INFO,console > /flumelog.txt 2> /flumeerr.txt &
-```
+~~~
 
 Then press enter and launch the example with:
 
-```batch
+~~~batch
 java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.flume.FlumeTopology
-```
+~~~
 
 The Maven assembly plugin creates one jar with all the required dependencies. If the project
is in a directory called git then the jar can be found here (the version number may change
later):
-```batch
-/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
```
+~~~batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar
~~~
 In the example there are two connectors: one that sends messages to Flume and one that receives
messages from the same queue. In the log the arriving messages can be observed
in the following format:
 
-```
+~~~
 <DATE> INFO flume.FlumeTopology: String: <one> arrived from Flume
 <DATE> INFO flume.FlumeTopology: String: <two> arrived from Flume
 <DATE> INFO flume.FlumeTopology: String: <three> arrived from Flume
 <DATE> INFO flume.FlumeTopology: String: <four> arrived from Flume
 <DATE> INFO flume.FlumeTopology: String: <five> arrived from Flume
-```
+~~~
 
 [Back to top](#top)

