flink-commits mailing list archives

From se...@apache.org
Subject [12/28] git commit: [streaming] CoRecordReader and iterator added + ConnectedDataStream refactor
Date Fri, 29 Aug 2014 19:03:45 GMT
[streaming] CoRecordReader and iterator added + ConnectedDataStream refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c3ec1e1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c3ec1e1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c3ec1e1f

Branch: refs/heads/master
Commit: c3ec1e1feee6719c7ae609215320bb619fc135e1
Parents: fa75af0
Author: gyfora <gyula.fora@gmail.com>
Authored: Fri Aug 22 12:12:31 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Aug 29 21:01:56 2014 +0200

----------------------------------------------------------------------
 .../api/collector/DirectedStreamCollector.java  |  24 +-
 .../api/datastream/ConnectedDataStream.java     | 428 ++++++++-----------
 .../datastream/GroupedConnectedDataStream.java  |  66 +++
 .../api/invokable/operator/co/CoInvokable.java  |  50 ++-
 .../streamcomponent/BlockingQueueBroker.java    |  41 --
 .../api/streamcomponent/CoStreamTask.java       |  54 +--
 .../api/streamcomponent/OutputHandler.java      |   1 +
 .../streamcomponent/StreamIterationSink.java    |   1 +
 .../streamcomponent/StreamIterationSource.java  |   1 +
 .../api/streamcomponent/StreamRecordWriter.java | 137 ------
 .../flink/streaming/io/BlockingQueueBroker.java |  41 ++
 .../flink/streaming/io/CoReaderIterator.java    |  56 +++
 .../flink/streaming/io/CoRecordReader.java      | 177 ++++++++
 .../flink/streaming/io/StreamRecordWriter.java  | 137 ++++++
 .../apache/flink/streaming/api/PrintTest.java   |   1 +
 .../invokable/operator/CoGroupReduceTest.java   |   4 +-
 .../ml/IncrementalLearningSkeleton.java         |   6 +-
 .../api/AbstractSingleGateRecordReader.java     |   2 +-
 18 files changed, 725 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index d1d6b48..82f3c50 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.streaming.api.collector;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.StringUtils;
 
 /**
  * A StreamCollector that uses user defined output names and a user defined
@@ -39,7 +39,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 
 	OutputSelector<OUT> outputSelector;
 	private static final Log LOG = LogFactory.getLog(DirectedStreamCollector.class);
-	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
+	private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
 
 	/**
 	 * Creates a new DirectedStreamCollector
@@ -56,7 +56,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 			OutputSelector<OUT> outputSelector) {
 		super(channelID, serializationDelegate);
 		this.outputSelector = outputSelector;
-		this.emitted = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 
 	}
 

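Not part of the commit: the collector's bookkeeping of already-served writers changes
from an ArrayList to a HashSet above, presumably so that a RecordWriter selected through
several output names receives each record only once and the duplicate check stays
constant-time. A standalone sketch of that idiom (all names below are made up for
illustration):

import java.util.HashSet;
import java.util.Set;

public class EmitDedupSketch {
	public static void main(String[] args) {
		// Stands in for a RecordWriter instance tracked by the "emitted" set.
		String writerA = "writer-A";

		Set<String> emitted = new HashSet<String>();
		// Set.add() reports whether the element was new, so a second attempt
		// to emit to the same writer can be skipped in constant time.
		System.out.println(emitted.add(writerA)); // true  -> emit the record
		System.out.println(emitted.add(writerA)); // false -> already served, skip
	}
}
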
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 25c2ca5..920278c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -1,250 +1,182 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
-import org.apache.flink.streaming.api.JobGraphBuilder;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.partitioner.FieldsPartitioner;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
-
-/**
- * The ConnectedDataStream represents a stream for two different data types. It
- * can be used to apply transformations like {@link CoMapFunction} on two
- * {@link DataStream}s
- *
- * @param <IN1>
- *            Type of the first DataSteam.
- * @param <IN2>
- *            Type of the second DataStream.
- */
-public class ConnectedDataStream<IN1, IN2> {
-
-	StreamExecutionEnvironment environment;
-	JobGraphBuilder jobGraphBuilder;
-	DataStream<IN1> input1;
-	DataStream<IN2> input2;
-
-	protected ConnectedDataStream(StreamExecutionEnvironment environment,
-			JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1,
-			DataStream<IN2> input2) {
-		this.jobGraphBuilder = jobGraphBuilder;
-		this.environment = environment;
-		this.input1 = input1.copy();
-		this.input2 = input2.copy();
-	}
-
-	/**
-	 * Returns the first {@link DataStream}.
-	 * 
-	 * @return The first DataStream.
-	 */
-	public DataStream<IN1> getFirst() {
-		return input1.copy();
-	}
-
-	/**
-	 * Returns the second {@link DataStream}.
-	 * 
-	 * @return The second DataStream.
-	 */
-	public DataStream<IN2> getSecond() {
-		return input2.copy();
-	}
-
-	/**
-	 * Sets the partitioning of the two separate {@link DataStream}s so that the
-	 * output tuples are partitioned by their hashcode and are sent to only one
-	 * component.
-	 * 
-	 * @param keyPosition1
-	 *            The field used to compute the hashcode of the elements in the
-	 *            first input stream.
-	 * @param keyPosition2
-	 *            The field used to compute the hashcode of the elements in the
-	 *            second input stream.
-	 * @return The DataStream with field partitioning set.
-	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1,
-			int keyPosition2) {
-		if (keyPosition1 < 0 || keyPosition2 < 0) {
-			throw new IllegalArgumentException(
-					"The position of the field must be non-negative");
-		}
-
-		return new ConnectedDataStream<IN1, IN2>(this.environment,
-				this.jobGraphBuilder, getFirst().setConnectionType(
-						new FieldsPartitioner<IN1>(keyPosition1)), getSecond()
-						.setConnectionType(
-								new FieldsPartitioner<IN2>(keyPosition2)));
-	}
-
-	/**
-	 * GroupBy operation for connected data stream. Groups the elements of
-	 * input1 and input2 according to keyPosition1 and keyPosition2. Basically
-	 * used before Reduce operation.
-	 * 
-	 * @param keyPosition1
-	 *            The field used to compute the hashcode of the elements in the
-	 *            first input stream.
-	 * @param keyPosition2
-	 *            The field used to compute the hashcode of the elements in the
-	 *            second input stream.
-	 * @return
-	 */
-	public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1,
-			int keyPosition2) {
-		return this.partitionBy(keyPosition1, keyPosition2);
-	}
-
-	/**
-	 * Applies a CoMap transformation on two separate {@link DataStream}s. The
-	 * transformation calls a {@link CoMapFunction#map1} for each element of the
-	 * first input and {@link CoMapFunction#map2} for each element of the second
-	 * input. Each CoMapFunction call returns exactly one element. The user can
-	 * also extend {@link RichCoMapFunction} to gain access to other features
-	 * provided by the {@link RichFuntion} interface.
-	 * 
-	 * @param coMapper
-	 *            The CoMapFunction used to jointly transform the two input
-	 *            DataStreams
-	 * @return The transformed DataStream
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> map(
-			CoMapFunction<IN1, IN2, OUT> coMapper) {
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(
-				coMapper, CoMapFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(
-				coMapper, CoMapFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(
-				coMapper, CoMapFunction.class, 2);
-
-		return addCoFunction("coMap", coMapper,
-				in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				new CoMapInvokable<IN1, IN2, OUT>(coMapper));
-	}
-
-	/**
-	 * Applies a CoFlatMap transformation on two separate {@link DataStream}s.
-	 * The transformation calls a {@link CoFlatMapFunction#map1} for each
-	 * element of the first input and {@link CoFlatMapFunction#map2} for each
-	 * element of the second input. Each CoFlatMapFunction call returns any
-	 * number of elements including none. The user can also extend
-	 * {@link RichFlatMapFunction} to gain access to other features provided by
-	 * the {@link RichFuntion} interface.
-	 * 
-	 * @param coFlatMapper
-	 *            The CoFlatMapFunction used to jointly transform the two input
-	 *            DataStreams
-	 * @return The transformed DataStream
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
-			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(
-				coFlatMapper, CoFlatMapFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(
-				coFlatMapper, CoFlatMapFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(
-				coFlatMapper, CoFlatMapFunction.class, 2);
-		
-		return addCoFunction("coFlatMap", coFlatMapper,
-				in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
-	}
-
-	/**
-	 * Applies a CoReduce transformation on the grouped data stream grouped on
-	 * by the given key position. The {@link CoReduceFunction} will receive
-	 * input values based on the key positions. The transformation calls
-	 * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
-	 * each element of the first input and {@link CoReduceFunction#reduce2} and
-	 * {@link CoReduceFunction#map2} for each element of the second input. For
-	 * each input, only values with the same key will go to the same reducer.
-	 * The user can also extend {@link RichReduceFunction} to gain access to
-	 * other features provided by the {@link RichFuntion} interface.
-	 * 
-	 * @param coReducer
-	 *            The {@link CoReduceFunction} that will be called for every two
-	 *            element with the same key of each input DataStream.
-	 * @param keyPosition1
-	 *            position of the key in the first input DataStream
-	 * @param keyPosition2
-	 *            position of the key in the second input DataStream
-	 * @return The transformed DataStream.
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(
-			CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
-			int keyPosition2) {
-		
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(
-				coReducer, CoReduceFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(
-				coReducer, CoReduceFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(
-				coReducer, CoReduceFunction.class, 2);
-		
-		return addCoFunction("coReduce", coReducer,
-				in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
-						keyPosition1, keyPosition2));
-	}
-
-	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(
-			String functionName, final Function function,
-			TypeSerializerWrapper<IN1> in1TypeWrapper,
-			TypeSerializerWrapper<IN2> in2TypeWrapper,
-			TypeSerializerWrapper<OUT> outTypeWrapper,
-			CoInvokable<IN1, IN2, OUT> functionInvokable) {
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
-				environment, functionName);
-
-		try {
-			input1.jobGraphBuilder.addCoTask(returnStream.getId(),
-					functionInvokable, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
-					SerializationUtils.serialize((Serializable) function),
-					environment.getDegreeOfParallelism());
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize user defined function");
-		}
-
-		input1.connectGraph(input1, returnStream.getId(), 1);
-		input1.connectGraph(input2, returnStream.getId(), 2);
-
-		// TODO consider iteration
-
-		return returnStream;
-	}
-
-}
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+
+/**
+ * The ConnectedDataStream represents a stream for two different data types. It
+ * can be used to apply transformations like {@link CoMapFunction} on two
+ * {@link DataStream}s
+ *
+ * @param <IN1>
+ *            Type of the first DataStream.
+ * @param <IN2>
+ *            Type of the second DataStream.
+ */
+public class ConnectedDataStream<IN1, IN2> {
+
+	StreamExecutionEnvironment environment;
+	JobGraphBuilder jobGraphBuilder;
+	DataStream<IN1> input1;
+	DataStream<IN2> input2;
+
+	protected ConnectedDataStream(StreamExecutionEnvironment environment,
+			JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1, DataStream<IN2> input2) {
+		this.jobGraphBuilder = jobGraphBuilder;
+		this.environment = environment;
+		this.input1 = input1.copy();
+		this.input2 = input2.copy();
+	}
+
+	/**
+	 * Returns the first {@link DataStream}.
+	 * 
+	 * @return The first DataStream.
+	 */
+	public DataStream<IN1> getFirst() {
+		return input1.copy();
+	}
+
+	/**
+	 * Returns the second {@link DataStream}.
+	 * 
+	 * @return The second DataStream.
+	 */
+	public DataStream<IN2> getSecond() {
+		return input2.copy();
+	}
+
+	/**
+	 * GroupBy operation for connected data stream. Groups the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2. Used for
+	 * applying functions on grouped data streams, for example
+	 * {@link GroupedConnectedDataStream#reduce}.
+	 * 
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return Returns the {@link GroupedConnectedDataStream} created.
+	 */
+	public GroupedConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
+		if (keyPosition1 < 0 || keyPosition2 < 0) {
+			throw new IllegalArgumentException("The position of the field must be non-negative");
+		}
+
+		return new GroupedConnectedDataStream<IN1, IN2>(this.environment, this.jobGraphBuilder,
+				getFirst().partitionBy(keyPosition1), getSecond().partitionBy(keyPosition2),
+				keyPosition1, keyPosition2);
+	}
+
+	/**
+	 * Applies a CoMap transformation on two separate {@link DataStream}s. The
+	 * transformation calls a {@link CoMapFunction#map1} for each element of the
+	 * first input and {@link CoMapFunction#map2} for each element of the second
+	 * input. Each CoMapFunction call returns exactly one element. The user can
+	 * also extend {@link RichCoMapFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coMapper
+	 *            The CoMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @return The transformed DataStream
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
+		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coMapper,
+				CoMapFunction.class, 0);
+		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coMapper,
+				CoMapFunction.class, 1);
+		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coMapper,
+				CoMapFunction.class, 2);
+
+		return addCoFunction("coMap", coMapper, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
+				new CoMapInvokable<IN1, IN2, OUT>(coMapper));
+	}
+
+	/**
+	 * Applies a CoFlatMap transformation on two separate {@link DataStream}s.
+	 * The transformation calls a {@link CoFlatMapFunction#map1} for each
+	 * element of the first input and {@link CoFlatMapFunction#map2} for each
+	 * element of the second input. Each CoFlatMapFunction call returns any
+	 * number of elements including none. The user can also extend
+	 * {@link RichFlatMapFunction} to gain access to other features provided by
+	 * the {@link RichFunction} interface.
+	 * 
+	 * @param coFlatMapper
+	 *            The CoFlatMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @return The transformed DataStream
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
+			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
+		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coFlatMapper,
+				CoFlatMapFunction.class, 0);
+		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coFlatMapper,
+				CoFlatMapFunction.class, 1);
+		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coFlatMapper,
+				CoFlatMapFunction.class, 2);
+
+		return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
+				outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
+	}
+
+	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
+			final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,
+			TypeSerializerWrapper<IN2> in2TypeWrapper, TypeSerializerWrapper<OUT> outTypeWrapper,
+			CoInvokable<IN1, IN2, OUT> functionInvokable) {
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
+				environment, functionName);
+
+		try {
+			input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
+					in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
+					SerializationUtils.serialize((Serializable) function),
+					environment.getDegreeOfParallelism());
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+
+		input1.connectGraph(input1, returnStream.getId(), 1);
+		input1.connectGraph(input2, returnStream.getId(), 2);
+
+		// TODO consider iteration
+
+		return returnStream;
+	}
+
+}

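Not part of the commit: a minimal user-function sketch for the coMap path described in
the Javadoc above. It assumes CoMapFunction is a plain interface exposing map1/map2 (as
the reference to RichCoMapFunction suggests); the class name and element types below are
made up for illustration.

import org.apache.flink.streaming.api.function.co.CoMapFunction;

// Maps Integers from the first input and Strings from the second input to a
// common String output; map1/map2 each return exactly one element.
public class TaggingCoMap implements CoMapFunction<Integer, String, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String map1(Integer value) {
		return "first: " + value;
	}

	@Override
	public String map2(String value) {
		return "second: " + value;
	}
}

An instance of such a function would then be passed to ConnectedDataStream#map, which
wires it into a CoMapInvokable via addCoFunction as shown above.
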
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
new file mode 100755
index 0000000..13ba3ab
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+
+public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
+
+	int keyPosition1;
+	int keyPosition2;
+
+	protected GroupedConnectedDataStream(StreamExecutionEnvironment environment,
+			JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1, DataStream<IN2> input2,
+			int keyPosition1, int keyPosition2) {
+		super(environment, jobGraphBuilder, input1, input2);
+		this.keyPosition1 = keyPosition1;
+		this.keyPosition2 = keyPosition2;
+	}
+
+	/**
+	 * Applies a CoReduce transformation on the grouped data stream, grouped by
+	 * the given key positions. The {@link CoReduceFunction} will receive
+	 * input values based on the key positions. The transformation calls
+	 * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
+	 * each element of the first input and {@link CoReduceFunction#reduce2} and
+	 * {@link CoReduceFunction#map2} for each element of the second input. For
+	 * each input, only values with the same key will go to the same reducer.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoReduceFunction} that will be called for every two
+	 *            elements with the same key of each input DataStream.
+	 * @return The transformed DataStream.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
+
+		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+				CoReduceFunction.class, 0);
+		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+				CoReduceFunction.class, 1);
+		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+				CoReduceFunction.class, 2);
+
+		return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
+				new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
+	}
+
+}

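Not part of the commit: a wiring sketch for the new grouping path. groupBy() now returns
a GroupedConnectedDataStream, so the key positions are given once there instead of being
passed to reduce(). connectedStream and myCoReducer are hypothetical placeholders, and
the tuple types are only for illustration.

// Group both inputs on field 0 of their tuples, then reduce per key; the
// CoReduceFunction only ever sees elements that share the same key.
GroupedConnectedDataStream<Tuple2<String, Integer>, Tuple2<String, Long>> grouped =
		connectedStream.groupBy(0, 0);

SingleOutputStreamOperator<String, ?> reduced = grouped.reduce(myCoReducer);
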
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 6e18ec3..246fa4f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
 
 public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokable<OUT> {
 
@@ -32,8 +32,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokabl
 
 	private static final long serialVersionUID = 1L;
 
-	protected MutableObjectIterator<StreamRecord<IN1>> recordIterator1;
-	protected MutableObjectIterator<StreamRecord<IN2>> recordIterator2;
+	protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
 	protected StreamRecord<IN1> reuse1;
 	protected StreamRecord<IN2> reuse2;
 	protected StreamRecordSerializer<IN1> serializer1;
@@ -41,16 +40,13 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokabl
 	protected boolean isMutable;
 
 	public void initialize(Collector<OUT> collector,
-			MutableObjectIterator<StreamRecord<IN1>> recordIterator1,
-			StreamRecordSerializer<IN1> serializer1,
-			MutableObjectIterator<StreamRecord<IN2>> recordIterator2,
-			StreamRecordSerializer<IN2> serializer2, boolean isMutable) {
+			CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
+			StreamRecordSerializer<IN1> serializer1, StreamRecordSerializer<IN2> serializer2,
+			boolean isMutable) {
 		this.collector = collector;
 
-		this.recordIterator1 = recordIterator1;
+		this.recordIterator = recordIterator;
 		this.reuse1 = serializer1.createInstance();
-
-		this.recordIterator2 = recordIterator2;
 		this.reuse2 = serializer2.createInstance();
 
 		this.serializer1 = serializer1;
@@ -58,30 +54,32 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokabl
 		this.isMutable = isMutable;
 	}
 
-	public void resetReuse() {
+	protected void resetReuseAll() {
 		this.reuse1 = serializer1.createInstance();
 		this.reuse2 = serializer2.createInstance();
 	}
 
-	public void invoke() throws Exception {
-		boolean noMoreRecordOnInput1 = false;
-		boolean noMoreRecordOnInput2 = false;
+	protected void resetReuse1() {
+		this.reuse1 = serializer1.createInstance();
+	}
 
-		do {
-			noMoreRecordOnInput1 = ((reuse1 = recordIterator1.next(reuse1)) == null);
-			if (!noMoreRecordOnInput1) {
-				handleStream1();
-			}
+	protected void resetReuse2() {
+		this.reuse2 = serializer2.createInstance();
+	}
 
-			noMoreRecordOnInput2 = ((reuse2 = recordIterator2.next(reuse2)) == null);
-			if (!noMoreRecordOnInput2) {
+	public void invoke() throws Exception {
+		while (true) {
+			int next = recordIterator.next(reuse1, reuse2);
+			if (next == 0) {
+				break;
+			} else if (next == 1) {
+				handleStream1();
+				resetReuse1();
+			} else {
 				handleStream2();
+				resetReuse2();
 			}
-
-			if (!this.isMutable) {
-				resetReuse();
-			}
-		} while (!noMoreRecordOnInput1 || !noMoreRecordOnInput2);
+		}
 	}
 
 	public abstract void handleStream1() throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
deleted file mode 100755
index 3618347..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-@SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
-	/**
-	 * Singleton instance
-	 */
-	private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
-
-	private BlockingQueueBroker() {
-	}
-
-	/**
-	 * retrieve singleton instance
-	 */
-	public static Broker<BlockingQueue<StreamRecord>> instance() {
-		return INSTANCE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index b919ece..ce2b7e8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.streamcomponent;
 import java.util.ArrayList;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.streaming.io.CoRecordReader;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -34,15 +34,16 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 		AbstractStreamComponent {
 
 	private OutputHandler<OUT> outputHandler;
-	
+
 	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
 	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
 
-	private MutableReader<IOReadableWritable> inputs1;
-	private MutableReader<IOReadableWritable> inputs2;
 	MutableObjectIterator<StreamRecord<IN1>> inputIter1;
 	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
 
+	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
+	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
+
 	private CoInvokable<IN1, IN2, OUT> userInvokable;
 	private static int numTasks;
 
@@ -52,30 +53,29 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 		instanceID = numTasks;
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private void setDeserializers() {
 		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
 		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
 
 		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
-		inputDeserializer2 = new StreamRecordSerializer(inputTypeInfo2);
+		inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
 	}
 
 	@Override
 	public void setInputsOutputs() {
 		outputHandler = new OutputHandler<OUT>(this);
-		
+
 		setConfigInputs();
 
-		inputIter1 = InputHandler.staticCreateInputIterator(inputs1, inputDeserializer1);
-		inputIter2 = InputHandler.staticCreateInputIterator(inputs2, inputDeserializer2);
+		coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
+				inputDeserializer1, inputDeserializer2);
 	}
 
 	@Override
 	protected void setInvokable() {
 		userInvokable = configuration.getUserInvokable();
-		userInvokable.initialize(outputHandler.getCollector(), inputIter1, inputDeserializer1,
-				inputIter2, inputDeserializer2, isMutable);
+		userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
+				inputDeserializer2, isMutable);
 	}
 
 	protected void setConfigInputs() throws StreamComponentException {
@@ -83,39 +83,27 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 		int numberOfInputs = configuration.getNumberOfInputs();
 
-		ArrayList<MutableRecordReader<IOReadableWritable>> inputList1 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
-		ArrayList<MutableRecordReader<IOReadableWritable>> inputList2 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
+		ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>> inputList1 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>>();
+		ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>> inputList2 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>>();
 
 		for (int i = 0; i < numberOfInputs; i++) {
 			int inputType = configuration.getInputType(i);
 			switch (inputType) {
 			case 1:
-				inputList1.add(new MutableRecordReader<IOReadableWritable>(this));
+				inputList1.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(
+						this));
 				break;
 			case 2:
-				inputList2.add(new MutableRecordReader<IOReadableWritable>(this));
+				inputList2.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(
+						this));
 				break;
 			default:
 				throw new RuntimeException("Invalid input type number: " + inputType);
 			}
 		}
 
-		inputs1 = getInputs(inputList1);
-		inputs2 = getInputs(inputList2);
-	}
-
-	@SuppressWarnings("unchecked")
-	private MutableReader<IOReadableWritable> getInputs(
-			ArrayList<MutableRecordReader<IOReadableWritable>> inputList) {
-		if (inputList.size() == 1) {
-			return inputList.get(0);
-		} else if (inputList.size() > 1) {
-			MutableRecordReader<IOReadableWritable>[] inputArray = inputList
-					.toArray(new MutableRecordReader[inputList.size()]);
-
-			return new MutableUnionRecordReader<IOReadableWritable>(inputArray);
-		}
-		return null;
+		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
+				inputList1, inputList2);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
index 8e59f89..71fb015 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.collector.StreamCollector;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.StreamRecordWriter;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.types.TypeInformation;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 98225ec..7b4f3ff 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.BlockingQueueBroker;
 import org.apache.flink.util.StringUtils;
 
 public class StreamIterationSink<IN extends Tuple> extends

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index f7c42ce..7de80b2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.BlockingQueueBroker;
 
 public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
deleted file mode 100755
index ff30d70..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.api.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class StreamRecordWriter<T extends IOReadableWritable> extends
-		RecordWriter<T> {
-
-	private final BufferProvider bufferPool;
-
-	private final ChannelSelector<T> channelSelector;
-
-	private int numChannels;
-
-	private long timeout;
-
-	/** RecordSerializer per outgoing channel */
-	private RecordSerializer<T>[] serializers;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public StreamRecordWriter(AbstractInvokable invokable) {
-		this(invokable, new RoundRobinChannelSelector<T>(), 1000);
-	}
-
-	public StreamRecordWriter(AbstractInvokable invokable,
-			ChannelSelector<T> channelSelector) {
-		this(invokable, channelSelector, 1000);
-	}
-
-	public StreamRecordWriter(AbstractInvokable invokable,
-			ChannelSelector<T> channelSelector, long timeout) {
-		// initialize the gate
-		super(invokable);
-
-		this.timeout = timeout;
-		this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
-		this.channelSelector = channelSelector;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void initializeSerializers() {
-		this.numChannels = this.outputGate.getNumChannels();
-		this.serializers = new RecordSerializer[numChannels];
-		for (int i = 0; i < this.numChannels; i++) {
-			this.serializers[i] = new SpanningRecordSerializer<T>();
-		}
-		(new OutputFlusher()).start();
-	}
-
-	@Override
-	public void emit(final T record) throws IOException, InterruptedException {
-		for (int targetChannel : this.channelSelector.selectChannels(record,
-				this.numChannels)) {
-			// serialize with corresponding serializer and send full buffer
-
-			RecordSerializer<T> serializer = this.serializers[targetChannel];
-
-			synchronized (serializer) {
-				RecordSerializer.SerializationResult result = serializer
-						.addRecord(record);
-				while (result.isFullBuffer()) {
-					Buffer buffer = serializer.getCurrentBuffer();
-					if (buffer != null) {
-						sendBuffer(buffer, targetChannel);
-					}
-
-					buffer = this.bufferPool
-							.requestBufferBlocking(this.bufferPool
-									.getBufferSize());
-					result = serializer.setNextBuffer(buffer);
-				}
-			}
-		}
-	}
-
-	@Override
-	public void flush() throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = this.serializers[targetChannel];
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					sendBuffer(buffer, targetChannel);
-				}
-
-				serializer.clear();
-			}
-
-		}
-	}
-
-	private class OutputFlusher extends Thread {
-
-		@Override
-		public void run() {
-			while (!outputGate.isClosed()) {
-				try {
-					Thread.sleep(timeout);
-					flush();
-				} catch (Exception e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
new file mode 100755
index 0000000..93577ea
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+
+@SuppressWarnings("rawtypes")
+public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
+	/**
+	 * Singleton instance
+	 */
+	private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
+
+	private BlockingQueueBroker() {
+	}
+
+	/**
+	 * retrieve singleton instance
+	 */
+	public static Broker<BlockingQueue<StreamRecord>> instance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
new file mode 100755
index 0000000..729f75d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+
+/**
+ * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
+ * input types.
+ */
+public final class CoReaderIterator<T1, T2> {
+
+	private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
+																									// source
+
+	private final DeserializationDelegate<T1> delegate1;
+	private final DeserializationDelegate<T2> delegate2;
+
+	public CoReaderIterator(
+			CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
+			TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
+		this.reader = reader;
+		this.delegate1 = new DeserializationDelegate<T1>(serializer1);
+		this.delegate2 = new DeserializationDelegate<T2>(serializer2);
+	}
+
+	public int next(T1 target1, T2 target2) throws IOException {
+		this.delegate1.setInstance(target1);
+		this.delegate2.setInstance(target2);
+
+		try {
+			return this.reader.getNextRecord(this.delegate1, this.delegate2);
+
+		} catch (InterruptedException e) {
+			throw new IOException("Reader interrupted.", e);
+		}
+	}
+}

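Not part of the commit: the return value of next() above is only implicit in the code,
so here is a standalone restatement of the protocol that the new CoInvokable.invoke()
(earlier in this commit) builds on. next() fills exactly one of its two reuse targets
per call and reports which one; 0 means both inputs are exhausted. All names in the
sketch are hypothetical.

import java.io.IOException;

public class TwoInputReadLoopSketch {

	// Minimal stand-in for CoReaderIterator's contract: return 1 after filling
	// target1, 2 after filling target2, and 0 once both inputs are exhausted.
	interface TwoInputReader<A, B> {
		int next(A target1, B target2) throws IOException;
	}

	static <A, B> void drain(TwoInputReader<A, B> reader, A reuse1, B reuse2)
			throws IOException {
		while (true) {
			int side = reader.next(reuse1, reuse2);
			if (side == 0) {
				break; // both inputs exhausted
			} else if (side == 1) {
				handleFirst(reuse1); // a record was delivered into reuse1
			} else {
				handleSecond(reuse2); // a record was delivered into reuse2
			}
		}
	}

	static <A> void handleFirst(A record) {
		// user logic for records of the first input
	}

	static <B> void handleSecond(B record) {
		// user logic for records of the second input
	}
}
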
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
new file mode 100755
index 0000000..ed44398
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractTaskEvent;
+import org.apache.flink.runtime.io.network.api.AbstractRecordReader;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.io.network.gates.InputChannelResult;
+import org.apache.flink.runtime.io.network.gates.InputGate;
+import org.apache.flink.runtime.io.network.gates.RecordAvailabilityListener;
+
+/**
+ * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
+ * types to read records effectively.
+ */
+@SuppressWarnings("rawtypes")
+public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
+		AbstractRecordReader implements RecordAvailabilityListener {
+
+	/**
+	 * Sets of input gates for the two input types
+	 */
+	private Set<InputGate<T1>> inputGates1;
+	private Set<InputGate<T2>> inputGates2;
+
+	private final Set<InputGate> remainingInputGates;
+
+	/**
+	 * Queue of input gates that currently have at least one available record.
+	 */
+	private final ArrayDeque<InputGate> availableInputGates = new ArrayDeque<InputGate>();
+
+	/**
+	 * The next input gate to read a record from.
+	 */
+	private InputGate nextInputGateToReadFrom;
+
+	@Override
+	public boolean isInputClosed() {
+		return this.remainingInputGates.isEmpty();
+	}
+
+	@SuppressWarnings("unchecked")
+	public CoRecordReader(ArrayList<MutableRecordReader<T1>> inputList1,
+			ArrayList<MutableRecordReader<T2>> inputList2) {
+
+		if (inputList1 == null || inputList2 == null) {
+			throw new IllegalArgumentException("Provided argument recordReaders is null");
+		}
+
+		this.inputGates1 = new HashSet<InputGate<T1>>();
+		this.inputGates2 = new HashSet<InputGate<T2>>();
+		this.remainingInputGates = new HashSet<InputGate>(
+				(int) ((inputList1.size() + inputList2.size()) * 1.6f));
+
+		for (MutableRecordReader<T1> reader : inputList1) {
+			InputGate<T1> inputGate = (InputGate<T1>) reader.getInputGate();
+			inputGate.registerRecordAvailabilityListener(this);
+			this.inputGates1.add(inputGate);
+			this.remainingInputGates.add(inputGate);
+		}
+
+		for (MutableRecordReader<T2> reader : inputList2) {
+			InputGate<T2> inputGate = (InputGate<T2>) reader.getInputGate();
+			inputGate.registerRecordAvailabilityListener(this);
+			this.inputGates2.add(inputGate);
+			this.remainingInputGates.add(inputGate);
+		}
+	}
+
+	@Override
+	public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
+		for (InputGate<T1> gate : this.inputGates1) {
+			gate.publishEvent(event);
+		}
+		for (InputGate<T2> gate : this.inputGates2) {
+			gate.publishEvent(event);
+		}
+	}
+
+	@Override
+	public void reportRecordAvailability(InputGate inputGate) {
+		synchronized (this.availableInputGates) {
+			this.availableInputGates.add(inputGate);
+			this.availableInputGates.notifyAll();
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
+		int out;
+		while (true) {
+			// no current input gate to read from? pick the next one that has data
+			if (this.nextInputGateToReadFrom == null) {
+				if (this.remainingInputGates.isEmpty()) {
+					return 0;
+				}
+
+				this.nextInputGateToReadFrom = getNextAvailableInputGate();
+			}
+			InputChannelResult result;
+
+			if (inputGates1.contains(this.nextInputGateToReadFrom)) {
+				result = this.nextInputGateToReadFrom.readRecord(target1);
+				out = 1;
+			} else {
+				result = this.nextInputGateToReadFrom.readRecord(target2);
+				out = 2;
+			}
+			switch (result) {
+			case INTERMEDIATE_RECORD_FROM_BUFFER: // record is available and we
+													// can stay on the same
+													// channel
+				return out;
+
+			case LAST_RECORD_FROM_BUFFER: // record is available, but we need to
+											// re-check the channels
+				this.nextInputGateToReadFrom = null;
+				return out;
+
+			case END_OF_SUPERSTEP:
+				this.nextInputGateToReadFrom = null;
+				if (incrementEndOfSuperstepEventAndCheck()) {
+					return 0; // end of the superstep
+				} else {
+					break; // fall through and wait for next record/event
+				}
+
+			case TASK_EVENT: // event for the subscribers is available
+				handleEvent(this.nextInputGateToReadFrom.getCurrentEvent());
+				this.nextInputGateToReadFrom = null;
+				break;
+
+			case END_OF_STREAM: // one gate is empty
+				this.remainingInputGates.remove(this.nextInputGateToReadFrom);
+				this.nextInputGateToReadFrom = null;
+				break;
+
+			case NONE: // gate processed an internal event and could not return
+						// a record on this call
+				this.nextInputGateToReadFrom = null;
+				break;
+			}
+		}
+	}
+
+	private InputGate getNextAvailableInputGate() throws InterruptedException {
+		synchronized (this.availableInputGates) {
+			while (this.availableInputGates.isEmpty()) {
+				this.availableInputGates.wait();
+			}
+			return this.availableInputGates.pop();
+		}
+	}
+}
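
How the reader multiplexes the two inputs: the constructor registers the CoRecordReader as a RecordAvailabilityListener on every input gate of both logical inputs, reportRecordAvailability() queues gates as data arrives, and getNextRecord() blocks in getNextAvailableInputGate() until a gate is ready, reads one record from it into the matching target, and returns 1 or 2 accordingly (0 once every gate has reached END_OF_STREAM or a superstep ends). A sketch of how this commit's CoStreamTask is expected to assemble the stack, assuming the reader lists and serializers have already been built from the task's input gates and configuration:

import java.util.ArrayList;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.io.CoRecordReader;

// Sketch only, not part of the committed file: inputList1/inputList2 and the serializers
// are assumed to be created by the task; T1/T2 stand for the two record types.
class CoInputWiring {

	static <T1, T2> CoReaderIterator<T1, T2> createInputIterator(
			ArrayList<MutableRecordReader<DeserializationDelegate<T1>>> inputList1,
			ArrayList<MutableRecordReader<DeserializationDelegate<T2>>> inputList2,
			TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {

		// one CoRecordReader multiplexes all gates of both logical inputs
		CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> coReader =
				new CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>>(
						inputList1, inputList2);

		// the iterator adds deserialization on top and exposes the 0/1/2 next() contract
		return new CoReaderIterator<T1, T2>(coReader, serializer1, serializer2);
	}
}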

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
new file mode 100755
index 0000000..64588ce
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.Buffer;
+import org.apache.flink.runtime.io.network.api.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
+import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
+import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+public class StreamRecordWriter<T extends IOReadableWritable> extends
+		RecordWriter<T> {
+
+	private final BufferProvider bufferPool;
+
+	private final ChannelSelector<T> channelSelector;
+
+	private int numChannels;
+
+	private long timeout;
+
+	/** RecordSerializer per outgoing channel */
+	private RecordSerializer<T>[] serializers;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public StreamRecordWriter(AbstractInvokable invokable) {
+		this(invokable, new RoundRobinChannelSelector<T>(), 1000);
+	}
+
+	public StreamRecordWriter(AbstractInvokable invokable,
+			ChannelSelector<T> channelSelector) {
+		this(invokable, channelSelector, 1000);
+	}
+
+	public StreamRecordWriter(AbstractInvokable invokable,
+			ChannelSelector<T> channelSelector, long timeout) {
+		// initialize the gate
+		super(invokable);
+
+		this.timeout = timeout;
+		this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
+		this.channelSelector = channelSelector;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void initializeSerializers() {
+		this.numChannels = this.outputGate.getNumChannels();
+		this.serializers = new RecordSerializer[numChannels];
+		for (int i = 0; i < this.numChannels; i++) {
+			this.serializers[i] = new SpanningRecordSerializer<T>();
+		}
+		(new OutputFlusher()).start();
+	}
+
+	@Override
+	public void emit(final T record) throws IOException, InterruptedException {
+		for (int targetChannel : this.channelSelector.selectChannels(record,
+				this.numChannels)) {
+			// serialize with corresponding serializer and send full buffer
+
+			RecordSerializer<T> serializer = this.serializers[targetChannel];
+
+			synchronized (serializer) {
+				RecordSerializer.SerializationResult result = serializer
+						.addRecord(record);
+				while (result.isFullBuffer()) {
+					Buffer buffer = serializer.getCurrentBuffer();
+					if (buffer != null) {
+						sendBuffer(buffer, targetChannel);
+					}
+
+					buffer = this.bufferPool
+							.requestBufferBlocking(this.bufferPool
+									.getBufferSize());
+					result = serializer.setNextBuffer(buffer);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void flush() throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
+			RecordSerializer<T> serializer = this.serializers[targetChannel];
+			synchronized (serializer) {
+				Buffer buffer = serializer.getCurrentBuffer();
+				if (buffer != null) {
+					sendBuffer(buffer, targetChannel);
+				}
+
+				serializer.clear();
+			}
+
+		}
+	}
+
+	private class OutputFlusher extends Thread {
+
+		@Override
+		public void run() {
+			while (!outputGate.isClosed()) {
+				try {
+					Thread.sleep(timeout);
+					flush();
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+	}
+
+}
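
Compared to the RecordWriter it extends, this writer keeps one RecordSerializer per outgoing channel behind a lock and starts a background OutputFlusher thread that calls flush() every "timeout" milliseconds, so partially filled buffers leave the task within that bound instead of waiting to fill up. A sketch of how a task might pick the flush interval when creating its output, assuming the invokable and channel selector are provided by the task's OutputHandler (100 ms is an illustrative value; the shorter constructors default to 1000 ms):

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.ChannelSelector;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.io.StreamRecordWriter;

// Sketch only, not part of the committed file: the task and selector are assumed to come
// from the surrounding runtime code.
class OutputWiring {

	static <T extends IOReadableWritable> StreamRecordWriter<T> createOutput(
			AbstractInvokable task, ChannelSelector<T> selector) {
		long flushIntervalMillis = 100; // flush partially filled buffers every 100 ms
		return new StreamRecordWriter<T>(task, selector, flushIntervalMillis);
	}
}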

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 58ec692..43c36a0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -53,5 +53,6 @@ public class PrintTest{
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
 		env.executeTest(MEMORYSIZE);
+		
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
index 588b4ff..1c91cc0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
@@ -104,7 +104,7 @@ public class CoGroupReduceTest {
 
 		@SuppressWarnings({ "unused", "unchecked" })
 		DataStream<String> ds4 = env1.fromElements(word1, word2, word3).connect(ds2).groupBy(0, 0)
-				.reduce(new MyCoReduceFunction(), 0, 0).addSink(new EmptySink());
+				.reduce(new MyCoReduceFunction()).addSink(new EmptySink());
 
 		env1.executeTest(32);
 
@@ -142,7 +142,7 @@ public class CoGroupReduceTest {
 
 		@SuppressWarnings({ "unused", "unchecked" })
 		DataStream<String> ds4 = env2.fromElements(word1, word2, word3).connect(ds2).groupBy(2, 0)
-				.reduce(new MyCoReduceFunction(), 2, 0).addSink(new EmptySink());
+				.reduce(new MyCoReduceFunction()).addSink(new EmptySink());
 
 		env2.executeTest(32);
 

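With the ConnectedDataStream refactor in this commit, the key positions for the two inputs are declared once via groupBy on the connected stream (yielding a GroupedConnectedDataStream), so the keyed reduce no longer repeats them. The call chain exercised by the test above, written out step by step:

// Same pipeline as in the test, only reformatted; the names (env1, ds2, word1..3,
// MyCoReduceFunction, EmptySink) are the test's own.
env1.fromElements(word1, word2, word3)
		.connect(ds2)
		.groupBy(0, 0)                    // key field of the first and the second input
		.reduce(new MyCoReduceFunction()) // no key positions repeated here
		.addSink(new EmptySink());
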
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index b107ac3..d80b937 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -39,6 +39,7 @@ public class IncrementalLearningSkeleton {
 
 		// Method for pulling new data for prediction
 		private Integer getNewData() throws InterruptedException {
+			Thread.sleep(1000);
 			return 1;
 		}
 	}
@@ -58,6 +59,7 @@ public class IncrementalLearningSkeleton {
 
 		// Method for pulling new training data
 		private Integer getTrainingData() throws InterruptedException {
+			Thread.sleep(1000);
 			return 1;
 
 		}
@@ -97,7 +99,7 @@ public class IncrementalLearningSkeleton {
 			// Update model
 			partialModel = value;
 			batchModel = getBatchModel();
-			return 0;
+			return 1;
 		}
 
 		// Pulls model built with batch-job on the old training data
@@ -122,7 +124,7 @@ public class IncrementalLearningSkeleton {
 
 		// Build new model on every second of new data
 		DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
-				.windowReduce(new PartialModelBuilder(), 1000);
+				.windowReduce(new PartialModelBuilder(), 5000);
 
 		// Use partial model for prediction
 		DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c3ec1e1f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
index a22debe..fa41841 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
@@ -69,7 +69,7 @@ public abstract class AbstractSingleGateRecordReader<T extends IOReadableWritabl
 		this.inputGate.publishEvent(event);
 	}
 
-	InputGate<T> getInputGate() {
+	public InputGate<T> getInputGate() {
 		return this.inputGate;
 	}
 }
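
The accessor is widened from package-private to public because the new CoRecordReader lives in org.apache.flink.streaming.io and needs each reader's gate in order to register itself for availability notifications, as its constructor (shown earlier in this commit) does:

// from CoRecordReader's constructor; this only works across packages with a public accessor
InputGate<T1> inputGate = (InputGate<T1>) reader.getInputGate();
inputGate.registerRecordAvailabilityListener(this);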

