flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: SideOutput doesn't receive anything if filter is applied after the process function
Date Mon, 15 Jan 2018 10:43:37 GMT
It would mean that process() would return a 
SingleOutputWithSideOutputOperator, which extends SingleOutputOperator 
and offers getSideOutput(). Other transformations would still return a 
plain SingleOutputOperator.

With this, the following code wouldn't compile:

stream
     .process(...)
     .filter(...)
     .getSideOutput(...) // compile error

You would have to explicitly define the code as below, which makes the 
behavior unambiguous:

processed = stream
     .process(...)

filtered = processed
     .filter(...)

filteredSideOutput = processed
     .getSideOutput(...)
     .filter(...)
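
To make the typing idea concrete, here is a minimal, self-contained sketch. It does not use Flink at all: the two classes are toy stand-ins named after the types mentioned above, backed by plain lists, and process() is a hypothetical helper that sends "a" to the main output and everything else to the side output. It only illustrates how the compiler could reject getSideOutput() after filter().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Toy stand-in (not Flink's class): a stream with a main output only.
class SingleOutputOperator<T> {
    protected final List<T> main;

    SingleOutputOperator(List<T> main) { this.main = main; }

    // Every further transformation returns the plain type,
    // which deliberately has no getSideOutput() method.
    SingleOutputOperator<T> filter(Predicate<T> keep) {
        List<T> out = new ArrayList<>();
        for (T t : main) {
            if (keep.test(t)) out.add(t);
        }
        return new SingleOutputOperator<>(out);
    }

    List<T> collect() { return main; }
}

// Toy stand-in: only the direct result of process() exposes the side output.
class SingleOutputWithSideOutputOperator<T> extends SingleOutputOperator<T> {
    private final List<T> side;

    SingleOutputWithSideOutputOperator(List<T> main, List<T> side) {
        super(main);
        this.side = side;
    }

    SingleOutputOperator<T> getSideOutput() {
        return new SingleOutputOperator<>(side);
    }
}

class SideOutputTypingSketch {
    // Hypothetical stand-in for stream.process(...):
    // "a" goes to the main output, anything else to the side output.
    static SingleOutputWithSideOutputOperator<String> process(List<String> input) {
        List<String> main = new ArrayList<>();
        List<String> side = new ArrayList<>();
        for (String s : input) {
            if ("a".equals(s)) main.add(s); else side.add(s);
        }
        return new SingleOutputWithSideOutputOperator<>(main, side);
    }

    public static void main(String[] args) {
        SingleOutputWithSideOutputOperator<String> processed =
            process(Arrays.asList("a", "b"));

        SingleOutputOperator<String> filtered = processed.filter(s -> true);
        SingleOutputOperator<String> filteredSideOutput =
            processed.getSideOutput().filter(s -> true);

        // filtered.getSideOutput(); // would not compile: method not defined on SingleOutputOperator

        System.out.println(filtered.collect());           // prints [a]
        System.out.println(filteredSideOutput.collect()); // prints [b]
    }
}
```

Whether such a subclass could be introduced in the real API without breaking existing user code is a separate question that this sketch does not address.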

On 15.01.2018 09:55, Juho Autio wrote:
> > sideoutput might deserve a separate class which inherits from 
> singleoutput. It might prevent a lot of confusion
>
> Thanks, but how could that be done? Do you mean that if one calls 
> .process(), then the stream would change to another class which would 
> only allow calls like .getMainOutput() or .getSideOutput("name")? Of 
> course a compile-time error would be even better than a runtime error, 
> but I don't yet see how it could be done in practice.
>
> On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnchen@gmail.com> wrote:
>
>     Hi Juho,
>
>     I think sideoutput might deserve a separate class which inherits
>     from singleoutput. It might prevent a lot of confusion. A more
>     generic question is whether the DataStream API can natively
>     support multiple inputs and multiple outputs. It's more like a
>     scheduling problem when you go from a single-process system to a
>     multi-process system: which one should get resources, and how
>     much, when sharing the same hardware? I guess it will open the
>     gate to lots of edge cases -> strategies -> more edge cases :)
>
>     Chen
>
>     On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <juho.autio@rovio.com>
>     wrote:
>
>         Maybe I could express it in a slightly different way: if
>         adding the .filter() after .process() causes the side output
>         to be somehow totally "lost", then I believe the
>         .getSideOutput() could be aware that there is no such side
>         output to be listened to from upstream, and throw an
>         exception. I mean, this should be possible when building the
>         DAG, it shouldn't require starting the stream to detect? Thanks..
>
>         On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai
>         <tzulitai@apache.org> wrote:
>
>             Hi Juho,
>
>>             Now that I think of it this seems like a bug to me: why
>>             does the call to getSideOutput succeed if it doesn't
>>             provide _any_ input?
>
>             With the way side outputs work, I don’t think this is
>             possible (or would make sense). An operator does not know
>             whether or not it would ever emit some element with a
>             given tag.
>             As far as I understand it, calling `getSideOutput`
>             essentially adds a virtual node to the result stream graph
>             that listens to the specified tag from the upstream input.
>
>             While I’m not aware whether or not your observation is
>             expected behavior, from an API perspective, I can see why
>             it is a bit confusing for you.
>             Aljoscha would be the expert here, maybe he’ll have more
>             insights. I’ve cc’ed him.
>
>             Cheers,
>             Gordon
>
>
>             On 12 January 2018 at 4:05:13 PM, Juho Autio
>             (juho.autio@rovio.com) wrote:
>
>>             When I run the code below (Flink 1.4.0 or 1.3.1), only
>>             "a" is printed. If I switch the position of .process() &
>>             .filter() (i.e. filter first, then process), both "a" &
>>             "b" are printed, as expected.
>>
>>             I guess it's a bit hard to say what the side output
>>             should include in this case: the stream before filtering
>>             or after it?
>>
>>             What I would suggest is that Flink protect against this
>>             kind of user error, which is hard to debug. Would it be
>>             possible that Flink throws an exception if one tries to
>>             call .getSideOutput() on anything that doesn't actually
>>             provide that side output? Now that I think of it this
>>             seems like a bug to me: why does the call to
>>             getSideOutput succeed if it doesn't provide _any_ input?
>>             I would expect it to get the side output data stream
>>             _before_ applying .filter().
>>
>>             import org.apache.flink.api.common.functions.FilterFunction;
>>             import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>             import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>             import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>             import org.apache.flink.streaming.api.functions.ProcessFunction;
>>             import org.apache.flink.util.Collector;
>>             import org.apache.flink.util.OutputTag;
>>
>>             public class SideOutputProblem {
>>
>>                 public static void main(String[] args) throws Exception {
>>
>>                     StreamExecutionEnvironment env =
>>                         StreamExecutionEnvironment.getExecutionEnvironment();
>>                     DataStreamSource<String> stream = env.fromElements("a", "b");
>>                     OutputTag<String> sideOutputTag =
>>                         new OutputTag<String>("side-output"){};
>>
>>                     SingleOutputStreamOperator<String> processed = stream
>>
>>                         .process(new ProcessFunction<String, String>() {
>>                             @Override
>>                             public void processElement(String s, Context context,
>>                                     Collector<String> collector) throws Exception {
>>                                 if ("a".equals(s)) {
>>                                     collector.collect(s);
>>                                 } else {
>>                                     context.output(sideOutputTag, s);
>>                                 }
>>                             }
>>                         })
>>
>>                         .filter(new FilterFunction<String>() {
>>                             @Override
>>                             public boolean filter(String s) throws Exception {
>>                                 return true;
>>                             }
>>                         });
>>
>>                     processed.getSideOutput(sideOutputTag).printToErr();
>>
>>                     processed.print();
>>
>>                     env.execute();
>>                 }
>>
>>             }
>>
>>             Cheers,
>>             Juho
>
>
>
>

