flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] git commit: [FLINK-957] Throw meaningful warnings and exceptions upon incorrect use of delta iterations.
Date Fri, 20 Jun 2014 13:33:15 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master f8ec28c73 -> cd665b9e8


[FLINK-957] Throw meaningful warnings and exceptions upon incorrect use of delta iterations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cd665b9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cd665b9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cd665b9e

Branch: refs/heads/master
Commit: cd665b9e8abec2bbfecf384fe7273bd50f22ce67
Parents: 9d57045
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Jun 19 19:54:23 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Jun 20 15:32:20 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/compiler/PactCompiler.java  |  2 +-
 .../stratosphere/api/java/DeltaIteration.java   |  2 +-
 .../api/java/operators/CoGroupOperator.java     | 20 ++++++
 .../stratosphere/api/java/operators/Keys.java   | 27 ++++++---
 .../DeltaIterationTranslationTest.java          | 64 +++++++++++++++++++-
 5 files changed, 105 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
index 96eb01d..2076902 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
@@ -1055,7 +1055,7 @@ public class PactCompiler {
 							}
 						}
 						else {
-							throw new CompilerException("Error: The solution set may only be joined with through a Join or a CoGroup function.");
+							throw new CompilerException("Error: The only operations allowed on the solution set are Join and CoGroup.");
 						}
 					}
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java
index 7fa6638..3522947 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java
@@ -223,7 +223,7 @@ public class DeltaIteration<ST, WT> {
 		public void checkJoinKeyFields(int[] keyFields) {
 			int[] ssKeys = deltaIteration.keys.computeLogicalKeyPositions();
 			if (!Arrays.equals(ssKeys, keyFields)) {
-				throw new InvalidProgramException("The solution set must be joind with using the keys with which elements are identified.");
+				throw new InvalidProgramException("The solution can only be joined/co-grouped with the same keys as the elements are identified with (here: " + Arrays.toString(ssKeys) + ").");
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
index ca4b1db..f2484ce 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
@@ -25,8 +25,10 @@ import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
 import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase;
 import eu.stratosphere.api.common.operators.base.MapOperatorBase;
 import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration.SolutionSetPlaceHolder;
 import eu.stratosphere.api.java.functions.CoGroupFunction;
 import eu.stratosphere.api.java.functions.KeySelector;
+import eu.stratosphere.api.java.operators.Keys.FieldPositionKeys;
 import eu.stratosphere.api.java.operators.translation.KeyExtractingMapper;
 import eu.stratosphere.api.java.operators.translation.PlanUnwrappingCoGroupOperator;
 import eu.stratosphere.api.java.operators.translation.TupleKeyExtractingMapper;
@@ -409,6 +411,24 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 				if (!keys1.areCompatibale(keys2)) {
 					throw new InvalidProgramException("The pair of join keys are not compatible with each other.");
 				}
+				
+				// sanity check solution set key mismatches
+				if (input1 instanceof SolutionSetPlaceHolder) {
+					if (keys1 instanceof FieldPositionKeys) {
+						int[] positions = ((FieldPositionKeys<?>) keys1).computeLogicalKeyPositions();
+						((SolutionSetPlaceHolder<?>) input1).checkJoinKeyFields(positions);
+					} else {
+						throw new InvalidProgramException("Currently, the solution set may only be CoGrouped with using tuple field positions.");
+					}
+				}
+				if (input2 instanceof SolutionSetPlaceHolder) {
+					if (keys2 instanceof FieldPositionKeys) {
+						int[] positions = ((FieldPositionKeys<?>) keys2).computeLogicalKeyPositions();
+						((SolutionSetPlaceHolder<?>) input2).checkJoinKeyFields(positions);
+					} else {
+						throw new InvalidProgramException("Currently, the solution set may only be CoGrouped with using tuple field positions.");
+					}
+				}
 
 				return new CoGroupOperatorWithoutFunction(keys2);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java
index 9c89eb1..6026903 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java
@@ -42,7 +42,7 @@ public abstract class Keys<T> {
 	
 	public static class FieldPositionKeys<T> extends Keys<T> {
 		
-		private final int[] groupingFields;
+		private final int[] fieldPositions;
 		private final TypeInformation<?>[] types;
 		
 		public FieldPositionKeys(int[] groupingFields, TypeInformation<T> type) {
@@ -60,18 +60,18 @@ public abstract class Keys<T> {
 			
 			TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>)type;
 	
-			this.groupingFields = makeFields(groupingFields, (TupleTypeInfo<?>) type);
+			this.fieldPositions = makeFields(groupingFields, (TupleTypeInfo<?>) type);
 			
-			types = new TypeInformation[this.groupingFields.length];
-			for(int i = 0; i < this.groupingFields.length; i++) {
-				types[i] = tupleType.getTypeAt(this.groupingFields[i]);
+			types = new TypeInformation[this.fieldPositions.length];
+			for(int i = 0; i < this.fieldPositions.length; i++) {
+				types[i] = tupleType.getTypeAt(this.fieldPositions[i]);
 			}
 			
 		}
 
 		@Override
 		public int getNumberOfKeyFields() {
-			return this.groupingFields.length;
+			return this.fieldPositions.length;
 		}
 
 		@Override
@@ -106,9 +106,13 @@ public abstract class Keys<T> {
 
 		@Override
 		public int[] computeLogicalKeyPositions() {
-			return this.groupingFields;
+			return this.fieldPositions;
 		}
 	
+		@Override
+		public String toString() {
+			return Arrays.toString(fieldPositions);
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -119,6 +123,10 @@ public abstract class Keys<T> {
 		private final TypeInformation<K> keyType;
 		
 		public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> type) {
+			if (keyExtractor == null) {
+				throw new NullPointerException("Key extractor must not be null.");
+			}
+			
 			this.keyExtractor = keyExtractor;
 			this.keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
 		}
@@ -163,6 +171,11 @@ public abstract class Keys<T> {
 		public int[] computeLogicalKeyPositions() {
 			return new int[] {0};
 		}
+		
+		@Override
+		public String toString() {
+			return keyExtractor + " (" + keyType + ")";
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java
index ad170a2..37b01ef 100644
--- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -33,10 +33,12 @@ import eu.stratosphere.api.common.operators.base.MapOperatorBase;
 import eu.stratosphere.api.java.DataSet;
 import eu.stratosphere.api.java.DeltaIteration;
 import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.CoGroupFunction;
 import eu.stratosphere.api.java.functions.JoinFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple2;
 import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.util.Collector;
 
 @SuppressWarnings("serial")
 public class DeltaIterationTranslationTest implements java.io.Serializable {
@@ -137,7 +139,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 	}
 	
 	@Test
-	public void testRejectWhenSolutionSetKeysDontMatch() {
+	public void testRejectWhenSolutionSetKeysDontMatchJoin() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			
@@ -156,6 +158,50 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			catch (InvalidProgramException e) {
 				// all good!
 			}
+			
+			try {
+				iteration.getSolutionSet().join(iteration.getWorkset()).where(2).equalTo(1);
+				fail("Accepted invalid program.");
+			}
+			catch (InvalidProgramException e) {
+				// all good!
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRejectWhenSolutionSetKeysDontMatchCoGroup() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
+
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
+			
+			DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
+			
+			try {
+				iteration.getWorkset().coGroup(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetCoGroup1());
+				fail("Accepted invalid program.");
+			}
+			catch (InvalidProgramException e) {
+				// all good!
+			}
+			
+			try {
+				iteration.getSolutionSet().coGroup(iteration.getWorkset()).where(2).equalTo(1).with(new SolutionWorksetCoGroup2());
+				fail("Accepted invalid program.");
+			}
+			catch (InvalidProgramException e) {
+				// all good!
+			}
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -187,4 +233,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			return value;
 		}
 	}
+	
+	public static class SolutionWorksetCoGroup1 extends CoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+		@Override
+		public void coGroup(Iterator<Tuple2<Double, String>> first, Iterator<Tuple3<Double, Long, String>> second,
+				Collector<Tuple3<Double, Long, String>> out) {
+		}
+	}
+	
+	public static class SolutionWorksetCoGroup2 extends CoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
+
+		@Override
+		public void coGroup(Iterator<Tuple3<Double, Long, String>> second, Iterator<Tuple2<Double, String>> first,
+				Collector<Tuple3<Double, Long, String>> out) {
+		}
+	}
 }


Mime
View raw message