Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AF2E4C8DD for ; Mon, 17 Nov 2014 13:20:35 +0000 (UTC) Received: (qmail 11637 invoked by uid 500); 17 Nov 2014 13:20:34 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 11607 invoked by uid 500); 17 Nov 2014 13:20:34 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 11596 invoked by uid 99); 17 Nov 2014 13:20:34 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Nov 2014 13:20:34 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 17 Nov 2014 13:20:11 +0000 Received: (qmail 6302 invoked by uid 99); 17 Nov 2014 13:18:53 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Nov 2014 13:18:53 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 6729693DE43; Mon, 17 Nov 2014 13:18:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Message-Id: <02c3920aaa424da58ac3c7dfbd1edf44@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-flink git commit: [FLINK-1008] Fix createProgramPlan() throws exception Date: Mon, 17 Nov 2014 13:18:53 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org 
Repository: incubator-flink Updated Branches: refs/heads/master e23874cd8 -> 42828f245 [FLINK-1008] Fix createProgramPlan() throws exception The problem was that ExecutionEnvironment#getExecutionPlan clears the data sinks, i.e. a subsequent ExecutionEnvironment#execute call throws an error because there are no data sinks. This introduces a new flag for ExecutionEnvironment#createProgramPlan to indicate that the sinks shall not be cleared. This does not break any existing code. This closes #184 Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/42828f24 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/42828f24 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/42828f24 Branch: refs/heads/master Commit: 42828f24596ec2432fbd506ce08ac18c23fd5350 Parents: e23874c Author: Stefan Bunk Authored: Wed Nov 5 17:21:24 2014 +0100 Committer: Stephan Ewen Committed: Mon Nov 17 13:27:38 2014 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/program/Client.java | 2 +- .../ExecutionPlanAfterExecutionTest.java | 49 ++++++++++++++++++++ .../flink/api/java/ExecutionEnvironment.java | 22 ++++++++- .../apache/flink/api/java/LocalEnvironment.java | 2 +- .../flink/api/java/RemoteEnvironment.java | 2 +- 5 files changed, 73 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 70b5f9b..6243c96 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -352,7 +352,7 @@ public class Client { @Override public String getExecutionPlan() throws Exception { - Plan plan = createProgramPlan(); + Plan plan = createProgramPlan(null, false); this.optimizerPlan = compiler.compile(plan); // do not go on with anything now! http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java new file mode 100644 index 0000000..95b7de3 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOuputFormat; +import org.junit.Test; + +import static org.junit.Assert.*; + +@SuppressWarnings("serial") +public class ExecutionPlanAfterExecutionTest implements java.io.Serializable { + + @Test + public void testExecuteAfterGetExecutionPlan() { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + DataSet baseSet = env.fromElements(1, 2); + + DataSet result = baseSet.map(new MapFunction() { + @Override public Integer map(Integer value) throws Exception { return value * 2; } + }); + result.output(new DiscardingOuputFormat()); + + try { + env.getExecutionPlan(); + env.execute(); + } catch (Exception e) { + fail("Cannot run both #getExecutionPlan and #execute."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 541a89b..f549a93 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -595,6 +595,7 @@ public abstract class ExecutionEnvironment { /** * Creates the plan with which the system will execute the program, and returns it as * a String using a JSON representation of the execution data flow graph. + * Note that this needs to be called, before the plan is executed. * * @return The execution plan of the program, as a JSON String. 
* @throws Exception Thrown, if the compiler could not be instantiated, or the master could not @@ -658,6 +659,7 @@ public abstract class ExecutionEnvironment { * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with an * executor is an alternative way to run a program and is only possible if the program consists * only of distributed operations. + * This automatically starts a new stage of execution. * * @return The program's plan. */ @@ -671,11 +673,27 @@ public abstract class ExecutionEnvironment { * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with an * executor is an alternative way to run a program and is only possible if the program consists * only of distributed operations. + * This automatically starts a new stage of execution. * * @param jobName The name attached to the plan (displayed in logs and monitoring). * @return The program's plan. */ public JavaPlan createProgramPlan(String jobName) { + return createProgramPlan(jobName, true); + } + + /** + * Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks, + * and operations and how they interact, as an isolated unit that can be executed with a + * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with an + * executor is an alternative way to run a program and is only possible if the program consists + * only of distributed operations. + * + * @param jobName The name attached to the plan (displayed in logs and monitoring). + * @param clearSinks Whether or not to start a new stage of execution. + * @return The program's plan. + */ + public JavaPlan createProgramPlan(String jobName, boolean clearSinks) { if (this.sinks.isEmpty()) { throw new RuntimeException("No data sinks have been created yet. A program needs at least one sink that consumes data. 
Examples are writing the data set or printing it."); } @@ -699,7 +717,9 @@ public abstract class ExecutionEnvironment { } // clear all the sinks such that the next execution does not redo everything - this.sinks.clear(); + if (clearSinks) { + this.sinks.clear(); + } return plan; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java index 0deca8b..e7daf11 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java @@ -53,7 +53,7 @@ public class LocalEnvironment extends ExecutionEnvironment { @Override public String getExecutionPlan() throws Exception { - Plan p = createProgramPlan(); + Plan p = createProgramPlan(null, false); PlanExecutor executor = PlanExecutor.createLocalExecutor(); return executor.getOptimizerPlanAsJSON(p); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42828f24/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java index 24fdc82..c0695e5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java @@ -71,7 +71,7 @@ public class RemoteEnvironment extends ExecutionEnvironment { @Override public String getExecutionPlan() throws Exception { - Plan p = createProgramPlan("unnamed"); + Plan p = createProgramPlan("unnamed", false); 
p.setDefaultParallelism(getDegreeOfParallelism()); registerCachedFilesWithPlan(p);