flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Getting executionplan in the local mode inside IDE
Date Fri, 01 Jan 2016 10:09:23 GMT
Hi,

you can only get the execution plan for programs that have a data sink and
haven't been executed. In your code print() defines the data sink, however
it also eagerly executes a program. After execution the program is
"removed" from the execution environment. Therefore, Flink complains that
no sink has been defined.

You can print the execution plan if you use a data sink that does not
eagerly execute. For example, you can replace the sum.print() statement
with sum.output(new DiscardingOutputFormat()) or
sum.writeAsText("file://some/path").

Cheers, Fabian



2016-01-01 10:21 GMT+01:00 madhu phatak <phatak.dev@gmail.com>:

> Hi,
> I am trying to get execution plan for wordcount using below code in local
> mode inside IntelliJ IDEA. I am using flink 0.10.0.
>
> val env = ExecutionEnvironment.getExecutionEnvironment
>
> val data = List("hi","how are you","hi")
>
> val dataSet = env.fromCollection(data)
>
> val words = dataSet.flatMap(value => value.split("\\s+"))
>
> val mappedWords = words.map(value => (value,1))
>
> val grouped = mappedWords.groupBy(0)
>
> val sum = grouped.sum(1)
>
> sum.print()
>
> println(env.getExecutionPlan())
>
>
> The program computes sum correctly, but fails with following exception for
> last line
>
> Exception in thread "main" java.lang.RuntimeException: No new data sinks
> have been defined since the last execution. The last execution refers to
> the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
> at
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:925)
> at
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:95)
> at
> org.apache.flink.api.scala.ExecutionEnvironment.getExecutionPlan(ExecutionEnvironment.scala:635)
> at com.madhukaraphatak.flink.WordCount$.main(WordCount.scala:30)
>
>
> I tried placing the getExecutionPlan in different places. But I get same
> error. Is there any other way to get the execution plan in local mode?
>
> --
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>

Mime
View raw message