flink-issues mailing list archives

From Gábor Hermann (JIRA) <j...@apache.org>
Subject [jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
Date Tue, 22 Aug 2017 11:58:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136684#comment-16136684 ]

Gábor Hermann commented on FLINK-3133:

[~aljoscha], [~kenmy] thanks for the feedback!

[~aljoscha] I see the problems with communication between JMs and TMs (JobManagers and TaskManagers). I think it's worth waiting
for queryable state (QS) to mature before tackling collect() etc., because we could then reuse that
code. I'm also in favor of keeping the DataStream API as it is and putting the code in a
separate place (like DataStreamUtils in contrib).

That said, would it make sense to create a collectToList() method for the local mini
cluster only? That should suffice for testing, and we could put it in DataStreamUtils.
Collecting a single DataStream is already supported, since DataStreamUtils.collect() returns
an iterator. I think we could modify that code to allow collecting multiple DataStreams into
Lists. So, modifying my previous example a bit:

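{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// print() returns a DataStreamSink, so keep a reference to the stream itself for collecting
DataStream<Integer> printSink = env.addSource(..);
printSink.print();
DataStream<String> otherSink = env.addSource(..).map(..).filter(..);
otherSink.print();
// collectToList() is the proposed method; it does not exist yet
Future<List<Integer>> printSinkResults = DataStreamUtils.collectToList(printSink);
Future<List<String>> otherSinkResults = DataStreamUtils.collectToList(otherSink);
{code}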
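A rough, dependency-free model of the Future-based variant, just to check that the idea holds together. LocalEnv, its collectToList(Iterable) and execute() are hypothetical stand-ins for the mini cluster and the proposed DataStreamUtils method, not Flink APIs, and a plain Iterable plays the role of a bounded DataStream. What it illustrates: the Future exists before execution and is only completed once the job has run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a local mini cluster; not a Flink class.
class LocalEnv {
    private final List<Runnable> jobs = new ArrayList<>();

    // Hypothetical collectToList(): registers a collecting "sink" and returns
    // a Future that stays incomplete until execute() has run the pipeline.
    <T> CompletableFuture<List<T>> collectToList(Iterable<T> boundedStream) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        jobs.add(() -> {
            List<T> buffer = new ArrayList<>();
            for (T element : boundedStream) {
                buffer.add(element);
            }
            result.complete(buffer);
        });
        return result;
    }

    // Runs every registered pipeline to completion (bounded inputs only).
    void execute() {
        for (Runnable job : jobs) {
            job.run();
        }
    }
}

public class CollectToListSketch {
    public static void main(String[] args) {
        LocalEnv env = new LocalEnv();
        CompletableFuture<List<Integer>> printSinkResults = env.collectToList(Arrays.asList(1, 2, 3));
        CompletableFuture<List<String>> otherSinkResults = env.collectToList(Arrays.asList("a", "b"));
        System.out.println(printSinkResults.isDone()); // false: nothing has run yet
        env.execute();
        System.out.println(printSinkResults.join()); // [1, 2, 3]
        System.out.println(otherSinkResults.join()); // [a, b]
    }
}
```

Returning a standard CompletableFuture would give callers timeouts and composition for free, which is one argument for exposing Future despite the extra type in the API.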

Alternatively, if we'd rather not expose Future to users, we could have something like a {{DataStreamCollector}}:
{code:java}
DataStreamCollector<Integer> printSinkResults = DataStreamUtils.collectToList(printSink);
DataStreamCollector<String> otherSinkResults = DataStreamUtils.collectToList(otherSink);
{code}
and after execution we could do:
{code:java}
List<Integer> printSinkList = printSinkResults.getList();
{code}
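For the DataStreamCollector alternative, a minimal sketch of what such a handle might look like, assuming the collecting sink appends elements and signals completion when the job terminates. Only the names DataStreamCollector and getList() come from the proposal; the class body, add() and finish() are hypothetical. Failing fast in getList() before the job has finished avoids silently handing back partial results.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical DataStreamCollector: created before execution, filled by the
// collecting sink, and read back with getList() once the job is done.
class DataStreamCollector<T> {
    private final List<T> buffer = new ArrayList<>();
    private boolean finished = false;

    // Called by the (hypothetical) collecting sink for every element.
    synchronized void add(T element) {
        if (finished) {
            throw new IllegalStateException("collector already finished");
        }
        buffer.add(element);
    }

    // Called once the job has terminated.
    synchronized void finish() {
        finished = true;
    }

    // Only valid after execution; fails fast instead of returning partial results.
    synchronized List<T> getList() {
        if (!finished) {
            throw new IllegalStateException("job has not finished yet; call env.execute() first");
        }
        return Collections.unmodifiableList(new ArrayList<>(buffer));
    }
}

public class DataStreamCollectorSketch {
    public static void main(String[] args) {
        DataStreamCollector<Integer> printSinkResults = new DataStreamCollector<>();
        try {
            printSinkResults.getList();
        } catch (IllegalStateException e) {
            System.out.println("not ready: " + e.getMessage());
        }
        // Simulate the sink receiving elements during execution.
        for (int i = 1; i <= 3; i++) {
            printSinkResults.add(i * 10);
        }
        printSinkResults.finish();
        System.out.println(printSinkResults.getList()); // [10, 20, 30]
    }
}
```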

An API like {{PAssert}} (from Apache Beam) could be built on top of that, or users could use the
collectToList() API directly for testing.
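A PAssert-style check could then be a thin layer over the collected list. The helper below (StreamAssert.containsInAnyOrder) is hypothetical, not a Flink or Beam API; it compares results as a multiset, on the assumption that output order is not deterministic once parallelism is above 1, while duplicates still count.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical PAssert-style helper; not part of Flink or Apache Beam.
final class StreamAssert {
    private StreamAssert() {}

    // Counts occurrences so that order is ignored but duplicates matter.
    static <T> Map<T, Integer> counts(List<T> values) {
        Map<T, Integer> counts = new HashMap<>();
        for (T value : values) {
            counts.merge(value, 1, Integer::sum);
        }
        return counts;
    }

    @SafeVarargs
    static <T> void containsInAnyOrder(List<T> actual, T... expected) {
        if (!counts(actual).equals(counts(Arrays.asList(expected)))) {
            throw new AssertionError("expected " + Arrays.toString(expected)
                    + " in any order, but got " + actual);
        }
    }
}

public class StreamAssertSketch {
    public static void main(String[] args) {
        // Pretend this list came from collectToList(...) after env.execute().
        List<Integer> collected = Arrays.asList(3, 1, 2, 1);
        StreamAssert.containsInAnyOrder(collected, 1, 1, 2, 3); // passes
        try {
            StreamAssert.containsInAnyOrder(collected, 1, 2, 3); // missing a duplicate 1
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```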

[~kenmy] If we go with this, I wouldn't touch your code, and I could open a separate issue.
As you said, this issue is blocked by asynchronous execution. I believe we can work around that
by creating a not-yet-completed reference to the result, such as a Future or a DataStreamCollector,
before calling env.execute().

This should not be hard, but I might be missing something. What do you think?
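The workaround for asynchronous execution can be modelled with plain java.util.concurrent: create the handle first, submit the job without blocking, and wait on the handle only when the result is needed. submitAsync and the single-thread executor standing in for the cluster are assumptions for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncResultSketch {

    // Hypothetical stand-in for submitting a job: the work runs on the
    // "cluster" executor and completes the pre-created handle at the end.
    static void submitAsync(ExecutorService cluster, List<Integer> input,
                            CompletableFuture<List<Integer>> handle) {
        cluster.execute(() -> {
            try {
                List<Integer> buffer = new ArrayList<>();
                for (int x : input) {
                    buffer.add(x * 2); // a trivial "map" step
                }
                handle.complete(buffer);
            } catch (RuntimeException e) {
                // Propagate job failures to whoever waits on the handle.
                handle.completeExceptionally(e);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService cluster = Executors.newSingleThreadExecutor();
        try {
            // 1. Create the not-yet-completed reference before execution.
            CompletableFuture<List<Integer>> handle = new CompletableFuture<>();
            // 2. Submit without blocking the caller.
            submitAsync(cluster, Arrays.asList(1, 2, 3), handle);
            // 3. Wait only when the result is needed, with a timeout so a job
            //    that never finishes cannot hang the caller forever.
            List<Integer> result = handle.get(10, TimeUnit.SECONDS);
            System.out.println(result); // [2, 4, 6]
        } finally {
            cluster.shutdown();
        }
    }
}
```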

> Introduce collect()/count()/print() methods in DataStream API
> -------------------------------------------------------------
>                 Key: FLINK-3133
>                 URL: https://issues.apache.org/jira/browse/FLINK-3133
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1
>            Reporter: Maximilian Michels
>            Assignee: Evgeny Kincharov
>             Fix For: 1.0.0
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should be mirrored to the DataStream API.
> The semantics of the calls are different. We need to be able to sample parts of a stream, e.g. by supplying a time period in the arguments to the methods. Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<DataType> streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable<DataType> sampled = jobClient.sampleStream(streamData, Time.seconds(5));
> {code}
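The time-bounded sampling in the description (sampleStream(streamData, Time.seconds(5))) could behave roughly like the sketch below, which models the unbounded stream as a BlockingQueue and collects whatever arrives until the period elapses. sampleFor is a hypothetical name; the JobClient method itself was only proposed here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SampleStreamSketch {

    // Hypothetical time-bounded sample: drain elements until the deadline passes.
    static <T> List<T> sampleFor(BlockingQueue<T> stream, Duration period) throws InterruptedException {
        List<T> sample = new ArrayList<>();
        long deadline = System.nanoTime() + period.toNanos();
        while (true) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break;
            }
            // poll() returns null on timeout; the loop then sees remaining <= 0.
            T element = stream.poll(remaining, TimeUnit.NANOSECONDS);
            if (element != null) {
                sample.add(element);
            }
        }
        return sample;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Long> stream = new LinkedBlockingQueue<>();
        // A never-ending "source" emitting one element every 10 ms.
        Thread source = new Thread(() -> {
            long i = 0;
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    stream.put(i++);
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                // source cancelled
            }
        });
        source.start();
        List<Long> sampled = sampleFor(stream, Duration.ofMillis(200));
        source.interrupt();
        source.join();
        System.out.println("sampled " + sampled.size() + " elements");
    }
}
```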

This message was sent by Atlassian JIRA
