flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [03/28] git commit: [streaming] CoFlatMap & CoGroupReduce functionality
Date Fri, 29 Aug 2014 19:03:36 GMT
[streaming] CoFlatMap & CoGroupReduce functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/75960122
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/75960122
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/75960122

Branch: refs/heads/master
Commit: 75960122b47f84d258273c0cd7f36b2aae7a684d
Parents: 1e38a63
Author: szape <nemderogatorius@gmail.com>
Authored: Fri Aug 15 13:35:48 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Aug 29 21:01:56 2014 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     | 103 ++++++++++++-
 .../api/function/co/CoFlatMapFunction.java      |  42 +++++
 .../api/function/co/CoReduceFunction.java       | 113 ++++++++++++++
 .../operator/co/CoFlatMapInvokable.java         |  60 ++++++++
 .../operator/co/CoGroupReduceInvokable.java     |  75 +++++++++
 .../api/invokable/operator/co/CoInvokable.java  |  29 +++-
 .../invokable/operator/co/CoMapInvokable.java   |  29 +---
 .../api/streamcomponent/CoStreamTask.java       |   8 +-
 .../api/invokable/operator/CoFlatMapTest.java   | 101 ++++++++++++
 .../invokable/operator/CoGroupReduceTest.java   | 152 +++++++++++++++++++
 10 files changed, 684 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 16a383f..1c99ee4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -24,18 +24,25 @@ import java.io.Serializable;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
 /**
- * The ConnectedDataStream represents a stream for two different data types. It can be
- * used to apply transformations like {@link CoMapFunction} on two
+ * The ConnectedDataStream represents a stream for two different data types. It
+ * can be used to apply transformations like {@link CoMapFunction} on two
  * {@link DataStream}s
  *
  * @param <IN1>
@@ -50,8 +57,8 @@ public class ConnectedDataStream<IN1, IN2> {
 	DataStream<IN1> input1;
 	DataStream<IN2> input2;
 
-	protected ConnectedDataStream(StreamExecutionEnvironment environment, JobGraphBuilder jobGraphBuilder,
-			DataStream<IN1> input1, DataStream<IN2> input2) {
+	protected ConnectedDataStream(StreamExecutionEnvironment environment,
+			JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1, DataStream<IN2> input2)
{
 		this.jobGraphBuilder = jobGraphBuilder;
 		this.environment = environment;
 		this.input1 = input1.copy();
@@ -77,6 +84,46 @@ public class ConnectedDataStream<IN1, IN2> {
 	}
 
 	/**
+	 * Sets the partitioning of the two separate {@link DataStream}s so that the
+	 * output tuples are partitioned by their hashcode and are sent to only one
+	 * component.
+	 * 
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The ConnectedDataStream with field partitioning set on both inputs.
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1, int keyPosition2)
{
+		if (keyPosition1 < 0 || keyPosition2 < 0) {
+			throw new IllegalArgumentException("The position of the field must be non-negative");
+		}
+
+		return new ConnectedDataStream<IN1, IN2>(this.environment, this.jobGraphBuilder,
getFirst()
+				.setConnectionType(new FieldsPartitioner<IN1>(keyPosition1)), getSecond()
+				.setConnectionType(new FieldsPartitioner<IN2>(keyPosition2)));
+	}
+
+	/**
+	 * GroupBy operation for connected data stream. Groups the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2. Basically
+	 * used before Reduce operation.
+	 * 
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The ConnectedDataStream partitioned by the given key positions.
+	 */
+	public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
+		return this.partitionBy(keyPosition1, keyPosition2);
+	}
+
+	/**
 	 * Applies a CoMap transformation on two separate {@link DataStream}s. The
 	 * transformation calls a {@link CoMapFunction#map1} for each element of the
 	 * first input and {@link CoMapFunction#map2} for each element of the second
@@ -94,6 +141,54 @@ public class ConnectedDataStream<IN1, IN2> {
 				CoMapFunction.class, 0, 1, 2), new CoMapInvokable<IN1, IN2, OUT>(coMapper));
 	}
 
+	/**
+	 * Applies a CoFlatMap transformation on two separate {@link DataStream}s.
+	 * The transformation calls {@link CoFlatMapFunction#flatMap1} for each
+	 * element of the first input and {@link CoFlatMapFunction#flatMap2} for
+	 * each element of the second input. Each CoFlatMapFunction call may emit
+	 * any number of elements, including none. The user can also extend
+	 * {@link RichFlatMapFunction} to gain access to other features provided by
+	 * the {@link RichFunction} interface.
+	 * 
+	 * @param coFlatMapper
+	 *            The CoFlatMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @return The transformed DataStream
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
+			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
+		return addCoFunction("coFlatMap", coFlatMapper, new FunctionTypeWrapper<IN1, IN2, OUT>(
+				coFlatMapper, CoFlatMapFunction.class, 0, 1, 2),
+				new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
+	}
+
+	/**
+	 * Applies a CoReduce transformation on the data stream grouped by the
+	 * given key positions. The {@link CoReduceFunction} will receive
+	 * input values based on the key positions. The transformation calls
+	 * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
+	 * each element of the first input and {@link CoReduceFunction#reduce2} and
+	 * {@link CoReduceFunction#map2} for each element of the second input. For
+	 * each input, only values with the same key will go to the same reducer.
+	 * The user can also extend {@link RichReduceFunction} to gain access to
+	 * other features provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoReduceFunction} that will be called for every two
+	 *            elements with the same key of each input DataStream.
+	 * @param keyPosition1
+	 *            position of the key in the first input DataStream
+	 * @param keyPosition2
+	 *            position of the key in the second input DataStream
+	 * @return The transformed DataStream.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(
+			CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1, int keyPosition2) {
+		return addCoFunction("coReduce", coReducer, new FunctionTypeWrapper<IN1, IN2, OUT>(
+				coReducer, CoReduceFunction.class, 0, 1, 2),
+				new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
+	}
+
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
 			final Function function, TypeSerializerWrapper<IN1, IN2, OUT> typeWrapper,
 			CoInvokable<IN1, IN2, OUT> functionInvokable) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
new file mode 100644
index 0000000..81c6886
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * A CoFlatMapFunction represents a FlatMap transformation with two different
+ * input types.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	public void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
+
+	public void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
new file mode 100644
index 0000000..fa58991
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * The CoReduceFunction interface represents a Reduce transformation with two
+ * different input streams. The reduce1 function combines groups of elements of
+ * the first input with the same key to a single value, while reduce2 combines
+ * groups of elements of the second input with the same key to a single value.
+ * Each produced value is mapped to the same type by map1 and map2,
+ * respectively, to form one output stream.
+ * 
+ * The basic syntax for using a grouped CoReduceFunction is as follows:
+ * 
+ * <pre>
+ * <blockquote>
+ * ConnectedDataStream<X, Y> input = ...;
+ * 
+ * DataStream<Z> result = input.groupBy(keyPosition1, keyPosition2)
+ *          .reduce(new MyCoReduceFunction(), keyPosition1, keyPosition2).addSink(...);
+ * </blockquote>
+ * </pre>
+ * <p>
+ * 
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	/**
+	 * The core method of CoReduceFunction, combining two values of the first
+	 * input into one value of the same type. The reduce1 function is
+	 * consecutively applied to all values of a group until only a single value
+	 * remains.
+	 *
+	 * @param value1
+	 *            The first value to combine.
+	 * @param value2
+	 *            The second value to combine.
+	 * @return The combined value of both input values.
+	 *
+	 * @throws Exception
+	 *             This method may throw exceptions. Throwing an exception will
+	 *             cause the operation to fail and may trigger recovery.
+	 */
+	public abstract IN1 reduce1(IN1 value1, IN1 value2);
+
+	/**
+	 * The core method of CoReduceFunction, combining two values of the second
+	 * input into one value of the same type. The reduce2 function is
+	 * consecutively applied to all values of a group until only a single value
+	 * remains.
+	 *
+	 * @param value1
+	 *            The first value to combine.
+	 * @param value2
+	 *            The second value to combine.
+	 * @return The combined value of both input values.
+	 *
+	 * @throws Exception
+	 *             This method may throw exceptions. Throwing an exception will
+	 *             cause the operation to fail and may trigger recovery.
+	 */
+	public abstract IN2 reduce2(IN2 value1, IN2 value2);
+
+	/**
+	 * Maps the reduced first input to the output type.
+	 * 
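+	 * @param value
+	 *            The reduced value of a group of elements of the first
+	 *            input.
+	 * @return The value converted to the common output type, so that the
+	 *         results of both inputs can be emitted into one output
+	 *         stream.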
+	 */
+	public abstract OUT map1(IN1 value);
+
+	/**
+	 * Maps the reduced second input to the output type.
+	 * 
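+	 * @param value
+	 *            The reduced value of a group of elements of the second
+	 *            input.
+	 * @return The value converted to the common output type, so that the
+	 *         results of both inputs can be emitted into one output
+	 *         stream.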
+	 */
+	public abstract OUT map2(IN2 value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
new file mode 100644
index 0000000..d81b09c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
+
+public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
{
+	private static final long serialVersionUID = 1L;
+
+	private CoFlatMapFunction<IN1, IN2, OUT> flatMapper;
+
+	public CoFlatMapInvokable(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+		super(flatMapper);
+		this.flatMapper = flatMapper;
+	}
+
+	@Override
+	public void handleStream1() throws Exception {
+		flatMapper.flatMap1(reuse1.getObject(), collector);
+	}
+
+	@Override
+	public void handleStream2() throws Exception {
+		flatMapper.flatMap2(reuse2.getObject(), collector);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		if (flatMapper instanceof RichFunction) {
+			((RichFunction) flatMapper).open(parameters);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (flatMapper instanceof RichFunction) {
+			((RichFunction) flatMapper).close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
new file mode 100644
index 0000000..e52e9be
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
@@ -0,0 +1,75 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.state.MutableTableState;
+
+public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private CoReduceFunction<IN1, IN2, OUT> coReducer;
+	private int keyPosition1;
+	private MutableTableState<Object, IN1> values1;
+
+	private int keyPosition2;
+	private MutableTableState<Object, IN2> values2;
+
+	public CoGroupReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
+			int keyPosition2) {
+		super(coReducer);
+		this.coReducer = coReducer;
+		this.keyPosition1 = keyPosition1;
+		this.keyPosition2 = keyPosition2;
+		values1 = new MutableTableState<Object, IN1>();
+		values2 = new MutableTableState<Object, IN2>();
+	}
+
+	@Override
+	public void handleStream1() throws Exception {
+		Object key = reuse1.getField(keyPosition1);
+		IN1 currentValue = values1.get(key);
+		IN1 nextValue = reuse1.getObject();
+		if (currentValue != null) {
+			IN1 reduced = coReducer.reduce1(currentValue, nextValue);
+			values1.put(key, reduced);
+			collector.collect(coReducer.map1(reduced));
+		} else {
+			values1.put(key, nextValue);
+			collector.collect(coReducer.map1(nextValue));
+		}
+	}
+
+	@Override
+	public void handleStream2() throws Exception {
+		Object key = reuse2.getField(keyPosition2);
+		IN2 currentValue = values2.get(key);
+		IN2 nextValue = reuse2.getObject();
+		if (currentValue != null) {
+			IN2 reduced = coReducer.reduce2(currentValue, nextValue);
+			values2.put(key, reduced);
+			collector.collect(coReducer.map2(reduced));
+		} else {
+			values2.put(key, nextValue);
+			collector.collect(coReducer.map2(nextValue));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 8e002ee..d94cb8a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -26,8 +26,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class CoInvokable<IN1, IN2, OUT> extends
-		StreamComponentInvokable<OUT> {
+public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokable<OUT>
{
 
 	public CoInvokable(Function userFunction) {
 		super(userFunction);
@@ -65,4 +64,30 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends
 		this.reuse1 = serializer1.createInstance();
 		this.reuse2 = serializer2.createInstance();
 	}
+
+	public void invoke() throws Exception {
+		boolean noMoreRecordOnInput1 = false;
+		boolean noMoreRecordOnInput2 = false;
+
+		do {
+			noMoreRecordOnInput1 = ((reuse1 = recordIterator1.next(reuse1)) == null);
+			if (!noMoreRecordOnInput1) {
+				handleStream1();
+			}
+
+			noMoreRecordOnInput2 = ((reuse2 = recordIterator2.next(reuse2)) == null);
+			if (!noMoreRecordOnInput2) {
+				handleStream2();
+			}
+
+			if (!this.isMutable) {
+				resetReuse();
+			}
+		} while (!noMoreRecordOnInput1 || !noMoreRecordOnInput2);
+	}
+
+	public abstract void handleStream1() throws Exception;
+
+	public abstract void handleStream2() throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index ab10b40..6087c29 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -32,28 +32,15 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2, OUT> {
 		super(mapper);
 		this.mapper = mapper;
 	}
-
-	// TODO rework this as UnionRecordReader
+	
 	@Override
-	public void invoke() throws Exception {
-		boolean noMoreRecordOnInput1 = false;
-		boolean noMoreRecordOnInput2 = false;
-
-		do {
-			noMoreRecordOnInput1 = ((reuse1 = recordIterator1.next(reuse1)) == null);
-			if (!noMoreRecordOnInput1) {
-				collector.collect(mapper.map1(reuse1.getObject()));
-			}
-
-			noMoreRecordOnInput2 = ((reuse2 = recordIterator2.next(reuse2)) == null);
-			if (!noMoreRecordOnInput2) {
-				collector.collect(mapper.map2(reuse2.getObject()));
-			}
-
-			if (!this.isMutable) {
-				resetReuse();
-			}
-		} while (!noMoreRecordOnInput1 && !noMoreRecordOnInput2);
+	public void handleStream1() throws Exception {
+		collector.collect(mapper.map1(reuse1.getObject()));
+	}
+	
+	@Override
+	public void handleStream2() throws Exception {
+		collector.collect(mapper.map2(reuse2.getObject()));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index a595cfa..3d9b811 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -27,7 +27,9 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -61,9 +63,13 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT
extends Tupl
 
 		Object function = configuration.getFunction();
 		try {
+			setSerializer();
 			if (operatorName.equals("coMap")) {
-				setSerializer();
 				setDeserializers(function, CoMapFunction.class);
+			} else if (operatorName.equals("coFlatMap")) {
+				setDeserializers(function, CoFlatMapFunction.class);
+			} else if (operatorName.equals("coReduce")) {
+			setDeserializers(function, CoReduceFunction.class);
 			} else {
 				throw new Exception("Wrong operator name!");
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
new file mode 100644
index 0000000..f6e7231
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -0,0 +1,101 @@
+/** Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.LogUtils;
+import org.apache.flink.util.Collector;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CoFlatMapTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private static Set<String> result;
+	private static Set<String> expected = new HashSet<String>();
+
+	private final static class EmptySink implements SinkFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(String tuple) {
+		}
+	}
+
+	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer,
String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap1(String value, Collector<String> coll) {
+			for (int i = 0; i < value.length(); i++) {
+				coll.collect(value.substring(i, i + 1));
+				result.add(value.substring(i, i + 1));
+			}
+		}
+
+		@Override
+		public void flatMap2(Integer value, Collector<String> coll) {
+			coll.collect(value.toString());
+			result.add(value.toString());
+		}
+	}
+
+	@Test
+	public void multipleInputTest() {
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+		expected.add("a");
+		expected.add("b");
+		expected.add("c");
+		expected.add("d");
+		expected.add("e");
+		expected.add("f");
+		expected.add("g");
+		expected.add("h");
+		expected.add("e");
+		expected.add("1");
+		expected.add("2");
+		expected.add("3");
+		expected.add("4");
+		expected.add("5");
+
+		result = new HashSet<String>();
+
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
+		@SuppressWarnings("unchecked")
+		DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
+
+		@SuppressWarnings({ "unused" })
+		DataStream<String> ds4 = env.fromElements("abc", "def", "ghe").connect(ds2)
+				.flatMap(new MyCoFlatMap()).addSink(new EmptySink());
+
+		env.executeTest(32);
+
+		Assert.assertEquals(expected, result);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75960122/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
new file mode 100644
index 0000000..588b4ff
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
@@ -0,0 +1,152 @@
+/** Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.LogUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the grouped co-reduce operator on two connected streams: one of
+ * {@code Tuple3<String, String, String>} and one of {@code Tuple2<Integer, Integer>}.
+ *
+ * <p>
+ * Every test builds its own expectation list and the shared {@link #result} list
+ * is recreated for each run, so the tests do not depend on the order in which
+ * JUnit executes them.
+ */
+public class CoGroupReduceTest {
+
+	/**
+	 * Outputs recorded by {@link MyCoReduceFunction}. Static so that the static
+	 * nested function class can append to it; replaced with a fresh list by
+	 * {@link #runCoGroupReduce(int)}.
+	 */
+	private static List<String> result;
+
+	/** Sink that discards every record; the tests inspect {@link #result} instead. */
+	private final static class EmptySink implements SinkFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(String tuple) {
+		}
+	}
+
+	/**
+	 * Co-reduce function under test. Word tuples are reduced by concatenating
+	 * their second field, integer tuples by summing their second field; both map
+	 * functions record the value they return in {@link #result}.
+	 */
+	private final static class MyCoReduceFunction implements
+			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		/** Keeps {@code f0} and {@code f2} of the first value and concatenates both {@code f1} fields. */
+		@Override
+		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
+				Tuple3<String, String, String> value2) {
+			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
+		}
+
+		/** Keeps {@code f0} of the first value and adds up both {@code f1} fields. */
+		@Override
+		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
+				Tuple2<Integer, Integer> value2) {
+			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
+		}
+
+		/** Returns the word field {@code f1} and records it in {@link #result}. */
+		@Override
+		public String map1(Tuple3<String, String, String> value) {
+			String mapResult = value.f1;
+			result.add(mapResult);
+			return mapResult;
+		}
+
+		/** Returns the string form of the integer field {@code f1} and records it in {@link #result}. */
+		@Override
+		public String map2(Tuple2<Integer, Integer> value) {
+			String mapResult = value.f1.toString();
+			result.add(mapResult);
+			return mapResult;
+		}
+	}
+
+	/**
+	 * Builds and executes the test topology, leaving the values recorded by
+	 * {@link MyCoReduceFunction} in a freshly created {@link #result} list.
+	 *
+	 * @param wordKeyPosition
+	 *            position passed for the first (word) input to both
+	 *            {@code groupBy} and {@code reduce}; the second (integer) input
+	 *            always uses position 0
+	 */
+	private static void runCoGroupReduce(int wordKeyPosition) {
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
+		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
+		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
+		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
+		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
+		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
+		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
+		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
+		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
+
+		// Always start from an empty list; never rely on another test having created it.
+		result = new ArrayList<String>();
+
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		@SuppressWarnings("unchecked")
+		DataStream<Tuple2<Integer, Integer>> ds1 = env.fromElements(int1, int3, int5);
+		@SuppressWarnings("unchecked")
+		DataStream<Tuple2<Integer, Integer>> ds2 = env.fromElements(int2, int4).merge(ds1);
+
+		@SuppressWarnings({ "unused", "unchecked" })
+		DataStream<String> ds4 = env.fromElements(word1, word2, word3).connect(ds2)
+				.groupBy(wordKeyPosition, 0)
+				.reduce(new MyCoReduceFunction(), wordKeyPosition, 0).addSink(new EmptySink());
+
+		env.executeTest(32);
+	}
+
+	/**
+	 * Groups the words on field 0, so "word1" and "word3" (both keyed "a") are
+	 * concatenated while "word2" stays on its own.
+	 */
+	@Test
+	public void multipleInputTest() {
+		List<String> expected = new ArrayList<String>();
+		expected.add("word1word3");
+		expected.add("word2");
+		expected.add("3");
+		expected.add("5");
+		expected.add("7");
+
+		runCoGroupReduce(0);
+
+		// Presumably one recorded output per input element (3 words + 5 integer pairs).
+		Assert.assertEquals(8, result.size());
+		Assert.assertTrue(result.containsAll(expected));
+	}
+
+	/**
+	 * Groups the words on field 2, so "word2" and "word3" (both keyed "a") are
+	 * concatenated while "word1" stays on its own.
+	 */
+	@Test
+	public void multipleInputTest2() {
+		List<String> expected = new ArrayList<String>();
+		expected.add("word2word3");
+		expected.add("word1");
+		expected.add("3");
+		expected.add("5");
+		expected.add("7");
+
+		runCoGroupReduce(2);
+
+		// Presumably one recorded output per input element (3 words + 5 integer pairs).
+		Assert.assertEquals(8, result.size());
+		Assert.assertTrue(result.containsAll(expected));
+	}
+}


Mime
View raw message