flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject incubator-flink git commit: [FLINK-1008] Fix createProgramPlan() throws exception
Date Mon, 17 Nov 2014 13:18:53 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master e23874cd8 -> 42828f245


[FLINK-1008] Fix createProgramPlan() throws exception

Problem was, that ExecutionEnvironment#getExecutionPlan clears the data
sinks, i.e. a following ExecutionEnvironment#execute will throw an error
because there are no data sinks.

This introduces a new flag for ExecutionEnvironment#createProgramPlan to
indicate, that the the sinks shall not be cleared.
This does not break any existing code.

This closes #184


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/42828f24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/42828f24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/42828f24

Branch: refs/heads/master
Commit: 42828f24596ec2432fbd506ce08ac18c23fd5350
Parents: e23874c
Author: Stefan Bunk <stefan.bunk@googlemail.com>
Authored: Wed Nov 5 17:21:24 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Nov 17 13:27:38 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |  2 +-
 .../ExecutionPlanAfterExecutionTest.java        | 49 ++++++++++++++++++++
 .../flink/api/java/ExecutionEnvironment.java    | 22 ++++++++-
 .../apache/flink/api/java/LocalEnvironment.java |  2 +-
 .../flink/api/java/RemoteEnvironment.java       |  2 +-
 5 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 70b5f9b..6243c96 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -352,7 +352,7 @@ public class Client {
 
 		@Override
 		public String getExecutionPlan() throws Exception {
-			Plan plan = createProgramPlan();
+			Plan plan = createProgramPlan(null, false);
 			this.optimizerPlan = compiler.compile(plan);
 			
 			// do not go on with anything now!

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
new file mode 100644
index 0000000..95b7de3
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
+
+	@Test
+	public void testExecuteAfterGetExecutionPlan() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		DataSet<Integer> baseSet = env.fromElements(1, 2);
+
+		DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
+			@Override public Integer map(Integer value) throws Exception { return value * 2; }
+		});
+		result.output(new DiscardingOuputFormat<Integer>());
+
+		try {
+			env.getExecutionPlan();
+			env.execute();
+		} catch (Exception e) {
+			fail("Cannot run both #getExecutionPlan and #execute.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 541a89b..f549a93 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -595,6 +595,7 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates the plan with which the system will execute the program, and returns it as 
 	 * a String using a JSON representation of the execution data flow graph.
+	 * Note that this needs to be called, before the plan is executed.
 	 * 
 	 * @return The execution plan of the program, as a JSON String.
 	 * @throws Exception Thrown, if the compiler could not be instantiated, or the master could
not
@@ -658,6 +659,7 @@ public abstract class ExecutionEnvironment {
 	 * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with
an
 	 * executor is an alternative way to run a program and is only possible if the program consists
 	 * only of distributed operations.
+	 * This automatically starts a new stage of execution.
 	 * 
 	 * @return The program's plan.
 	 */
@@ -671,11 +673,27 @@ public abstract class ExecutionEnvironment {
 	 * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with
an
 	 * executor is an alternative way to run a program and is only possible if the program consists
 	 * only of distributed operations.
+	 * This automatically starts a new stage of execution.
 	 * 
 	 * @param jobName The name attached to the plan (displayed in logs and monitoring).
 	 * @return The program's plan.
 	 */
 	public JavaPlan createProgramPlan(String jobName) {
+		return createProgramPlan(jobName, true);
+	}
+
+	/**
+	 * Creates the program's {@link Plan}. The plan is a description of all data sources, data
sinks,
+	 * and operations and how they interact, as an isolated unit that can be executed with a
+	 * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with
an
+	 * executor is an alternative way to run a program and is only possible if the program consists
+	 * only of distributed operations.
+	 *
+	 * @param jobName The name attached to the plan (displayed in logs and monitoring).
+	 * @param clearSinks Whether or not to start a new stage of execution.
+	 * @return The program's plan.
+	 */
+	public JavaPlan createProgramPlan(String jobName, boolean clearSinks) {
 		if (this.sinks.isEmpty()) {
 			throw new RuntimeException("No data sinks have been created yet. A program needs at least
one sink that consumes data. Examples are writing the data set or printing it.");
 		}
@@ -699,7 +717,9 @@ public abstract class ExecutionEnvironment {
 		}
 		
 		// clear all the sinks such that the next execution does not redo everything
-		this.sinks.clear();
+		if (clearSinks) {
+			this.sinks.clear();
+		}
 		
 		return plan;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 0deca8b..e7daf11 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -53,7 +53,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
 	
 	@Override
 	public String getExecutionPlan() throws Exception {
-		Plan p = createProgramPlan();
+		Plan p = createProgramPlan(null, false);
 		
 		PlanExecutor executor = PlanExecutor.createLocalExecutor();
 		return executor.getOptimizerPlanAsJSON(p);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 24fdc82..c0695e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -71,7 +71,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	
 	@Override
 	public String getExecutionPlan() throws Exception {
-		Plan p = createProgramPlan("unnamed");
+		Plan p = createProgramPlan("unnamed", false);
 		p.setDefaultParallelism(getDegreeOfParallelism());
 		registerCachedFilesWithPlan(p);
 		


Mime
View raw message