flink-dev mailing list archives

From Gyula Fóra <gyf...@apache.org>
Subject Re: [DISCUSS] Side Outputs and Split/Select
Date Thu, 23 Feb 2017 15:08:50 GMT

Thanks for the nice proposal, I like the idea of side outputs, and it would
make a lot of topologies much simpler.

Regarding the API, I think we should come up with a way of making side
outputs accessible from all sorts of operators in a similar way. For
instance through the RichFunction interface with a special collector that
we invalidate when the user should not be collecting to it. (just a quick
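
To make the "collector that we invalidate" idea concrete, here is a minimal sketch in plain Java. It uses no Flink types, and `InvalidatableCollector`, `collect()` and `invalidate()` are hypothetical names chosen for illustration, not an existing API. The list stands in for whatever the runtime would forward side-output records to.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical side-output collector that the runtime can switch off. */
final class InvalidatableCollector<T> {
    private final List<T> sink;   // stands in for the downstream side output
    private boolean valid = true;

    InvalidatableCollector(List<T> sink) {
        this.sink = sink;
    }

    /** Emits a record, or fails fast if the collector has been invalidated. */
    void collect(T record) {
        if (!valid) {
            throw new IllegalStateException("side output is not available here");
        }
        sink.add(record);
    }

    /** Called by the (hypothetical) runtime once collecting is no longer allowed. */
    void invalidate() {
        valid = false;
    }
}

class InvalidatableCollectorDemo {
    public static void main(String[] args) {
        List<String> sideOutput = new ArrayList<>();
        InvalidatableCollector<String> collector = new InvalidatableCollector<>(sideOutput);
        collector.collect("accepted");   // allowed while the collector is valid
        collector.invalidate();          // e.g. after the user function returns
        try {
            collector.collect("rejected");
        } catch (IllegalStateException e) {
            System.out.println("collect() refused: " + e.getMessage());
        }
    }
}
```

Failing fast is only one possible policy; silently dropping the record would be another, and which one fits is part of the API question.
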

I personally wouldn't deprecate the "universal" Split/Select API that can
be used on any DataStream in favor of functionality that is only
accessible through the process function or a few select operators. I think
the Split/Select pattern is also very nice and I use it in many different
contexts to get efficient multiway filtering (after map/co operators for


Aljoscha Krettek <aljoscha@apache.org> wrote (on Thu, 23 Feb 2017, 15:42):

> Hi Folks,
> Chen and I have been working for a while now on making FLIP-13 (side
> outputs) [1] a reality. We think we have a pretty good internal
> implementation and also a proposal for an API but now we need to discuss
> how we want to go forward with this, especially how we should deal with
> split/select which does some of the same things side outputs can do. I'll
> first quickly describe what the split/select API looks like, so that we're
> all on the same page. Then I'll present the new proposed side output API
> and then I'll present a new API for getting dropped late data from a windowed
> operation, which was the original motivation for adding side outputs.
> Split/select consists of two API calls: DataStream.split(OutputSelector)
> and SplitStream.select(). You can use it like this:
> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> final String EVEN_SELECTOR = "even";
> final String ODD_SELECTOR = "odd";
> SplitStream<Integer> split = input.split(
>         new OutputSelector<Integer>() {
>             @Override
>             public Iterable<String> select(Integer value) {
>                 if (value % 2 == 0) {
>                     return Collections.singleton(EVEN_SELECTOR);
>                 } else {
>                     return Collections.singleton(ODD_SELECTOR);
>                 }
>             }
>         });
> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
> The stream is split according to an OutputSelector that returns an Iterable
> of Strings. Then you can use select() to get a new stream that only
> contains elements with the given selector. Notice how the element type for
> all the split streams is the same.
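
The selection semantics described in the quoted paragraph can be modelled in a few lines of plain Java. This is a rough sketch over an in-memory list, not Flink code; `SplitModel` and `evenOdd` are made-up names. It shows that every selected "stream" keeps the input element type and that an element appears in each selection whose name the selector returned for it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of split/select over a list; illustrative only. */
final class SplitModel<T> {
    private final List<T> elements;
    private final Function<T, Iterable<String>> selector;

    SplitModel(List<T> elements, Function<T, Iterable<String>> selector) {
        this.elements = elements;
        this.selector = selector;
    }

    /** Returns the elements whose selector output contains the given name. */
    List<T> select(String name) {
        List<T> selected = new ArrayList<>();
        for (T element : elements) {
            for (String tag : selector.apply(element)) {
                if (tag.equals(name)) {
                    selected.add(element);
                    break;   // add each element at most once per selection
                }
            }
        }
        return selected;
    }

    /** Same even/odd selector as in the quoted example. */
    static SplitModel<Integer> evenOdd(List<Integer> input) {
        return new SplitModel<>(input,
                value -> Collections.singleton(value % 2 == 0 ? "even" : "odd"));
    }
}
```

Note that `select()` can only ever return `List<T>`: the model has no way to give one branch a different element type, which is the limitation side outputs are meant to lift.
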
> The new side output API proposal adds a new type OutputTag<T> and relies on
> extending ProcessFunction to allow emitting data to outputs besides the
> main output. I think it's best explained with an example as well:
> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> final OutputTag<String> sideOutput1 = new OutputTag<String>("side-output-1"){};
> final OutputTag<Integer> sideOutput2 = new OutputTag<Integer>("side-output-2"){};
> SingleOutputStreamOperator<String> mainOutputStream = input
>         .process(new ProcessFunction<Integer, String>() {
>             @Override
>             public void processElement(
>                     Integer value,
>                     Context ctx,
>                     Collector<String> out) throws Exception {
>                 ctx.output(sideOutput1, "WE GOT: " + value);
>                 ctx.output(sideOutput2, value);
>                 out.collect("MAIN OUTPUT: " + value);
>             }
>         });
> DataStream<String> sideOutputStream1 =
> mainOutputStream.getSideOutput(sideOutput1);
> DataStream<Integer> sideOutputStream2 =
> mainOutputStream.getSideOutput(sideOutput2);
> Notice how the OutputTags are anonymous inner classes, similar to TypeHint.
> We need this to be able to analyse the type of the side-output streams.
> Also notice how the types of the side-output streams can be independent of
> the main-output stream, and how everything is correctly type-checked by the
> Java compiler.
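
Why the trailing `{}` matters can be shown with plain reflection. The sketch below is a stand-in, not the proposed OutputTag class: `TagSketch` is a hypothetical name, and the constructor logic is only one common way to capture a type argument. An anonymous subclass records its generic superclass in the class file, so the element type survives erasure and can be read back at runtime.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical stand-in for OutputTag<T>; models only the type-capture trick. */
abstract class TagSketch<T> {
    private final String id;
    private final Type elementType;

    protected TagSketch(String id) {
        this.id = id;
        // For an anonymous subclass such as new TagSketch<String>("x") {},
        // the generic superclass is the parameterized type TagSketch<String>.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "create the tag as an anonymous subclass with an explicit type argument");
        }
        this.elementType = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    String id() {
        return id;
    }

    Type elementType() {
        return elementType;
    }
}

class TagSketchDemo {
    public static void main(String[] args) {
        TagSketch<String> tag = new TagSketch<String>("side-output-1") {};
        System.out.println(tag.id() + " carries " + tag.elementType());
    }
}
```

Making the class abstract forces callers to write the `{}`, so forgetting the anonymous subclass becomes a compile error rather than a silent loss of type information.
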
> This change requires making ProcessFunction an abstract base class so that
> not every user has to implement the onTimer() method. We would also need to
> allow ProcessFunction on a non-keyed stream.
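
The point about the abstract base class can be sketched as follows. The signatures are simplified and hypothetical (a `Consumer` replaces the Collector and there is no Context); only the shape matters: `processElement()` stays abstract while `onTimer()` gets an empty default body, so functions that never register timers need not mention it.

```java
import java.util.function.Consumer;

/** Simplified stand-in for an abstract ProcessFunction; signatures are illustrative. */
abstract class ProcessFunctionSketch<I, O> {
    /** Every implementation has to handle incoming elements. */
    abstract void processElement(I value, Consumer<O> out);

    /** Timer callback with a no-op default; override only when timers are used. */
    void onTimer(long timestamp, Consumer<O> out) {
        // intentionally empty
    }
}

/** A function that only cares about elements and ignores timers entirely. */
final class PrefixFunction extends ProcessFunctionSketch<Integer, String> {
    @Override
    void processElement(Integer value, Consumer<String> out) {
        out.accept("MAIN OUTPUT: " + value);
    }
}
```
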
> Chen also implemented an API based on FlatMapFunction that looks like the
> one proposed in the FLIP. This relies on CollectorWrapper, which can be
> used to "pimp" a Collector to also allow emitting to side outputs.
> For WindowedStream we have two proposals: make OutputTag visible on the
> WindowedStream API or make the result type of WindowedStream operations
> more specific to allow a getDroppedDataSideOutput() method. For the first
> proposal it would look like this:
> final OutputTag<IN> lateDataTag = new OutputTag<IN>("side-output-1"){};
> DataStream<T> windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .sideOutputLateData(lateDataTag)
>   .apply(...);
> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
> For the second proposal it would look like this:
> WindowedOperator<T> windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .apply(...);
> DataStream<IN> lateData = windowedResult.getDroppedDataSideOutput();
> Right now, the result of window operations is a
> SingleOutputStreamOperator<T>, same as it is for all DataStream operations.
> Making the result type more specific, i.e. a WindowedOperator, would allow
> us to add extra methods there. This would require wrapping a
> SingleOutputStreamOperator and forwarding all the method calls to the
> wrapped operator which can be a bit of a hassle for future changes. The
> first proposal requires additional boilerplate code.
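
The forwarding cost of the second proposal can be illustrated with a small plain-Java model. All names here (`OperatorSketch`, `BasicOperator`, `WindowedOperatorSketch`) are invented for the sketch and the interface is far smaller than the real operator API; a list stands in for the late-data stream.

```java
import java.util.List;

/** Minimal stand-in for the operator type returned by stream operations. */
interface OperatorSketch<T> {
    OperatorSketch<T> name(String name);
    OperatorSketch<T> setParallelism(int parallelism);
    String describe();
}

final class BasicOperator<T> implements OperatorSketch<T> {
    private String name = "unnamed";
    private int parallelism = 1;

    public BasicOperator<T> name(String name) { this.name = name; return this; }
    public BasicOperator<T> setParallelism(int parallelism) { this.parallelism = parallelism; return this; }
    public String describe() { return name + " x" + parallelism; }
}

/** Wrapper that adds one method and has to forward everything else by hand. */
final class WindowedOperatorSketch<T, IN> implements OperatorSketch<T> {
    private final OperatorSketch<T> wrapped;
    private final List<IN> lateData;   // stands in for the dropped-data stream

    WindowedOperatorSketch(OperatorSketch<T> wrapped, List<IN> lateData) {
        this.wrapped = wrapped;
        this.lateData = lateData;
    }

    // Each method of the wrapped type needs an explicit forwarding method.
    public WindowedOperatorSketch<T, IN> name(String name) { wrapped.name(name); return this; }
    public WindowedOperatorSketch<T, IN> setParallelism(int parallelism) { wrapped.setParallelism(parallelism); return this; }
    public String describe() { return wrapped.describe(); }

    /** The extra method that motivates the more specific result type. */
    List<IN> getDroppedDataSideOutput() { return lateData; }
}
```

Any method later added to the wrapped type also has to be added to the wrapper, otherwise chained calls fall back to the less specific type and lose access to `getDroppedDataSideOutput()`.
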
> Sorry for the long mail but I think it's necessary to get everyone on the
> same page. The question is now: how should we proceed with the proposed API
> and the old split/select API? I propose to deprecate split/select and only
> have side outputs, going forward. Of course, I'm a bit biased on this. ;-)
> If we decide to do this, we also need to decide on what the side output API
> should look like.
> Happy discussing! Feedback very welcome. :-)
> Best,
> Aljoscha
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
