flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [12/51] [abbrv] git commit: [streaming] Fixed multiple input CoFunction
Date Mon, 18 Aug 2014 17:25:49 GMT
[streaming] Fixed multiple input CoFunction


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b92ce014
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b92ce014
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b92ce014

Branch: refs/heads/master
Commit: b92ce014fa178223119eab3b1f950d0c83630602
Parents: f436690
Author: ghermann <reckoner42@gmail.com>
Authored: Tue Jul 22 15:07:30 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:19:17 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  9 ++-
 .../api/streamcomponent/CoStreamTask.java       |  6 +-
 .../api/invokable/operator/CoMapTest.java       | 81 ++++++++++++++------
 3 files changed, 69 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b92ce014/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 022fcd8..73f8d3a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -754,17 +754,20 @@ public class JobGraphBuilder {
 		for (String componentName : outEdgeList.keySet()) {
 			createVertex(componentName);
 		}
-		int inputNumber = 0;
+		
 		for (String upStreamComponentName : outEdgeList.keySet()) {
-			
 			int i = 0;
+			
 			ArrayList<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
 
 			for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
 				Configuration downStreamComponentConfig = components.get(downStreamComponentName)
 						.getConfiguration();
+				
+				int inputNumber = downStreamComponentConfig.getInteger("numberOfInputs", 0);				
 				downStreamComponentConfig.setInteger("inputType_" + inputNumber++, outEdgeTypeList.get(i));
-
+				downStreamComponentConfig.setInteger("numberOfInputs", inputNumber);
+				
 				connect(upStreamComponentName, downStreamComponentName,
 						connectionTypes.get(upStreamComponentName).get(i));
 				i++;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b92ce014/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 4c1cf42..204bcfe 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -168,8 +168,10 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 		if (inputList.size() == 1) {
 			return inputList.get(0);
 		} else if (inputList.size() > 1) {
-			return new MutableUnionRecordReader<IOReadableWritable>(
-					(MutableRecordReader<IOReadableWritable>[]) inputList.toArray());
+			MutableRecordReader<IOReadableWritable>[] inputArray = inputList
+					.toArray(new MutableRecordReader[inputList.size()]);
+
+			return new MutableUnionRecordReader<IOReadableWritable>(inputArray);
 		}
 		return null;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b92ce014/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 00ae3b9..b2fd3cf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -28,17 +28,49 @@ import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.LogUtils;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class CoMapTest implements Serializable {
 	private static final long serialVersionUID = 1L;
-	
-	private static Set<String> result = new HashSet<String>();
+
+	private static Set<String> result;
 	private static Set<String> expected = new HashSet<String>();
 
+	private final static class EmptySink extends SinkFunction<Tuple1<Boolean>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple1<Boolean> tuple) {
+		}
+	}
+
+	private final static class MyCoMap extends
+			CoMapFunction<Tuple1<String>, Tuple1<Integer>, Tuple1<Boolean>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple1<Boolean> map1(Tuple1<String> value) {
+			result.add(value.f0);
+			return new Tuple1<Boolean>(true);
+		}
+
+		@Override
+		public Tuple1<Boolean> map2(Tuple1<Integer> value) {
+			result.add(value.f0.toString());
+			return new Tuple1<Boolean>(false);
+		}
+	}
+
 	@Test
 	public void test() {
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+		
+		result = new HashSet<String>();
+		
 		expected.add("a");
 		expected.add("b");
 		expected.add("c");
@@ -46,31 +78,36 @@ public class CoMapTest implements Serializable {
 		expected.add("2");
 		expected.add("3");
 		expected.add("4");
-		
+
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		
+
 		DataStream<Tuple1<Integer>> ds1 = env.fromElements(1, 2, 3, 4);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Boolean>> ds2 = env.fromElements("a", "b", "c").coMapWith(new CoMapFunction<Tuple1<String>, Tuple1<Integer>, Tuple1<Boolean>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Tuple1<Boolean> map1(Tuple1<String> value) {
-				System.out.println("1: " + value);
-				result.add(value.f0);
-				return new Tuple1<Boolean>(true);
-			}
-
-			@Override
-			public Tuple1<Boolean> map2(Tuple1<Integer> value) {
-				System.out.println("2: " +value);
-				result.add(value.f0.toString());
-				return new Tuple1<Boolean>(false);
-			}
-		}, ds1)
-		.print();
+		DataStream<Tuple1<Boolean>> ds2 = env.fromElements("a", "b", "c")
+				.coMapWith(new MyCoMap(), ds1).addSink(new EmptySink());
+
+		env.executeTest(32);
+		Assert.assertArrayEquals(expected.toArray(), result.toArray());
+	}
+
+	@Test
+	public void multipleInputTest() {
+		result = new HashSet<String>();
+
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		DataStream<Tuple1<Integer>> ds1 = env.fromElements(1, 3);
+		@SuppressWarnings("unchecked")
+		DataStream<Tuple1<Integer>> ds2 = env.fromElements(2, 4).connectWith(ds1);
+
+		DataStream<Tuple1<String>> ds3 = env.fromElements("a", "b");
 		
+		@SuppressWarnings({ "unused", "unchecked" })
+		DataStream<Tuple1<Boolean>> ds4 = env.fromElements("c").connectWith(ds3).coMapWith(new MyCoMap(),
+
+		ds2).addSink(new EmptySink());
+
 		env.executeTest(32);
 		Assert.assertArrayEquals(expected.toArray(), result.toArray());
 	}


Mime
View raw message