flink-commits mailing list archives

From gyf...@apache.org
Subject [3/3] flink git commit: [FLINK-1429] [streaming] Scala documentation and minor Scala API features
Date Sun, 08 Mar 2015 19:33:00 GMT
[FLINK-1429] [streaming] Scala documentation and minor Scala API features


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c9ab85e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c9ab85e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c9ab85e

Branch: refs/heads/master
Commit: 8c9ab85e7e62a1a4ea58e09d3790bddd74d25832
Parents: fb62f6b
Author: Gábor Hermann <reckoner42@gmail.com>
Authored: Sun Mar 8 16:41:54 2015 +0100
Committer: Gábor Hermann <reckoner42@gmail.com>
Committed: Sun Mar 8 20:24:41 2015 +0100

----------------------------------------------------------------------
 docs/programming_guide.md                       |   2 +-
 docs/streaming_guide.md                         | 399 +++++++++++++++----
 .../api/scala/ConnectedDataStream.scala         |  35 +-
 .../flink/streaming/api/scala/DataStream.scala  |  50 ++-
 .../streaming/api/scala/SplitDataStream.scala   |   2 -
 5 files changed, 390 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index efedc1b..c6c5bd1 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -2222,7 +2222,7 @@ val initial = env.fromElements(0)
 val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
   val result = iterationInput.map { i => 
     val x = Math.random()
-    val y = Math.randon()
+    val y = Math.random()
     i + (if (x * x + y * y < 1) 1 else 0)
   }
   result

http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 0fb7dac..7a5835e 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -414,7 +414,7 @@ For example `dataStream.window(Count.of(100)).maxBy(field)` would create
global
 
 ### Temporal database style operators
 
-While database style operators like joins (on key) and crosses are hard to define properly
on data streams, a straight forward implementation is to apply these operators on windows
of the data streams. 
+While database style operators like joins (on key) and crosses are hard to define properly
on data streams, a straightforward implementation is to apply these operators on windows of
the data streams. 
 
 Currently join and cross operators are supported on time windows.
 
@@ -422,31 +422,53 @@ The Join transformation produces a new Tuple DataStream with two fields.
Each tu
 
 The following code shows a default Join transformation using field position keys:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream1.join(dataStream2)
     .onWindow(windowing_params)
     .where(key_in_first)
     .equalTo(key_in_second);
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream1.join(dataStream2)
+    .onWindow(windowing_params)
+    .where(key_in_first)
+    .equalTo(key_in_second)
+{% endhighlight %}
+</div>
+</div>
 
-The Cross transformation combines two DataStreams into one DataStreams. It builds all pairwise
combinations of the elements of both input DataStreams in the current window, i.e., it builds
a temporal Cartesian product.
+The Cross transformation combines two DataStreams into one DataStream. It builds all pairwise
combinations of the elements of both input DataStreams in the current window, i.e., it builds
a temporal Cartesian product.
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream1.cross(dataStream2).onWindow(windowing_params);
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream1 cross dataStream2 onWindow (windowing_params)
+{% endhighlight %}
+</div>
+</div>
 
-Please note that this is currently not integrated with the windowing semantics, integration
is work in progress.
 
 ### Co operators
 
-Co operators allow the users to jointly transform two `DataStreams` of different types providing
a simple way to jointly manipulate a shared state. It is designed to support joint stream
transformations where merging is not appropriate due to different data types or in case the
user needs explicit tracking of the joined stream origin.
-Co operators can be applied to `ConnectedDataStreams` which represent two `DataStreams` of
possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)`
method of a `DataStream`. Please note that the two connected `DataStreams` can also be merged
data streams.
+Co operators allow the users to jointly transform two DataStreams of different types providing
a simple way to jointly manipulate a shared state. It is designed to support joint stream
transformations where merging is not appropriate due to different data types or in case the
user needs explicit tracking of the joined stream origin.
+Co operators can be applied to ConnectedDataStreams which represent two DataStreams of possibly
different types. A ConnectedDataStream can be created by calling the `connect(otherDataStream)`
method of a DataStream. Please note that the two connected DataStreams can also be merged
data streams.
 
 #### Map on ConnectedDataStream
 Applies a CoMap transformation on two separate DataStreams, mapping them to a common output
type. The transformation calls a `CoMapFunction.map1()` for each element of the first input
and `CoMapFunction.map2()` for each element of the second input. Each CoMapFunction call returns
exactly one element.
 A CoMap operator that outputs true if an Integer value is received and false if a String
value is received:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<Integer> dataStream1 = ...
 DataStream<String> dataStream2 = ...
         
@@ -463,29 +485,63 @@ dataStream1.connect(dataStream2)
                 return false;
             }
         })
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataStream1 : DataStream[Int] = ...
+val dataStream2 : DataStream[String] = ...
+
+(dataStream1 connect dataStream2)
+  .map(
+    (_ : Int) => true,
+    (_ : String) => false
+  )
+{% endhighlight %}
+</div>
+</div>
 
 #### FlatMap on ConnectedDataStream
 The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead
of returning exactly one element after each map call the user can output arbitrarily many
values using the Collector interface. 
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<Integer> dataStream1 = ...
 DataStream<String> dataStream2 = ...
         
 dataStream1.connect(dataStream2)
-    .flatMap(new CoFlatMapFunction<Integer, String, Boolean>() {
+    .flatMap(new CoFlatMapFunction<Integer, String, String>() {
 
             @Override
-            public void flatMap1(Integer value, Collector<Boolean> out) {
-                out.collect(true);
+            public void flatMap1(Integer value, Collector<String> out) {
+                out.collect(value.toString());
             }
 
             @Override
-            public void flatMap2(String value, Collector<Boolean> out) {
-                out.collect(false);
+            public void flatMap2(String value, Collector<String> out) {
+                for (String word: value.split(" ")) {
+                  out.collect(word);
+                }
             }
         })
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataStream1 : DataStream[Int] = ...
+val dataStream2 : DataStream[String] = ...
+
+dataStream2.flatMap((str : String) => str.split(" "))
+
+(dataStream1 connect dataStream2)
+  .flatMap(
+    (num : Int) => List(num.toString),
+    (str : String) => str.split(" ")
+  )
+{% endhighlight %}
+</div>
+</div>
 
 #### WindowReduce on ConnectedDataStream
 The windowReduce operator applies a user-defined `CoWindowFunction` to time-aligned windows of the two data streams and returns zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
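
No snippet accompanies this operator in the guide, so here is a rough, non-authoritative sketch. The `windowReduce` call and the `coWindow(first, second, out)` method are assumptions based on the `CoWindowFunction` interface imported in `ConnectedDataStream.scala` further down in this commit, and `windowSize` and `slideInterval` are placeholder values; none of this is taken verbatim from the commit.

{% highlight scala %}
// Illustrative sketch only: the windowReduce(...) operator and the coWindow(...)
// signature are assumptions, not verified against this commit.
val windowSummary: DataStream[String] = (dataStream1 connect dataStream2)
  .windowReduce(new CoWindowFunction[Int, String, String] {
    override def coWindow(first: java.util.List[Int], second: java.util.List[String],
        out: Collector[String]): Unit = {
      // emit one summary element per time-aligned window of the two inputs
      out.collect(first.size() + " numbers and " + second.size() + " words in this window")
    }
  }, windowSize, slideInterval)
{% endhighlight %}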
@@ -493,31 +549,32 @@ The windowReduce operator applies a user defined `CoWindowFunction`
to time alig
 #### Reduce on ConnectedDataStream
 The Reduce operator for the `ConnectedDataStream` applies a simple reduce transformation
on the joined data streams and then maps the reduced elements to a common output type.
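
The paragraph above likewise has no example. A minimal sketch follows, assuming the `CoReduceFunction` interface imported in `ConnectedDataStream.scala` below exposes `reduce1`/`reduce2` for the per-input reductions and `map1`/`map2` for mapping to the common output type; these method names are an assumption, not confirmed by this commit.

{% highlight scala %}
// Hedged sketch: the reduce1/reduce2/map1/map2 method names are assumptions.
val reduced: DataStream[String] = (dataStream1 connect dataStream2)
  .reduce(new CoReduceFunction[Int, String, String] {
    // running reduction on each input separately
    override def reduce1(a: Int, b: Int): Int = a + b
    override def reduce2(a: String, b: String): String = a + " " + b
    // map each reduced value to the common output type
    override def map1(value: Int): String = value.toString
    override def map2(value: String): String = value
  })
{% endhighlight %}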
 
+<div class="codetabs" markdown="1">
 ### Output splitting
+<div data-lang="java" markdown="1">
 
 Most data stream operators support directed outputs (output splitting), meaning that different
output elements are sent only to specific outputs. The outputs are referenced by their name
given at the point of receiving:
 
-~~~java
+{% highlight java %}
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
 DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
-~~~
-
-In the above example the data stream named ‘even’ will only contain elements that are
directed to the output named “even”. The user can of course further transform these new
stream by for example squaring only the even elements.
+{% endhighlight %}
+In the above example the data stream named “even” will only contain elements that are directed to the output named “even”. The user can of course further transform this new stream, for example by squaring only the even elements.
 
 Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select("output1", "output2", ...)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
 
 The outputs of an operator are directed by implementing a selector function (implementing
the `OutputSelector` interface):
 
-~~~java
+{% highlight java %}
 Iterable<String> select(OUT value);
-~~~
+{% endhighlight %}
 
 The data is sent to all the outputs returned in the iterable (referenced by their name).
This way the direction of the outputs can be determined by the value of the data sent. 
 
 For example to split even and odd numbers:
 
-~~~java
+{% highlight java %}
 @Override
 Iterable<String> select(Integer value) {
 
@@ -531,91 +588,177 @@ Iterable<String> select(Integer value) {
 
     return outputs;
 }
-~~~
+{% endhighlight %}
 
 Every output will be emitted to the selected outputs exactly once, even if you add the same
output names more than once.
+</div>
+<div data-lang="scala" markdown="1">
 
+Most data stream operators support directed outputs (output splitting), meaning that different
output elements are sent only to specific outputs. The outputs are referenced by their name
given at the point of receiving:
+
+{% highlight scala %}
+val split = someDataStream.split(
+  (num: Int) =>
+    (num % 2) match {
+      case 0 => List("even")
+      case 1 => List("odd")
+    }
+)
+
+val even = split select "even" 
+val odd = split select "odd"
+{% endhighlight %}
+
+In the above example the data stream named “even” will only contain elements that are directed to the output named “even”. The user can of course further transform this new stream, for example by squaring only the even elements.
+
+Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select("output1", "output2", ...)`. It is common that a stream listens to all the outputs, so `split.selectAll` provides this functionality without having to select all names.
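
For instance, using only the `select` and `selectAll` calls described above:

{% highlight scala %}
// listen to two named outputs at once, or to every output
val evenAndOdd = split.select("even", "odd")
val everything = split.selectAll
{% endhighlight %}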
+
+The outputs of an operator are directed by implementing a function that returns the output
names for the value. The data is sent to all the outputs returned by the function (referenced
by their name). This way the direction of the outputs can be determined by the value of the
data sent.
+
+Every output will be emitted to the selected outputs exactly once, even if you add the same
output names more than once.
+</div>
+
+</div>
+
+<div class="codetabs" markdown="1">
 ### Iterations
+<div data-lang="java" markdown="1">
 The Flink Streaming API supports implementing iterative stream processing dataflows similarly
to the core Flink API. Iterative streaming programs also implement a step function and embed
it into an `IterativeDataStream`.
-Unlike in the core API the user does not define the maximum number of iterations, but at
the tail of each iteration part of the output is streamed forward to the next operator and
part is streamed back to the iteration head. The user controls the output of the iteration
tail using [output splitting](#output-splitting).
+Unlike in the core API the user does not define the maximum number of iterations, but at
the tail of each iteration part of the output is streamed forward to the next operator and
part is streamed back to the iteration head. The user controls the output of the iteration
tail using [output splitting](#output-splitting) or [filters](#filter).
 To start an iterative part of the program the user defines the iteration starting point:
 
-~~~java
+{% highlight java %}
 IterativeDataStream<Integer> iteration = source.iterate(maxWaitTimeMillis);
-~~~
+{% endhighlight %}
+
 The operator applied on the iteration starting point is the head of the iteration, where
data is fed back from the iteration tail.
 
-~~~java
+{% highlight java %}
 DataStream<Integer> head = iteration.map(new IterationHead());
-~~~
+{% endhighlight %}
 
-To close an iteration and define the iteration tail, the user calls `.closeWith(iterationTail)`
method of the `IterativeDataStream`.
+To close an iteration and define the iteration tail, the user calls the `closeWith(iterationTail)` method of the `IterativeDataStream`. This iteration tail (the DataStream given to the `closeWith` function) will be fed back to the iteration head. A common pattern is to use [filters](#filter) to separate the output of the iteration from the feedback stream.
 
-A common pattern is to use output splitting:
+{% highlight java %}
+DataStream<Integer> tail = head.map(new IterationTail());
 
-~~~java
-SplitDataStream<..> tailOperator = head.map(new IterationTail()).split(outputSelector);
-iteration.closeWith(tailOperator.select("iterate"));
-~~~ 
+iteration.closeWith(tail.filter(isFeedback));
+
+DataStream<Integer> output = tail.filter(isOutput);
 
-In these case all output directed to the “iterate” edge would be fed back to the iteration
head.
+output.map(…).project(…)…
+{% endhighlight %}
+
+In this case all values passing the `isFeedback` filter will be fed back to the iteration
head, and the values passing the `isOutput` filter will produce the output of the iteration
that can be transformed further (here with a `map` and a `projection`) outside the iteration.
 
 Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
 To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)`
call to control the max wait time. 
+</div>
+<div data-lang="scala" markdown="1">
+The Flink Streaming API supports implementing iterative stream processing dataflows similarly
to the core Flink API. Iterative streaming programs also implement a step function and embed
it into an `IterativeDataStream`.
+Unlike in the core API the user does not define the maximum number of iterations, but at the tail of each iteration, part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail by defining a step function that returns two DataStreams: a feedback and an output. The first one is the output that will be fed back to the start of the iteration and the second is the output stream of the iterative part.
+
+A common pattern is to use [filters](#filter) to separate the output from the feedback stream. In this case all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.
+
+{% highlight scala %}
+val iteratedStream = someDataStream.iterate(maxWaitTime) {
+  iteration => {
+    val head = iteration.map(iterationHead)
+    val tail = head.map(iterationTail)
+    (tail.filter(isFeedback), tail.filter(isOutput))
+  }
+}.map(…).project(…)…
+{% endhighlight %}
+
+Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
+To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)`
call to control the max wait time. 
+</div>
+
+</div>
 
 ### Rich functions
 The usage of rich functions is essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream.map(new RichMapFunction<Integer, String>() {
-  public String map(Integer value) { return value.toString(); }
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        /* initialization of function */
+    }
+
+    @Override
+    public String map(Integer value) { return value.toString(); }
 });
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream map
+  new RichMapFunction[Int, String] {
+    override def open(config: Configuration) = {
+      /* initialization of function */
+    }
+    override def map(value: Int): String = value.toString
+  }
+{% endhighlight %}
+</div>
+</div>
 
-Rich functions provide, in addition to the user-defined function (`map()`, `reduce()`, etc),
the `open()` and `close()` methods for initialization and finalization. (In contrast to the
core API, the streaming API currently does not support the  `getRuntimeContext()` and `setRuntimeContext()`
methods.)
+Rich functions provide, in addition to the user-defined function (`map()`, `reduce()`, etc),
the `open()` and `close()` methods for initialization and finalization.
 
 [Back to top](#top)
 
-### Lambda expressions with Java 8
+Lambda expressions with Java 8
+------------
 
 For more concise code one can rely on one of the main features of Java 8, lambda expressions. The following program has similar functionality to the one provided in the [example](#example-program) section, while showcasing the usage of lambda expressions.
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java8" markdown="1">
+{% highlight java %}
 public class StreamingWordCount {
     public static void main(String[] args) throws Exception {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-	    DataStream<String> text = env.fromElements(
+        DataStream<String> text = env.fromElements(
                 "Who's there?",
                 "I think I hear them. Stand, ho! Who's there?");
 
             DataStream<Tuple2<String, Integer>> counts = 
-		// normalize and split each line
-		text.map(line -> line.toLowerCase().split("\\W+"))
-		// convert splitted line in pairs (2-tuples) containing: (word,1)
-		.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
-		// emit the pairs with non-zero-length words
-			Arrays.stream(tokens)
-				.filter(t -> t.length() > 0)
-				.forEach(t -> out.collect(new Tuple2<>(t, 1)));
-		})
-		// group by the tuple field "0" and sum up tuple field "1"
-		.groupBy(0)
-		.sum(1);
+        // normalize and split each line
+        text.map(line -> line.toLowerCase().split("\\W+"))
+        // convert splitted line in pairs (2-tuples) containing: (word,1)
+        .flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) ->
{
+        // emit the pairs with non-zero-length words
+            Arrays.stream(tokens)
+                .filter(t -> t.length() > 0)
+                .forEach(t -> out.collect(new Tuple2<>(t, 1)));
+        })
+        // group by the tuple field "0" and sum up tuple field "1"
+        .groupBy(0)
+        .sum(1);
 
         counts.print();
 
         env.execute("Streaming WordCount");
     }
 }
-~~~
+{% endhighlight %}
+</div>
+</div>
 
 For a detailed Java 8 guide please refer to the [Java 8 Programming Guide](java8_programming_guide.html). Operators specific to streaming, such as operator splitting, also support this usage. [Output splitting](#output-splitting) can be rewritten as follows:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java8" markdown="1">
+{% highlight java %}
 SplitDataStream<Integer> split = someDataStream
     .split(x -> Arrays.asList(String.valueOf(x % 2)));
-~~~
+{% endhighlight %}
+</div>
+</div>
 
 Operator Settings
 ----------------
@@ -631,12 +774,24 @@ To tackle this issue the user can call `env.setBufferTimeout(timeoutMillis)`
on
 
 Usage:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 env.setBufferTimeout(timeoutMillis);
 
 env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment
+env.setBufferTimeout(timeoutMillis)
+
+env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
+{% endhighlight %}
+</div>
+</div>
 
 To maximise the throughput the user can call `setBufferTimeout(-1)` which will remove the
timeout and buffers will only be flushed when they are full.
 To minimise latency, set the timeout to a value close to 0 (for example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.
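
For example, using the same `setBufferTimeout` call as in the snippet above:

{% highlight scala %}
// favour throughput: flush buffers only when they are full
env.setBufferTimeout(-1)

// favour latency: flush at least every 5 milliseconds
env.setBufferTimeout(5)
{% endhighlight %}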
@@ -676,11 +831,22 @@ The followings have to be provided for the `KafkaSource(…)` constructor
in ord
 
 Example:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<String> stream = env
-	.addSource(new KafkaSource<String>("localhost:2181", "group", "test",new SimpleStringSchema()))
+	.addSource(new KafkaSource<String>("localhost:2181", "group", "test", new SimpleStringSchema()))
 	.print();
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream = env
+    .addSource(new KafkaSource[String]("localhost:2181", "group", "test", new SimpleStringSchema))
+    .print
+{% endhighlight %}
+</div>
+</div>
 
 #### Kafka Sink
 A class providing an interface for sending data to Kafka. 
@@ -693,10 +859,18 @@ The followings have to be provided for the `KafkaSink()` constructor
in order:
 
 Example: 
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 stream.addSink(new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema()));
-~~~
-
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new KafkaSink[String, String]("test", "localhost:9092", new SimpleStringSchema))
+{% endhighlight %}
+</div>
+</div>
 
 More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
 
@@ -720,11 +894,22 @@ The followings have to be provided for the `FlumeSource(…)` constructor
in ord
 
 Example:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<String> stream = env
 	.addSource(new FlumeSource<String>("localhost", 41414, new SimpleStringSchema()))
 	.print();
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream = env
+    .addSource(new FlumeSource[String]("localhost", 41414, new SimpleStringSchema))
+    .print
+{% endhighlight %}
+</div>
+</div>
 
 #### Flume Sink
 A class providing an interface for sending data to Flume. 
@@ -737,9 +922,18 @@ The followings have to be provided for the `FlumeSink(…)` constructor
in order
 
 Example: 
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 stream.addSink(new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new FlumeSink[String]("localhost", 42424, new StringToByteSerializer))
+{% endhighlight %}
+</div>
+</div>
 
 ##### Configuration file<a name="config_file"></a>
 An example of a configuration file:
@@ -793,11 +987,22 @@ The followings have to be provided for the `RMQSource(…)` constructor
in order
 
 Example:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<String> stream = env
-	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
-	.print();
-~~~
+	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
+	.print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream = env
+    .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
+    .print
+{% endhighlight %}
+</div>
+</div>
 
 #### RabbitMQ Sink
 A class providing an interface for sending data to RabbitMQ. 
@@ -810,10 +1015,18 @@ The followings have to be provided for the `RMQSink(…)` constructor
in order:
 
 Example: 
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
-~~~
-
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))
+{% endhighlight %}
+</div>
+</div>
 
 More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
 
@@ -855,16 +1068,34 @@ Both constructors expect a `String authPath` argument determining the
location o
 #### Usage
 In contrast to other connectors, the `TwitterSource` depends on no additional services. For example, the following code should run gracefully:
 
-~~~java
-DataStream<String> streamSource = env.AddSource(new TwitterSource("/PATH/TO/myFile.properties"));
-~~~
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))
+{% endhighlight %}
+</div>
+</div>
 
 The `TwitterSource` emits strings containing JSON code. 
 To retrieve information from the JSON code you can add a FlatMap or a Map function that handles JSON code. For example, use an implementation of the `JSONParseFlatMap` abstract class found among the examples. `JSONParseFlatMap` is an extension of the `FlatMapFunction` and has a
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 String getField(String jsonText, String field);
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+getField(jsonText : String, field : String) : String
+{% endhighlight %}
+</div>
+</div>
 
 function which can be used to acquire the value of a given field. 
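
As an illustration only, such a helper can be used inside a flatMap to pull a single field out of each tweet. The subclassing below and the "text" field name are hypothetical and not part of this commit; the sketch assumes `JSONParseFlatMap[String, String]` leaves the usual `flatMap(value, out)` method to be implemented.

{% highlight scala %}
// Hypothetical sketch: assumes JSONParseFlatMap[String, String] with the
// getField helper described above; names are illustrative only.
val tweetTexts = streamSource.flatMap(new JSONParseFlatMap[String, String] {
  override def flatMap(json: String, out: Collector[String]): Unit = {
    // emit only the "text" field of each JSON record
    out.collect(getField(json, "text"))
  }
})
{% endhighlight %}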
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index d60e796..9363236 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -20,18 +20,17 @@ package org.apache.flink.streaming.api.scala
 
 import java.util
 
-import scala.collection.JavaConversions.asScalaBuffer
-import scala.reflect.ClassTag
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
-import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction,
CoWindowFunction }
-import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable,
CoReduceInvokable }
+import org.apache.flink.streaming.api.function.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction,
CoWindowFunction}
+import org.apache.flink.streaming.api.invokable.operator.co.{CoFlatMapInvokable, CoMapInvokable,
CoReduceInvokable}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.util.Collector
 
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.reflect.ClassTag
+
 class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
 
   /**
@@ -130,6 +129,30 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2])
{
   }
 
   /**
+   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
+   * maps the output to a common type. The transformation calls fun1
+   * for each element of the first input and fun2 for each element of
+   * the second input. Each function call returns any number of elements
+   * including none.
+   *
+   * @return The transformed {@link DataStream}
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
+      fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("FlatMap functions must not be null.")
+    }
+    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+      val cleanFun1 = clean(fun1)
+      val cleanFun2 = clean(fun2)
+      def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect
}
+      def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect
}
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
    * GroupBy operation for connected data stream. Groups the elements of
    * input1 and input2 according to keyPosition1 and keyPosition2. Used for
    * applying function on grouped data streams for example

http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index d4df1d6..b673f25 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
   SingleOutputStreamOperator, GroupedDataStream}
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
@@ -183,7 +184,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Initiates an iterative part of the program that creates a loop by feeding
    * back data streams. To create a streaming iteration the user needs to define
-   * a transformation that creates two DataStreams.The first one one is the output
+   * a transformation that creates two DataStreams. The first one is the output
    * that will be fed back to the start of the iteration and the second is the output
    * stream of the iterative part.
    * <p>
@@ -198,8 +199,30 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    *
    */
-  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),  
-        maxWaitTimeMillis:Long = 0): DataStream[R] = {
+  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R]
= {
+    iterate(0)(stepFunction)
+  }
+
+  /**
+   * Initiates an iterative part of the program that creates a loop by feeding
+   * back data streams. To create a streaming iteration the user needs to define
+   * a transformation that creates two DataStreams. The first one is the output
+   * that will be fed back to the start of the iteration and the second is the output
+   * stream of the iterative part.
+   * <p>
+   * stepfunction: initialStream => (feedback, output)
+   * <p>
+   * A common pattern is to use output splitting to create feedback and output DataStream.
+   * Please refer to the .split(...) method of the DataStream
+   * <p>
+   * By default a DataStream with iteration will never terminate, but the user
+   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
+   * If no data received in the set time the stream terminates.
+   *
+   *
+   */
+  def iterate[R](maxWaitTimeMillis:Long = 0)(stepFunction: DataStream[T] => (DataStream[T],
DataStream[R]))
+        : DataStream[R] = {
     val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
@@ -472,18 +495,35 @@ class DataStream[T](javaStream: JavaStream[T]) {
    */
   def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector)
 
+//  /**
+//   * Creates a new SplitDataStream that contains only the elements satisfying the
+//   *  given output selector predicate.
+//   */
+//  def split(fun: T => String): SplitDataStream[T] = {
+//    if (fun == null) {
+//      throw new NullPointerException("OutputSelector must not be null.")
+//    }
+//    val selector = new OutputSelector[T] {
+//      val cleanFun = clean(fun)
+//      def select(in: T): java.lang.Iterable[String] = {
+//        List(cleanFun(in))
+//      }
+//    }
+//    split(selector)
+//  }
+
   /**
    * Creates a new SplitDataStream that contains only the elements satisfying the
    *  given output selector predicate.
    */
-  def split(fun: T => String): SplitDataStream[T] = {
+  def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = {
     if (fun == null) {
       throw new NullPointerException("OutputSelector must not be null.")
     }
     val selector = new OutputSelector[T] {
       val cleanFun = clean(fun)
       def select(in: T): java.lang.Iterable[String] = {
-        List(cleanFun(in))
+        cleanFun(in).toIterable.asJava
       }
     }
     split(selector)
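
For illustration, with the generalized selector above a single element can now be routed to several named outputs at once. This is a sketch, not part of the commit:

{% highlight scala %}
// route numbers to "even"/"odd" and additionally to an "all" output
val split = someDataStream.split(
  (num: Int) => List(if (num % 2 == 0) "even" else "odd", "all"))

val evens = split select "even"
val everything = split select "all"
{% endhighlight %}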

http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
index 9e33f80..105d2c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
@@ -26,8 +26,6 @@ import org.apache.flink.streaming.api.datastream.{ SplitDataStream =>
SplitJavaS
  * {@link #select} function. To apply a transformation on the whole output simply call
  * the appropriate method on this stream.
  *
- * @param <OUT>
- *            The type of the output.
  */
 class SplitDataStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){
 

