flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] incubator-flink git commit: [FLINK-1209] [compiler] Improve error messages when forgetting to close an iteration
Date Tue, 18 Nov 2014 16:28:30 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 6989cec4c -> 9f6a0b8fa


[FLINK-1209] [compiler] Improve error messages when forgetting to close an iteration


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b264221b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b264221b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b264221b

Branch: refs/heads/master
Commit: b264221b32ef2b1a4abe3d40c3174fccea21974f
Parents: 6989cec
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Nov 18 14:08:33 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Nov 18 17:26:04 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |   2 +-
 .../flink/compiler/java/OpenIterationTest.java  | 183 +++++++++++++++++++
 .../api/java/operators/IterativeDataSet.java    |   4 +-
 .../api/java/operators/OperatorTranslation.java |   5 +
 4 files changed, 192 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b264221b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index d1d6343..bec264d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -905,7 +905,7 @@ public class PactCompiler {
 							}
 						}
 						else {
-							throw new CompilerException("Error: The only operations allowed on the solution set are Join and CoGroup.");
+							throw new InvalidProgramException("Error: The only operations allowed on the solution set are Join and CoGroup.");
 						}
 					}
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b264221b/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java
new file mode 100644
index 0000000..b142a23
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class OpenIterationTest extends CompilerTestBase {
+
+	@Test
+	public void testSinkInOpenBulkIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> input = env.generateSequence(1, 10);
+			
+			IterativeDataSet<Long> iteration = input.iterate(10);
+			
+			DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
+			
+			mapped.print();
+			
+			try {
+				env.createProgramPlan();
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSinkInClosedBulkIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> input = env.generateSequence(1, 10);
+			
+			IterativeDataSet<Long> iteration = input.iterate(10);
+			
+			DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
+			
+			iteration.closeWith(mapped).print();
+			
+			mapped.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			try {
+				compileNoStats(p);
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSinkOnSolutionSetDeltaIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+			
+			DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			mapped.print();
+			
+			try {
+				env.createProgramPlan();
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSinkOnWorksetDeltaIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+			
+			DataSet<Tuple2<Long, Long>> mapped = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			mapped.print();
+			
+			try {
+				env.createProgramPlan();
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testOperationOnSolutionSet() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+			
+			DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			DataSet<Tuple2<Long, Long>> joined = iteration.getWorkset().join(mapped)
+												.where(0).equalTo(0).projectFirst(1).projectSecond(0).types(Long.class, Long.class);
+			
+			iteration.closeWith(joined, joined)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			try {
+				compileNoStats(p);
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b264221b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
index f243d89..e39d56b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -144,6 +145,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
 	@Override
 	protected org.apache.flink.api.common.operators.SingleInputOperator<T, T, ?> translateToDataFlow(Operator<T> input) {
 		// All the translation magic happens when the iteration end is encountered.
-		throw new RuntimeException("Error while creating the data flow plan for an iteration: The iteration end was not specified correctly.");
+		throw new InvalidProgramException("A data set that is part of an iteration was used as a sink or action."
+				+ " Did you forget to close the iteration?");
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b264221b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 5a73e36..3570402 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.AbstractUdfOperator;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -93,6 +94,10 @@ public class OperatorTranslation {
 		else if (dataSet instanceof DeltaIterationResultSet) {
 			dataFlowOp = translateDeltaIteration((DeltaIterationResultSet<?, ?>) dataSet);
 		}
+		else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
+			throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."
+				+ " Did you forget to close the iteration?");
+		}
 		else {
 			throw new RuntimeException("Error while creating the data flow plan for the program: Unknown operator or data set type: " + dataSet);
 		}


Mime
View raw message