flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/4] flink git commit: [FLINK-1951] Fix NullPointerException in delta iteration due to missing temp
Date Tue, 05 May 2015 21:03:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master 60ec68308 -> bd96ba8d1


[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp

This closes #641


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adb321d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adb321d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adb321d6

Branch: refs/heads/master
Commit: adb321d61cc783b3a2a78f4e707104d75e1d63c0
Parents: 60ec683
Author: Fabian Hueske <fhueske@apache.org>
Authored: Thu Apr 30 17:34:02 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue May 5 22:56:36 2015 +0200

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |  5 +-
 .../plantranslate/TempInIterationsTest.java     | 81 ++++++++++++++++++++
 2 files changed, 84 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index dc21c13..2630019 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1163,8 +1163,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			final TempMode tm = channel.getTempMode();
 
 			boolean needsMemory = false;
-			// Don't add a pipeline breaker if the data exchange is already blocking.
-			if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH)
{
+			// Don't add a pipeline breaker if the data exchange is already blocking, EXCEPT the channel
is within an iteration.
+			if (tm.breaksPipeline() &&
+					(channel.isOnDynamicPath() || channel.getDataExchangeMode() != DataExchangeMode.BATCH)
) {
 				config.setInputAsynchronouslyMaterialized(inputNum, true);
 				needsMemory = true;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
new file mode 100644
index 0000000..15cb03f
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TempInIterationsTest {
+
+	/*
+	 * Tests that temp barriers are set inside iterations (input 0 of "SolutionSet Delta" is temped).
+	 */
+	@Test
+	public void testTempInIterationTest() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> input = env.readCsvFile("file:///does/not/exist").types(Long.class,
Long.class);
+
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+				input.iterateDelta(input, 1, 0);
+
+		DataSet<Tuple2<Long, Long>> update = iteration.getWorkset()
+				.join(iteration.getSolutionSet()).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+		iteration.closeWith(update, update)
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = (new Optimizer(new Configuration())).compile(plan);
+
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		JobGraph jg = jgg.compileJobGraph(oPlan);
+
+		boolean solutionSetUpdateChecked = false; // set once the "SolutionSet Delta" vertex was inspected
+		for(AbstractJobVertex v : jg.getVertices()) {
+			if(v.getName().equals("SolutionSet Delta")) {
+
+				// the first input (index 0) of the solution set delta must be asynchronously materialized
+				TaskConfig tc = new TaskConfig(v.getConfiguration());
+				assertTrue(tc.isInputAsynchronouslyMaterialized(0));
+				solutionSetUpdateChecked = true;
+			}
+		}
+		assertTrue(solutionSetUpdateChecked); // fail if no "SolutionSet Delta" vertex was found at all
+
+	}
+
+}


Mime
View raw message