flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject flink git commit: [streaming] Small backport fixes for streaming to branch-0.8
Date Tue, 10 Feb 2015 07:50:21 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.8 6afa4ba6e -> e01712111


[streaming] Small backport fixes for streaming to branch-0.8

Fix for operators inheriting a lower parallelism (e01057e)
Hash join for streaming joins (6d49d1d)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0171211
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0171211
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0171211

Branch: refs/heads/release-0.8
Commit: e01712111d6f3e85c986d59d31d6f59030c0faa3
Parents: 6afa4ba
Author: mbalassi <mbalassi@apache.org>
Authored: Mon Feb 9 15:59:03 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 9 22:54:53 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  4 +-
 .../api/function/co/JoinWindowFunction.java     | 39 +++++++++++++++-----
 .../streaming/api/WindowCrossJoinTest.java      | 14 +++----
 3 files changed, 39 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8e21218..261f0ae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1022,7 +1022,7 @@ public class DataStream<OUT> {
 				operatorName, outTypeInfo);
 
 		jobGraphBuilder.addStreamVertex(returnStream.getId(), invokable, getType(), outTypeInfo,
-				operatorName, degreeOfParallelism);
+				operatorName, returnStream.getParallelism());
 
 		connectGraph(inputStream, returnStream.getId(), 0);
 
@@ -1086,7 +1086,7 @@ public class DataStream<OUT> {
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
getType());
 
 		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
-				clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism);
+				clean(sinkFunction)), getType(), null, "sink", returnStream.getParallelism());
 
 		this.connectGraph(this.copy(), returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
index 9b39f33..e8f46dd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.streaming.api.function.co;
 
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -32,24 +35,42 @@ public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1,
 	private JoinFunction<IN1, IN2, OUT> joinFunction;
 
 	public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?>
keySelector2,
-			JoinFunction<IN1, IN2, OUT> joinFunction) {
-		this.keySelector1 = keySelector1;
+							JoinFunction<IN1, IN2, OUT> joinFunction) { this.keySelector1 = keySelector1;
 		this.keySelector2 = keySelector2;
 		this.joinFunction = joinFunction;
 	}
 
 	@Override
 	public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT>
out) throws Exception {
-		for (IN1 item1 : first) {
-			Object key1 = keySelector1.getKey(item1);
 
-			for (IN2 item2 : second) {
-				Object key2 = keySelector2.getKey(item2);
+		Map<Object, List<IN1>> map = build(first);
 
-				if (key1.equals(key2)) {
-					out.collect(joinFunction.join(item1, item2));
+		for (IN2 record : second) {
+			Object key = keySelector2.getKey(record);
+			List<IN1> match = map.get(key);
+			if (match != null) {
+				for (IN1 matching : match) {
+					out.collect(joinFunction.join(matching, record));
 				}
 			}
 		}
+
+	}
+
+	private Map<Object, List<IN1>> build(List<IN1> records) throws Exception
{
+
+		Map<Object, List<IN1>> map = new HashMap<Object, List<IN1>>();
+
+		for (IN1 record : records) {
+			Object key = keySelector1.getKey(record);
+			List<IN1> current = map.get(key);
+			if (current == null) {
+				current = new LinkedList<IN1>();
+				map.put(key, current);
+			}
+			current.add(record);
+		}
+
+		return map;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index b52418d..85bd8bb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.HashSet;
 
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -38,19 +38,19 @@ public class WindowCrossJoinTest implements Serializable {
 
 	private static final long MEMORYSIZE = 32;
 
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults
= new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults
= new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+	private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> joinResults
= new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
+	private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults
= new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
 
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults
= new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults
= new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+	private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> crossResults
= new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
+	private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults
= new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
 
 	@Test
 	public void test() throws Exception {
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		env.setBufferTimeout(1);
 
-		ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer,
String>>();
-		ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>();
+		HashSet<Tuple2<Integer, String>> in1 = new HashSet<Tuple2<Integer, String>>();
+		HashSet<Tuple1<Integer>> in2 = new HashSet<Tuple1<Integer>>();
 
 		in1.add(new Tuple2<Integer, String>(10, "a"));
 		in1.add(new Tuple2<Integer, String>(20, "b"));


Mime
View raw message