flink-commits mailing list archives

From mbala...@apache.org
Subject [32/36] flink git commit: [streaming] Updated streaming guide for recent connector and data source changes
Date Wed, 07 Jan 2015 14:13:11 GMT
[streaming] Updated streaming guide for recent connector and data source changes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acd2d601
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acd2d601
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acd2d601

Branch: refs/heads/release-0.8
Commit: acd2d601e89e336c17c87bcb51461be0a4db5340
Parents: fcd28fc
Author: Gyula Fora <gyfora@apache.org>
Authored: Mon Jan 5 18:26:18 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Tue Jan 6 00:22:58 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 309 +++----------------
 .../api/scala/StreamExecutionEnvironment.scala  |   6 +-
 2 files changed, 56 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/acd2d601/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index bea0907..c7e7060 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -170,7 +170,9 @@ Usage: `operator.setParallelism(1)`
 
 ### Sources
 
-The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1 which can be increased using the `.setParallelism(dop)` method as we will see later at the operator settings.
+The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
+
+To create parallel sources the users source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction` in which cases the source will have the parallelism of the environment. The degree of parallelism for ParallelSourceFunctions can be changed afterwards using `source.setParallelism(int dop)`.
 
 There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
 
@@ -617,7 +619,7 @@ Stream connectors
 
 Connectors provide an interface for accessing data from various third party sources (message queues). Currently four connectors are natively supported, namely [Apache Kafka](https://kafka.apache.org/), [RabbitMQ](http://www.rabbitmq.com/), [Apache Flume](https://flume.apache.org/index.html) and [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis).
 
-Typically the connector packages consist of an abstract source and sink (with the exception of Twitter where only a source is provided). The burden of the user is to implement a subclass of these abstract classes specifying a serializer and a deserializer function. 
+Typically the connector packages consist of a source and sink class (with the exception of Twitter where only a source is provided). To use these sources the user needs to pass Serialization/Deserialization schemas for the connectors for the desired types. (Or use some predefined ones)
 
 To run an application using one of these connectors usually additional third party components are required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. [Docker containers](#docker-containers-for-connectors) are also provided encapsulating these services to aid users getting started with connectors.
 
@@ -631,106 +633,40 @@ This connector provides access to data streams from [Apache Kafka](https://kafka
  * If the Kafka zookeeper and server are running on a remote machine then in the config/server.properties file the advertised.host.name must be set to the machine's IP address.
 
 #### Kafka Source
-An abstract class providing an interface for receiving data from Kafka. By implementing the user must:
+A class providing an interface for receiving data from Kafka.
 
- * Write a constructor calling the constructor of the abstract class,
- * Write a deserializer function which processes the data coming from Kafka,
- * Stop the source manually when necessary with one of the close functions.
+The followings have to be provided for the `KafkaSource(..)` constructor in order:
 
-The implemented class must extend `KafkaSource`, for example: `KafkaSource<String>`.
+1. The hostname
+2. The group name
+3. The topic name
+4. The parallelism
+5. Deserialisation schema
 
-##### Constructor
-An example of an implementation of a constructor:
 
-~~~java
-public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
-    super(zkQuorum, groupId, topicId, numThreads);
-}
-~~~
-
-##### Deserializer
-An example of an implementation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] msg) {
-    String s = new String(msg);
-    if(s.equals("q")){
-        closeWithoutSend();
-    }
-    return new String(s);
-}
+DataStream<String> stream = env
+	.addSource(new KafkaSource<String>("localhost:2181", "group", "test",new SimpleStringSchema()))
+	.print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-###### Close<a name="kafka_source_close"></a>
-Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`. The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
-
-In the example provided `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it. 
-
 #### Kafka Sink
-An abstract class providing an interface for sending data to Kafka. By implementing the user must:
-
- * Write a constructor calling the constructor of the abstract class,
- * Write a serializer function to send data in the desired form to Kafka,
- * Stop the sink manually when necessary with one of the close functions.
-
-The implemented class must extend `KafkaSink`, for example `KafkaSink<String, String>`.
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyKafkaSink(String topicId, String brokerAddr) {
-    super(topicId, brokerAddr);
-}
-~~~
-
-##### Serializer
-An example of an implementation of a serializer:
-
-~~~java
-@Override
-public String serialize(String tuple) {
-    if(tuple.equals("q")){
-        sendAndClose();
-    }
-    return tuple;
-}
-~~~
+A class providing an interface for sending data to Kafka. 
 
-##### Close
-The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`.
+The followings have to be provided for the `KafkaSink()` constructor in order:
 
-#### Building A Topology
-To use a Kafka connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `KafkaSource` as parameter:
-
-~~~java
-DataStream<String> stream1 = env.
-    addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
-    .print();
-~~~
-
-The followings have to be provided for the `MyKafkaSource()` constructor in order:
-
-1. The hostname
-2. The group name
-3. The topic name
-4. The parallelism
+1. The topic name
+2. The hostname
+3. Serialisation schema
 
-Similarly to use a Kafka connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `KafkaSink`:
+Example: 
 
 ~~~java
-DataStream<String> stream2 = env
-    .addSource(new MySource())
-    .addSink(new MyKafkaSink("test", "localhost:9092"));
+stream.addSink(new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema()));
 ~~~
 
-The followings have to be provided for the `MyKafkaSink()` constructor in order:
-
-1. The topic name
-2. The hostname
 
 More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
 
@@ -741,114 +677,40 @@ More about Kafka can be found [here](https://kafka.apache.org/documentation.html
 This connector provides access to datastreams from [Apache Flume](http://flume.apache.org/).
 
 #### Installing Apache Flume
-[Download](http://flume.apache.org/download.html) Apache Flume. A configuration file is required for starting agents in Flume. A configuration file for running the example can be found [here](#config_file).
+[Download](http://flume.apache.org/download.html) Apache Flume. A configuration file is required for starting agents in Flume. A configuration file for running the example can be found [here](#config_file).
 
 #### Flume Source
-An abstract class providing an interface for receiving data from Flume. By implementing the user must:
+A class providing an interface for receiving data from Flume.
 
- * Write a constructor calling the constructor of the abstract class,
- * Write a deserializer function which processes the data coming from Flume,
- * Stop the source manually when necessary with one of the close functions.
+The followings have to be provided for the `FlumeSource(…)` constructor in order:
 
-The implemented class must extend `FlumeSource` for example: `FlumeSource<String>`
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-MyFlumeSource(String host, int port) {
-    super(host, port);
-}
-~~~
+1. The hostname
+2. The port number
+3. Deserialisation schema
 
-##### Deserializer
-An example of an implementation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] msg) {
-    String s = (String) SerializationUtils.deserialize(msg);
-    String out = s;
-    if (s.equals("q")) {
-        closeWithoutSend();
-    }
-    return out;
-}
+DataStream<String> stream = env
+	.addSource(new FlumeSource<String>("localhost", 41414, new SimpleStringSchema()))
+	.print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-##### Close<a name="flume_source_close"></a>
-Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`.The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
-
-In the example `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it. 
-
 #### Flume Sink
-An abstract class providing an interface for sending data to Flume. By implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class,
-* Write a serializer function to send data in the desired form to Flume,
-* Stop the sink manually when necessary with one of the close functions.
+A class providing an interface for sending data to Flume. 
 
-The implemented class must extend `FlumeSink`, for example `FlumeSink<String, String>`.
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyFlumeSink(String host, int port) {
-    super(host, port);
-}
-~~~
-
-##### Serializer
-An example of an implementation of a serializer.
-
-~~~java
-@Override
-public byte[] serialize(String tuple) {
-    if (tuple.equals("q")) {
-        try {
-            sendAndClose();
-        } catch (Exception e) {
-            new RuntimeException("Error while closing Flume connection with " + port + " at "
-                + host, e);
-        }
-    }
-    return SerializationUtils.serialize(tuple);
-}
-~~~
-
-##### Close
-The API provided is the [same](#flume_source_close) as the one for `FlumeSource`.
-
-#### Building A Topology
-To use a Flume connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `FlumeSource` as parameter:
-
-~~~java
-DataStream<String> dataStream1 = env
-    .addSource(new MyFlumeSource("localhost", 41414))
-    .print();
-~~~
-
-The followings have to be provided for the `MyFlumeSource()` constructor in order:
+The followings have to be provided for the `FlumeSink(…)` constructor in order:
 
 1. The hostname
 2. The port number
+3. Serialisation schema
 
-Similarly to use a Flume connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `FlumeSink`
+Example: 
 
 ~~~java
-DataStream<String> dataStream2 = env
-    .fromElements("one", "two", "three", "four", "five", "q")
-    .addSink(new MyFlumeSink("localhost", 42424));
+stream.addSink(new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
 ~~~
 
-The followings have to be provided for the `MyFlumeSink()` constructor in order:
-
-1. The hostname
-2. The port number
-
 ##### Configuration file<a name="config_file"></a>
 An example of a configuration file:
 
@@ -890,107 +752,38 @@ This connector provides access to datastreams from [RabbitMQ](http://www.rabbitm
 Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation the server automatically starts and the application connecting to RabbitMQ can be launched.
 
 #### RabbitMQ Source
-An abstract class providing an interface for receiving data from RabbitMQ. By implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class,
-* Write a deserializer function which processes the data coming from RabbitMQ,
-* Stop the source manually when necessary with one of the close functions.
 
-The implemented class must extend `RabbitMQSource` for example: `RabbitMQSource<String>`
+A class providing an interface for receiving data from RabbitMQ.
 
-##### Constructor
-An example of an implementation of a constructor:
+The followings have to be provided for the `RMQSource(…)` constructor in order:
 
-~~~java
-public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
-    super(HOST_NAME, QUEUE_NAME);
-}
-~~~
+1. The hostname
+2. The queue name
+3. Deserialisation schema
 
-##### Deserializer
-An example of an implemetation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] t) {
-    String s = (String) SerializationUtils.deserialize(t);
-    String out = s;
-    if (s.equals("q")) {
-        closeWithoutSend();
-    }
-    return out;
-}
+DataStream<String> stream = env
+	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
+	.print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-##### Close<a name="rmq_source_close"></a>
-Two types of close functions are available, namely `closeWithoutSend()` and `sendAndClose()`. The former closes the connection immediately and no further data will be sent, while the latter closes the connection only when the next message is sent after this call.
-
-Closes the connection only when the next message is sent after this call.
-
-In the example `closeWithoutSend()` is used because here the String `"q"` is meta-message indicating the end of the stream and there is no need to forward it. 
-
 #### RabbitMQ Sink
-An abstract class providing an interface for sending data to RabbitMQ. By implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class
-* Write a serializer function to send data in the desired form to RabbitMQ
-* Stop the sink manually when necessary with one of the close functions
-
-The implemented class must extend `RabbitMQSink` for example: `RabbitMQSink<String, String>`
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
-    super(HOST_NAME, QUEUE_NAME);
-}
-~~~
+A class providing an interface for sending data to RabbitMQ. 
 
-##### Serializer
-An example of an implementation of a serializer.
-
-~~~java
-@Override
-public byte[] serialize(Tuple tuple) {
-    if (t.getField(0).equals("q")) {
-        sendAndClose();
-    }
-    return SerializationUtils.serialize(tuple.f0);
-}
-~~~
-
-##### Close
-The API provided is the [same](#rmq_source_close) as the one for `RabbitMQSource`.
-
-#### Building A Topology
-To use a RabbitMQ connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `RabbitMQSource` as parameter:
-
-~~~java
-DataStream<String> dataStream1 = env
-    .addSource(new MyRMQSource("localhost", "hello"))
-    .print();
-~~~
-
-The followings have to be provided for the `MyRabbitMQSource()` constructor in order:
+The followings have to be provided for the `RMQSink(…)` constructor in order:
 
 1. The hostname
 2. The queue name
+3. Serialisation schema
 
-Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `RabbitMQSink`
+Example: 
 
 ~~~java
-DataStream<String> dataStream2 = env
-    .fromElements("one", "two", "three", "four", "five", "q")
-    .addSink(new MyRMQSink("localhost", "hello"));
+stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
 ~~~
 
-The followings have to be provided for the `MyRabbitMQSink()` constructor in order:
-
-1. The hostname
-1. The queue name
 
 More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
 

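Note on the schema-based connectors introduced above: the diff replaces user-written subclasses with Serialization/Deserialization schemas passed to the connector constructors. The following is only a minimal, self-contained sketch of that idea and does not use the Flink classes. The interfaces `DeserializationSchema` and `SerializationSchema`, the class `Utf8StringSchema`, and the method `roundTrip` are hypothetical stand-ins written for illustration; the real connector interfaces may declare different or additional methods.

```java
import java.nio.charset.StandardCharsets;

public class SchemaSketch {

    // Hypothetical stand-in: turns the raw bytes of one message into a typed element.
    interface DeserializationSchema<T> {
        T deserialize(byte[] message);
    }

    // Hypothetical stand-in: turns a typed element into the form sent to the external system.
    interface SerializationSchema<T, R> {
        R serialize(T element);
    }

    // Illustrative schema (an assumption, not a Flink class) that treats every message as UTF-8 text.
    static class Utf8StringSchema
            implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {

        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Serializes a value as a sink would, then deserializes it as a source would.
    static String roundTrip(String value) {
        Utf8StringSchema schema = new Utf8StringSchema();
        return schema.deserialize(schema.serialize(value));
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Under this design the source or sink class stays generic and only the small schema object varies per data type, which is presumably why the per-connector subclassing instructions could be deleted from the guide.
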
http://git-wip-us.apache.org/repos/asf/flink/blob/acd2d601/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index cf05627..eb34b80 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -151,7 +151,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Create a DataStream using a user defined source function for arbitrary
-   * source functionality.
+   * source functionality. By default sources have a parallelism of 1. 
+   * To enable parallel execution, the user defined source should implement 
+   * ParallelSourceFunction or extend RichParallelSourceFunction. 
+   * In these cases the resulting source will have the parallelism of the environment. 
+   * To change this afterwards call DataStreamSource.setParallelism(int)
    *
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
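
Note on the parallelism rule described in the Scaladoc above: a plain source gets parallelism 1, a source implementing `ParallelSourceFunction` gets the parallelism of the environment, and `setParallelism(int)` can change it afterwards. The sketch below models only that rule in plain Java. All types in it (`SourceFunction`, `ParallelSourceFunction`, `Env`, `SourceHandle`) are hypothetical toy stand-ins named after the concepts in the commit, not the Flink classes, and they carry no data.

```java
public class ParallelismSketch {

    // Hypothetical marker types standing in for the two kinds of source function.
    interface SourceFunction<T> {}
    interface ParallelSourceFunction<T> extends SourceFunction<T> {}

    // Toy handle for a created source; only tracks its degree of parallelism.
    static final class SourceHandle {
        private int parallelism;

        SourceHandle(int parallelism) {
            this.parallelism = parallelism;
        }

        // Overrides the parallelism chosen at creation time.
        SourceHandle setParallelism(int dop) {
            if (dop < 1) {
                throw new IllegalArgumentException("parallelism must be at least 1");
            }
            this.parallelism = dop;
            return this;
        }

        int getParallelism() {
            return parallelism;
        }
    }

    // Toy environment with a fixed default parallelism.
    static final class Env {
        private final int parallelism;

        Env(int parallelism) {
            this.parallelism = parallelism;
        }

        // Plain sources default to 1; parallel sources inherit the environment's value.
        <T> SourceHandle addSource(SourceFunction<T> function) {
            int dop = (function instanceof ParallelSourceFunction) ? parallelism : 1;
            return new SourceHandle(dop);
        }
    }

    // Helper: parallelism assigned to a plain source in an environment of the given size.
    static int plainSourceParallelism(int envParallelism) {
        return new Env(envParallelism).addSource(new SourceFunction<String>() {}).getParallelism();
    }

    // Helper: parallelism assigned to a parallel source in an environment of the given size.
    static int parallelSourceParallelism(int envParallelism) {
        return new Env(envParallelism).addSource(new ParallelSourceFunction<String>() {}).getParallelism();
    }

    public static void main(String[] args) {
        System.out.println(plainSourceParallelism(4));
        System.out.println(parallelSourceParallelism(4));
    }
}
```
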

