[FLINK-1143] Allow delta iterations that do not join with the solution set
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9310703d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9310703d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9310703d
Branch: refs/heads/release-0.7
Commit: 9310703d3cfae47965b5275da0e665a745fd9eba
Parents: 1abadb1
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Oct 13 21:21:54 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Oct 17 16:48:55 2014 +0200
----------------------------------------------------------------------
.../org/apache/flink/compiler/PactCompiler.java | 10 ++-
.../flink/compiler/dag/SolutionSetNode.java | 6 +-
.../apache/flink/compiler/dag/TwoInputNode.java | 3 +-
.../WorksetIterationCornerCasesTest.java | 77 ++++++++++++++++++
.../WorksetIterationsJavaApiCompilerTest.java | 4 +-
...terationNotDependingOnSolutionSetITCase.java | 84 ++++++++++++++++++++
6 files changed, 171 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 6492c5c..177622b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -855,18 +855,20 @@ public class PactCompiler {
// and the solution set. If it does depend on both, this descend should create both nodes
iter.getSolutionSetDelta().accept(recursiveCreator);
- final SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());
if (worksetNode == null) {
- throw new CompilerException("In the given plan, the solution set delta does not depend
on the workset. This is a prerequisite in workset iterations.");
+ throw new CompilerException("In the given plan, the solution set delta does not depend
on the workset. This is a prerequisite in delta iterations.");
}
iter.getNextWorkset().accept(recursiveCreator);
+ SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
+
if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty())
{
- throw new CompilerException("Error: The step function does not reference the solution
set.");
- } else {
+ solutionSetNode = new SolutionSetNode((SolutionSetPlaceHolder<?>) iter.getSolutionSet(),
iterNode);
+ }
+ else {
for (PactConnection conn : solutionSetNode.getOutgoingConnections()) {
OptimizerNode successor = conn.getTarget();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
index 523525d..ab95f21 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
@@ -46,11 +46,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode {
// --------------------------------------------------------------------------------------------
public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel
initialInput) {
- if (this.cachedPlans != null) {
- throw new IllegalStateException();
- } else {
- this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this,
"SolutionSet("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
- }
+ this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this,
"SolutionSet("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
}
public SolutionSetPlanNode getCurrentSolutionSetPlanNode() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
index 239ca08..ce9214e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
@@ -492,8 +492,7 @@ public abstract class TwoInputNode extends OptimizerNode {
instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1,
rgps2, ilp1, ilp2);
break;
} else {
- // meet, but not co-compatible
-// throw new CompilerException("Implements to adjust one side to the other!");
+ // cannot use this pair, fall through the loop and try the next one
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationCornerCasesTest.java
b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationCornerCasesTest.java
new file mode 100644
index 0000000..dfff3e1
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationCornerCasesTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class WorksetIterationCornerCasesTest extends CompilerTestBase {
+
+	/** A delta iteration whose step function consumes only the workset and never the solution set. */
+	@Test
+	public void testWorksetIterationNotDependingOnSolutionSet() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			DataSet<Tuple2<Long, Long>> pairs = env.generateSequence(1, 100).map(new Duplicator<Long>());
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIter = pairs.iterateDelta(pairs, 100, 1);
+			// the same data set serves as solution set delta and as next workset
+			DataSet<Tuple2<Long, Long>> stepResult = deltaIter.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
+			deltaIter.closeWith(stepResult, stepResult).print();
+
+			Plan program = env.createProgramPlan();
+			OptimizedPlan optimized = compileNoStats(program);
+
+			// the program's only sink reads from the iteration, whose solution set must have no consumers
+			WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) optimized.getDataSinks().iterator().next().getInput().getSource();
+			assertTrue(iterNode.getSolutionSetPlanNode().getOutgoingChannels().isEmpty());
+
+			// translating the optimized plan into a job graph must succeed as well
+			new NepheleJobGraphGenerator().compileJobGraph(optimized);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+		@Override
+		public Tuple2<T, T> map(T value) {
+			return new Tuple2<T, T>(value, value);
+		}
+	}
+
+	private static final class TestMapper<T> implements MapFunction<T, T> {
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index ef756d0..048f712 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -102,7 +102,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase
{
}
@Test
- public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
+ public void testJavaApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
try {
Plan plan = getJavaTestPlan(false, false);
@@ -146,7 +146,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase
{
}
@Test
- public void testRecordApiWithDirectSoltionSetUpdate() {
+ public void testJavaApiWithDirectSoltionSetUpdate() {
try {
Plan plan = getJavaTestPlan(true, false);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
new file mode 100644
index 0000000..1701628
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DeltaIterationNotDependingOnSolutionSetITCase {
+
+	private static final int NUM_ELEMENTS = 10;		// size of the input, also the key offset per superstep
+	private static final int NUM_ITERATIONS = 5;
+
+	@Test
+	public void testDeltaIterationNotDependingOnSolutionSet() {
+		try {
+			final List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long,Long>>();
+			final int expectedSize = NUM_ELEMENTS * NUM_ITERATIONS;
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(1);
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(0, NUM_ELEMENTS - 1).map(new Duplicator<Long>());
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, NUM_ITERATIONS, 1);
+			// the step function never reads the solution set: delta = workset, next workset = shifted workset
+			iteration.closeWith(iteration.getWorkset(), iteration.getWorkset().map(new TestMapper()))
+					.output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(result));
+			env.execute();
+
+			// the solution set is keyed on field 1, so each key 0 .. expectedSize-1 must appear exactly once
+			assertEquals("Wrong number of tuples in the solution set", expectedSize, result.size());
+			boolean[] present = new boolean[expectedSize];
+			for (Tuple2<Long, Long> t : result) {
+				assertTrue("Unexpected tuple " + t, t.f0 >= 0 && t.f0 < expectedSize && t.f0.equals(t.f1));
+				present[t.f0.intValue()] = true;
+			}
+			for (int i = 0; i < present.length; i++) {
+				assertTrue(String.format("Missing tuple (%d, %d)", i, i), present[i]);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+		@Override
+		public Tuple2<T, T> map(T value) {
+			return new Tuple2<T, T>(value, value);
+		}
+	}
+
+	private static final class TestMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
+			return new Tuple2<Long, Long>(value.f0 + NUM_ELEMENTS, value.f1 + NUM_ELEMENTS);
+		}
+	}
+}
|