flink-dev mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: Should collect() and count() be treated as data sinks?
Date Thu, 02 Apr 2015 16:21:20 GMT
Hi Felix,

count() defines a sink internally via the DiscardingOutputFormat. The error
you're seeing occurs because execution of the plan is already triggered
inside the count() method itself. By the time you call env.execute(), the
plan has already been cleared from the ExecutionEnvironment, so there is
nothing left to execute and it fails.

We should probably make it a bit more obvious how count and collect behave.
At least, we should improve the docs with a big warning.

In the future, once we can resume executions, we might delay the execution
in count()/collect(). On execute(), we would then run the plan once to
obtain the accumulator result for count()/collect(), and a second time to
resume and execute the rest of the plan. That is, assuming the current
implementation remains the same...
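
To make the behavior concrete, here is a minimal sketch of the two patterns
that work with the DataSet API as discussed in this thread (the output path
is a placeholder): either let count() trigger its own execution and skip
env.execute(), or define a real sink before calling env.execute().

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> data = env.fromElements(1L, 2L, 3L);

        // Pattern 1: count() triggers execution eagerly and returns the
        // result. Do NOT call env.execute() afterwards -- the plan has
        // already been executed and cleared.
        long value = data.count();
        System.out.println(value);

        // Pattern 2: if you want to call env.execute() yourself, the plan
        // must contain at least one sink, e.g. by writing the data set out:
        // data.writeAsText("/tmp/out");   // placeholder path
        // env.execute("example");
    }
}
```

Mixing the two patterns (count() followed by env.execute() on the same
environment, with no further sinks) produces exactly the "No data sinks
have been created yet" error quoted below.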


On Thu, Apr 2, 2015 at 5:33 PM, Felix Neutatz <neutatz@googlemail.com> wrote:

> Hi,
> I have run the following program:
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> List<Tuple1<Long>> l = Arrays.asList(new Tuple1<Long>(1L));
> TypeInformation<Tuple1<Long>> t = TypeInfoParser.parse("Tuple1<Long>");
> DataSet<Tuple1<Long>> data = env.fromCollection(l, t);
> long value = data.count();
> System.out.println(value);
> env.execute("example");
> Since there is no "real" data sink, I get the following:
> Exception in thread "main" java.lang.RuntimeException: No data sinks have
> been created yet. A program needs at least one sink that consumes data.
> Examples are writing the data set or printing it.
> In my opinion, we should handle count() and collect() like print().
> What do you think?
> Best regards,
> Felix
