flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [6/7] flink git commit: [FLINK-2689] [runtime] Fix reuse of null object for solution set Joins and CoGroups.
Date Thu, 17 Sep 2015 12:04:08 GMT
[FLINK-2689] [runtime] Fix reuse of null object for solution set Joins and CoGroups.

This closes #1136


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e5cdfb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e5cdfb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e5cdfb3

Branch: refs/heads/release-0.10.0-milestone-1
Commit: 0e5cdfb30cbdd4fbfded4644706fa9b85a956451
Parents: 7a11a90
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Sep 16 16:56:06 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Sep 17 11:56:09 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/CoGroupWithSolutionSetFirstDriver.java   | 6 +++---
 .../runtime/operators/CoGroupWithSolutionSetSecondDriver.java  | 6 +++---
 .../runtime/operators/JoinWithSolutionSetFirstDriver.java      | 4 ++--
 .../runtime/operators/JoinWithSolutionSetSecondDriver.java     | 4 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index b27b6b9..97d6e51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -175,9 +175,9 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements
Resettab
 				while (this.running && probeSideInput.nextKey()) {
 					IT2 current = probeSideInput.getCurrent();
 
-					buildSideRecord = prober.getMatchFor(current, buildSideRecord);
-					if (buildSideRecord != null) {
-						siIter.set(buildSideRecord);
+					IT1 matchedRecord = prober.getMatchFor(current, buildSideRecord);
+					if (matchedRecord != null) {
+						siIter.set(matchedRecord);
 						coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
 					} else {
 						coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index ba0f8f9..9e8a81c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -174,9 +174,9 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements
Resetta
 				while (this.running && probeSideInput.nextKey()) {
 					IT1 current = probeSideInput.getCurrent();
 
-					buildSideRecord = prober.getMatchFor(current, buildSideRecord);
-					if (buildSideRecord != null) {
-						siIter.set(buildSideRecord);
+					IT2 matchedRecord = prober.getMatchFor(current, buildSideRecord);
+					if (matchedRecord != null) {
+						siIter.set(matchedRecord);
 						coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
 					} else {
 						coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index a1c8a4a..fe926cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -166,8 +166,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements
ResettableP
 				IT1 buildSideRecord = this.solutionSideRecord;
 
 				while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord))
!= null)) {
-					buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
-					joinFunction.join(buildSideRecord, probeSideRecord, collector);
+					IT1 matchedRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+					joinFunction.join(matchedRecord, probeSideRecord, collector);
 				}
 			} else if (objectMap != null) {
 				final JoinHashMap<IT1> hashTable = this.objectMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index 32a75dc..20079fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -168,8 +168,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements
Resettable
 				IT2 buildSideRecord = this.solutionSideRecord;
 
 				while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord))
!= null)) {
-					buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
-					joinFunction.join(probeSideRecord, buildSideRecord, collector);
+					IT2 matchedRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+					joinFunction.join(probeSideRecord, matchedRecord, collector);
 				}
 			} else if (objectMap != null) {
 				final JoinHashMap<IT2> hashTable = this.objectMap;


Mime
View raw message