flink-commits mailing list archives

From mbala...@apache.org
Subject [12/18] git commit: [streaming] Minor bug fixes in Connectors, StreamCollector & docs
Date Sat, 20 Sep 2014 13:10:55 GMT
[streaming] Minor bug fixes in Connectors, StreamCollector & docs

This closes #115


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/439ca7ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/439ca7ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/439ca7ff

Branch: refs/heads/master
Commit: 439ca7ffe0a7a9fe856b22bfed50c031f062c7fb
Parents: 6b6951e
Author: Márton Balassi <balassi.marton@gmail.com>
Authored: Sat Sep 20 13:14:57 2014 +0200
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 81 ++++++++++----------
 .../connectors/twitter/TwitterSource.java       | 11 ++-
 .../api/collector/DirectedStreamCollector.java  | 29 ++++---
 .../api/collector/DirectedOutputTest.java       | 14 +++-
 4 files changed, 75 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/439ca7ff/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index c1c6cde..87d851d 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -78,7 +78,7 @@ public class StreamingWordCount {
 Program Skeleton
 ----------------
 
-As we could already see in the example, a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
+As presented in the [example](#example), a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
 
 1. Creating a `StreamExecutionEnvironment`,
 2. Connecting to data stream sources,
@@ -100,7 +100,7 @@ For connecting to data streams the `StreamExecutionEnvironment` has many differe
 env.readTextFile(filePath)
 ```
 
-After defining the data stream sources, the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which we will see in the [operations](#operations) section.
+After defining the data stream sources, the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [operations](#operations) section.
 
 ```java
 dataStream.map(new Mapper()).reduce(new Reducer())
@@ -126,30 +126,30 @@ Basics
 
 ### DataStream
 
-The `DataStream` is the basic abstraction provided by the the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. Operations on the DataStreams will be applied on individual data points or windows of the `DataStream` based on the type of the operation. For example the map operator transforms each data point individually while window or batch aggregations work on an interval of data points at the same time.
+The `DataStream` is the basic abstraction provided by the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. Operations will be applied on individual data points or windows of the `DataStream` based on the type of the operation. For example the map operator transforms each data point individually while window or batch aggregations work on an interval of data points at the same time.
 
-The different operations return different `DataStream` types allowing more elaborate transformations, for example the `groupBy()` method returns a `GroupedDataStream` which can be used for group operations.
+The operations may return different `DataStream` types allowing more elaborate transformations, for example the `groupBy()` method returns a `GroupedDataStream` which can be used for group operations.
 
 ### Partitioning
 
-Partitioning controls how individual data points are distributed among the parallel instances of the transformation operators. By default Forward partitioning is used. There are several partitioning types supported in Flink Streaming:
+Partitioning controls how individual data points are distributed among the parallel instances of the transformation operators. By default *Forward* partitioning is used. There are several partitioning types supported in Flink Streaming:
 
- * Forward: Forward partitioning directs the output data to the next operator on the same core (if possible) avoiding expensive network I/O. This is the default partitioner.
+ * *Forward*: Forward partitioning directs the output data to the next operator on the same machine (if possible) avoiding expensive network I/O. This is the default partitioner.
 Usage: `dataStream.forward()`
- * Shuffle: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution.
+ * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution.
 Usage: `dataStream.shuffle()`
- * Distribute: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
+ * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
- * Field: Field partitioning partitions the output data stream based on the hash code of a selected key field. Data points with the same key will always go to the same operator instance.
+ * *Field*: Field partitioning partitions the output data stream based on the hash code of a selected key field. Data points with the same key are directed to the same operator instance.
 Usage: `dataStream.partitionBy(keyposition)`
- * Broadcast: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
+ * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`
- * Global: All data points end up at the same operator instance. To achieve a clearer structure use the parallelism setting of the corresponding operator for this.
+ * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
 Usage: `operator.setParallelism(1)`
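
As a rough illustration of how these calls combine in a pipeline, a minimal sketch is shown below; the `Tokenizer` mapper is a made-up placeholder, and chaining the partitioning setters directly before the next operator is assumed to work as the usage lines above suggest:

```java
// Shuffle the raw lines across the mapper instances, then hash-partition the
// tokenized records on field 0 so equal keys reach the same downstream instance.
DataStream<Tuple2<String, Integer>> tokens = env
		.readTextFile(filePath)
		.shuffle()
		.map(new Tokenizer())   // Tokenizer: an assumed MapFunction<String, Tuple2<String, Integer>>
		.partitionBy(0);
```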
 
 ### Sources
 
-The user can connect to different data streams by the different implemenations of `DataStreamSource` using methods provided in `StreamExecutionEnvironment`. There are several predefined ones similar to the ones provided by the batch API like:
+The user can connect to data streams by the different implemenations of `DataStreamSource` using methods provided in `StreamExecutionEnvironment`. There are several predefined ones similar to the ones provided by the batch API like:
 
  * `env.genereateSequence(from, to)`
  * `env.fromElements(elements…)`
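
For instance, two of these predefined sources in use (the method is spelled `generateSequence` in the API itself; the variable names are illustrative):

```java
DataStream<Long> numbers = env.generateSequence(1, 1000);              // emits the numbers 1..1000
DataStream<String> words = env.fromElements("streaming", "is", "fun"); // emits the given elements
```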
@@ -251,19 +251,19 @@ When the reduce operator is applied on a grouped data stream, the user-defined `
 
 ### Aggregations
 
-The Flink streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
+The Flink Streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
 
 Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`
 
-For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, 0 is used as default. 
+For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, position `0` is used as default.

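A small sketch of the aggregations described above, assuming a `Tuple2<String, Integer>` stream grouped on its first field:

```java
// Keyed running sum: for each key in field 0, field 1 is replaced by the
// aggregate accumulated so far.
dataStream.groupBy(0).sum(1);

// Non-grouped minimum over the whole stream; with no field position given,
// position 0 would be used as default, as noted above.
dataStream.min(0);
```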
 
 ### Window/Batch operators
 
 Window and batch operators allow the user to execute function on slices or windows of the DataStream in a sliding fashion. If the stepsize for the slide is not defined then the window/batchsize is used as stepsize by default.
 
-When applied to grouped data streams the data stream will be batched/windowed for different key values separately. 
+When applied to grouped data streams the data stream is batched/windowed for different key values separately. 
 
-For example a `ds.groupBy(0).batch(100, 10)` will produce batches of the last 100 elements for each key value with 10 record step size.
+For example a `dataStream.groupBy(0).batch(100, 10)` produces batches of the last 100 elements for each key value with 10 record step size.
  
 #### Reduce on windowed/batched data streams
 The transformation calls a user-defined `ReduceFunction` on records received in the batch or during the predefined time window. The window is shifted after each reduce call. The user can also use the different streaming aggregations.
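
A hedged sketch of such a batched reduce, assuming the stream returned by `batch(100, 10)` accepts a `ReduceFunction` directly as the text describes (the summing logic is only illustrative):

```java
// Sum field 1 of the last 100 elements per key, re-evaluated every 10 records.
dataStream.groupBy(0)
	.batch(100, 10)
	.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
		@Override
		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
				Tuple2<String, Integer> value2) {
			value1.f1 += value2.f1;
			return value1;
		}
	});
```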
@@ -291,10 +291,10 @@ Applies a CoMap transformation on two separate DataStreams, mapping them to a co
 A CoMap operator that outputs true if an Integer value is received and false if a String value is received:
 
 ```java
-DataStream<Integer> ds1 = ...
-DataStream<String> ds2 = ...
+DataStream<Integer> dataStream1 = ...
+DataStream<String> dataStream2 = ...
 		
-ds1.connect(ds2).
+dataStream1.connect(dataStream2)
 	.map(new CoMapFunction<Integer, String, Boolean>() {
 			
 			@Override
@@ -310,13 +310,13 @@ ds1.connect(ds2).
 ```
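
Spelled out in full, such a CoMap function might look like the following sketch, assuming the two input-specific methods are named `map1` and `map2`:

```java
dataStream1.connect(dataStream2)
	.map(new CoMapFunction<Integer, String, Boolean>() {

		@Override
		public Boolean map1(Integer value) {
			return true;    // an Integer arrived on the first input
		}

		@Override
		public Boolean map2(String value) {
			return false;   // a String arrived on the second input
		}
	});
```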
 
 #### FlatMap on ConnectedDataStream
-The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output zero or more values using the Collector interface. 
+The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output arbitrarily many values using the Collector interface. 
 
 ```java
-DataStream<Integer> ds1 = ...
-DataStream<String> ds2 = ...
-
-ds1.connect(ds2)
+DataStream<Integer> dataStream1 = ...
+DataStream<String> dataStream2 = ...
+		
+dataStream1.connect(dataStream2)
 	.flatMap(new CoFlatMapFunction<Integer, String, Boolean>() {
 
 			@Override
@@ -337,7 +337,7 @@ The Reduce operator for the `ConnectedDataStream` applies a simple reduce transf
 <section id="output-splitting">
 ### Output splitting
 
-Most data stream operators support directed outputs. It means that different data elements are received by only given outputs. The outputs are referenced by their name given at the point of receiving:
+Most data stream operators support directed outputs, meaning that different data elements are received by only given outputs. The outputs are referenced by their name given at the point of receiving:
 
 ```java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
@@ -345,7 +345,7 @@ DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
 ```
 
-Data streams will only receive the elements directed to selected output names. These outputs are directed by implementing a selector function (extending `OutputSelector`):
+Data streams only receive the elements directed to selected output names. These outputs are directed by implementing a selector function (extending `OutputSelector`):
 
 ```java
 void select(OUT value, Collection<String> outputs);
@@ -365,7 +365,7 @@ void select(Integer value, Collection<String> outputs) {
 ```
 
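A complete selector of this kind, routing even values to an "even" output and odd values to an "odd" output (names chosen to match the split/select snippet above), could be sketched as:

```java
OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
	@Override
	public void select(Integer value, Collection<String> outputs) {
		if (value % 2 == 0) {
			outputs.add("even");
		} else {
			outputs.add("odd");
		}
	}
};
```
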
 This output selection allows data streams to listen to multiple outputs, and data points to be sent to multiple outputs. A value is sent to all the outputs specified in the `OutputSelector` and a data stream will receive a value if it has selected any of the outputs the value is sent to. The stream will receive the data at most once.
-It is common that a stream needs to listen to all the outputs, so `split.selectAll()` is provided as an alias for explicitly selecting all output names.
+It is common that a stream listens to all the outputs, so `split.selectAll()` is provided as an alias for explicitly selecting all output names.
 
 
 ### Iterations
@@ -376,13 +376,13 @@ To start an iterative part of the program the user defines the iteration startin
 ```java
 IterativeDataStream<Integer> iteration = source.iterate();
 ```
-The operator applied on the iteration starting point will be the head of the iteration, where data is fed back from the iteration tail.
+The operator applied on the iteration starting point is the head of the iteration, where data is fed back from the iteration tail.
 
 ```java
 DataStream<Integer> head = iteration.map(new IterationHead());
 ```
 
-To close an iteration and define the iteration tail, the user needs to call `.closeWith(tail)` method of the `IterativeDataStream`:
+To close an iteration and define the iteration tail, the user calls `.closeWith(tail)` method of the `IterativeDataStream`:
 
 ```java
 DataStream<Integer> tail = head.map(new IterationTail());
@@ -394,19 +394,19 @@ SplitDataStream<Integer> tail = head.map(new IterationTail()).split(outputSelect
 iteration.closeWith(tail.select("iterate"));
 ``` 
 
-Because iterative streaming programs do not have a set number of iteratons for each data element, the streaming program no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances we provide a method to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
-To use this function the user need to call, the `iteration.setMaxWaitTime(millis)` to control the max wait time. 
+Because iterative streaming programs do not have a set number of iteratons for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
+To use this function the user needs to call, the `iteration.setMaxWaitTime(millis)` to control the max wait time. 
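
Putting the pieces together, an iteration that shuts itself down after five seconds without new input at the head might be sketched as follows (the 5000 ms value and the head/tail mappers are placeholders):

```java
IterativeDataStream<Integer> iteration = source.iterate();
iteration.setMaxWaitTime(5000);                       // stop after 5 s without new input at the head

DataStream<Integer> head = iteration.map(new IterationHead());
DataStream<Integer> tail = head.map(new IterationTail());
iteration.closeWith(tail);                            // feed the tail back to the head
```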
 
 ### Rich functions
-The usage of rich functions are essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take as argument a rich function:
+The usage of rich functions are essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:
 
 ```java
-ds.map(new RichMapFunction<String, Integer>() {
+dataStream.map(new RichMapFunction<String, Integer>() {
   public Integer map(String value) { return value.toString(); }
 });
 ```
 
-Rich functions provide, in addition to the user-defined function (`map`, `reduce`, etc), the `open` and `close` methods for initialization and finalization. (In contrast to the core API, the streaming API currently does not support the  `getRuntimeContext` and `setRuntimeContext` methods.)
+Rich functions provide, in addition to the user-defined function (`map()`, `reduce()`, etc), the `open()` and `close()` methods for initialization and finalization. (In contrast to the core API, the streaming API currently does not support the  `getRuntimeContext()` and `setRuntimeContext()` methods.)
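
A sketch of a rich function that actually uses `open()` and `close()`; the counting logic is only illustrative, and the `open(Configuration)` signature is assumed from the core rich-function API:

```java
dataStream.map(new RichMapFunction<String, Integer>() {
	private int processed;

	@Override
	public void open(Configuration parameters) {
		processed = 0;                 // one-time initialization per parallel instance
	}

	@Override
	public Integer map(String value) {
		processed++;
		return value.length();
	}

	@Override
	public void close() {
		// release resources or flush buffered state here
	}
});
```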
 
 [Back to top](#top)
 
@@ -416,7 +416,7 @@ Operator Settings
 
 ### Parallelism
 
-Setting parallelism for operators works exactly the same way as in the core Flink API The user can control the number of parallel instances created for each operator by calling the `operator.setParallelism(dop)` method.
+Setting parallelism for operators works exactly the same way as in the core Flink API. The user can control the number of parallel instances created for each operator by calling the `operator.setParallelism(dop)` method.
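
For example, reusing the `Mapper` placeholder from the skeleton above:

```java
dataStream.map(new Mapper()).setParallelism(4);   // four parallel mapper instances
```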
 
 ### Buffer timeout
 
@@ -432,7 +432,7 @@ env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
 
 ### Mutability
 
-Most operators allows setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
+Most operators allow setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
 Usage:
 ```java
 operator.setMutability(isMutable)
@@ -859,7 +859,8 @@ java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.ra
 ```
 
 The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-```/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
+```batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
 
 In the example there are to connectors. One that sends messages to RabbitMQ and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
 
@@ -905,7 +906,8 @@ java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.ka
 ```
 
 The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-```/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
+```batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
 
 In the example there are to connectors. One that sends messages to Kafka and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
 
@@ -966,7 +968,8 @@ java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.fl
 ```
 
 The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-```/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
+```batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
 In the example there are to connectors. One that sends messages to Flume and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
 
 ```

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/439ca7ff/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 525f4c8..0ae3723 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.connectors.twitter;
 
 import java.io.FileInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -27,10 +26,10 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
@@ -149,8 +148,8 @@ public class TwitterSource extends RichSourceFunction<String> {
 			InputStream input = new FileInputStream(authPath);
 			properties.load(input);
 			input.close();
-		} catch (IOException ioe) {
-			new RuntimeException("Cannot open .properties file: " + authPath, ioe);
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot open .properties file: " + authPath, e);
 		}
 		return properties;
 	}
@@ -226,7 +225,7 @@ public class TwitterSource extends RichSourceFunction<String> {
 				}
 			}
 		} catch (InterruptedException e) {
-			new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
 		}
 
 	}
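
Both hunks above fix the same oversight: the `RuntimeException` was constructed but never thrown, so the failure was silently swallowed. In isolation the pattern looks like this:

```java
try {
	properties.load(input);
} catch (IOException ioe) {
	new RuntimeException("Cannot open .properties file: " + authPath, ioe);          // created but never thrown: the error is lost
	// throw new RuntimeException("Cannot open .properties file: " + authPath, ioe); // corrected form
}
```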

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/439ca7ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 42a2683..40fe3c6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -86,13 +86,6 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 		for (String outputName : outputNames) {
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap
 					.get(outputName);
-			if (outputList == null) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Cannot emit because no output is selected with the name: {}",
-							outputName);
-				}
-			}
-
 			try {
 				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
 					if (!emitted.contains(output)) {
@@ -101,18 +94,32 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 					}
 				}
 
-				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) {
-					if (!emitted.contains(output)) {
-						output.emit(serializationDelegate);
-						emitted.add(output);
+				if (outputList == null) {
+					if (LOG.isErrorEnabled()) {
+						String format = String.format(
+								"Cannot emit because no output is selected with the name: %s",
+								outputName);
+						LOG.error(format);
+
+					}
+				} else {
+
+					for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) {
+						if (!emitted.contains(output)) {
+							output.emit(serializationDelegate);
+							emitted.add(output);
+						}
 					}
+
 				}
+
 			} catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
 					LOG.error("Emit to {} failed due to: {}", outputName,
 							StringUtils.stringifyException(e));
 				}
 			}
+
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/439ca7ff/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index e9d3994..85f3d7f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -43,6 +43,7 @@ public class DirectedOutputTest {
 	private static final String EVEN_AND_ODD = "evenAndOdd";
 	private static final String ODD_AND_TEN = "oddAndTen";
 	private static final String EVEN = "even";
+	private static final String NON_SELECTED = "nonSelected";
 
 	static final class MyMap implements MapFunction<Long, Long> {
 		private static final long serialVersionUID = 1L;
@@ -67,6 +68,10 @@ public class DirectedOutputTest {
 			if (value == 10L) {
 				outputs.add(TEN);
 			}
+			
+			if (value == 11L) {
+				outputs.add(NON_SELECTED);
+			}
 		}
 	}
 	
@@ -97,10 +102,11 @@ public class DirectedOutputTest {
 	
 	@Test
 	public void outputSelectorTest() throws Exception {
+		
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		
-		SplitDataStream<Long> source = env.generateSequence(1, 10).split(new MyOutputSelector());
+		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(new ListSink(EVEN));
 		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
 		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
@@ -108,8 +114,8 @@ public class DirectedOutputTest {
 		
 		env.executeTest(128);
 		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
-		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L), outputs.get(ODD_AND_TEN));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), outputs.get(EVEN_AND_ODD));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), outputs.get(ALL));
+		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(EVEN_AND_ODD));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
 	}
 }

