flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: An idea for a parallel AllWindowedStream
Date Wed, 09 Nov 2016 14:45:55 GMT
Hi,
yes, this works well in many cases, and I was also thinking about adding
something like this to Flink.

There can be problems if you use a trigger other than EventTimeTrigger that
may fire multiple times, or if you specify an allowed lateness. In those
cases a keyed subwindow can emit more than one partial result for the same
window, so you would overcount elements in the all-window.
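To make the overcounting concrete, here is a plain-Java simulation (no Flink dependency) of the keyed-then-all-window count. It assumes a hypothetical window split across two keys, where one key's trigger fires early and then again at the window end, each time emitting its accumulated count because the window state is not purged between firings. The class and method names are made up for this sketch. Late firings under an allowed lateness behave the same way whenever the downstream all-window still accepts them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class OvercountDemo {

    // One keyed subwindow: emits its accumulated count at each early firing
    // (after the given 1-based element positions) and once more at window end.
    // State is not purged between firings, so each emission includes
    // everything counted so far.
    public static List<Long> keyedFirings(int elements, Set<Integer> earlyFiringsAfter) {
        List<Long> emitted = new ArrayList<>();
        long count = 0;
        for (int i = 1; i <= elements; i++) {
            count++;
            if (earlyFiringsAfter.contains(i)) {
                emitted.add(count);
            }
        }
        emitted.add(count); // final firing at the end of the window
        return emitted;
    }

    // The all-window sums every partial count it receives, like a
    // windowAll(...).reduce(sum) applied to the keyed results.
    public static long allWindowSum(List<Long> partials) {
        long total = 0;
        for (long p : partials) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> partials = new ArrayList<>();
        // Hypothetical split: key 0 has 3 elements and fires early after the 2nd,
        // key 1 has 4 elements and fires only at the end of the window.
        partials.addAll(keyedFirings(3, Collections.singleton(2)));
        partials.addAll(keyedFirings(4, Collections.<Integer>emptySet()));

        long trueCount = 3 + 4;
        long allWindowCount = allWindowSum(partials);
        System.out.println("true count = " + trueCount
                + ", all-window count = " + allWindowCount); // 7 vs 9
    }
}
```

Key 0 contributes both its early partial (2) and its final partial (3), so the all-window reports 9 instead of 7.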

Cheers,
Aljoscha

On Wed, 9 Nov 2016 at 06:35 Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com> wrote:

> Hi,
>
> As a self-training exercise, I've defined a class extending WindowedStream
> that implements a proof of concept for a parallel version of
> AllWindowedStream:
>
> /**
>  * Tries to create a parallel version of an AllWindowedStream for a DataStream
>  * by creating a KeyedStream that uses as key the hash of each element modulo
>  * a parallelism level.
>  *
>  * This only makes sense for window assigners that ensure the subwindows will be
>  * in sync, like time-based window assigners, and it is more stable with ingestion
>  * and event time because the window alignment is more reliable.
>  * This doesn't work for count or session window assigners.
>  *
>  * Also note that elements from different partitions might get out of order due
>  * to parallelism.
>  */
> public static class ParAllWindowedStream<T, W extends Window>
>         extends WindowedStream<T, Integer, W> {
>     private final transient WindowAssigner<Object, W> windowAssigner;
>
>     public ParAllWindowedStream(DataStream<T> stream, final int parallelism,
>                                 WindowAssigner<Object, W> windowAssigner) {
>         super(stream.keyBy(new KeySelector<T, Integer>() {
>                   @Override
>                   public Integer getKey(T value) throws Exception {
>                       // Math.abs keeps the key in [0, parallelism) even for
>                       // negative hash codes (a plain % can return a negative value)
>                       return Math.abs(value.hashCode() % parallelism);
>                   }
>               }),
>               windowAssigner);
>         this.windowAssigner = windowAssigner;
>     }
>
>     @Override
>     public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFun) {
>         return super.reduce(reduceFun)      // reduce each subwindow
>                 .windowAll(windowAssigner)  // synchronize
>                 .reduce(reduceFun);         // sequential aggregate of the partial results
>     }
>
>     // Cannot override because we need an additional reduce function of type R
>     // to recombine the result for each window
>     // @Override
>     public <R> SingleOutputStreamOperator<R> applyPar(
>             ReduceFunction<T> reduceFunction,
>             WindowFunction<T, R, Integer, W> function,
>             ReduceFunction<R> reduceWindowsFunction) {
>         return super.apply(reduceFunction, function)
>                 .windowAll(windowAssigner)
>                 .reduce(reduceWindowsFunction);
>     }
> }
>
> Maybe someone will find this interesting. I have a toy example program in
> https://github.com/juanrh/flink-state-eviction/blob/05676ca0eebf83e936b5cc04ecf85e8110ccacf4/src/main/java/com/github/juanrh/streaming/windowAllPoCs/WindowAllTimeKeyedPoC.java
> for the curious.
>
> Greetings,
>
> Juan
>
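
The core idea of the quoted ParAllWindowedStream is a two-phase reduce: reduce within each hash partition, then reduce the partial results. The plain-Java sketch below (no Flink dependency; the class and method names are hypothetical) shows that this gives the same result as a sequential reduce when the function is associative and commutative, which is needed because partials from different partitions arrive in no fixed order. It also shows that Java's `%` returns a negative value for a negative hash code, which is why this sketch maps keys with `Math.abs(hashCode() % parallelism)`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class TwoPhaseReduceDemo {

    // Partition key in [0, parallelism); a plain hashCode() % parallelism
    // can be negative for negative hash codes.
    public static int partitionKey(Object value, int parallelism) {
        return Math.abs(value.hashCode() % parallelism);
    }

    // Phase 1: reduce within each hash partition (like the keyed window).
    // Phase 2: reduce the per-partition results (like the all-window).
    // Returns null for an empty window.
    public static <T> T twoPhaseReduce(List<T> window, int parallelism, BinaryOperator<T> reduce) {
        Map<Integer, T> partials = new HashMap<>();
        for (T value : window) {
            partials.merge(partitionKey(value, parallelism), value, reduce);
        }
        T result = null;
        for (T partial : partials.values()) {
            result = (result == null) ? partial : reduce.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(5, -7, 12, 3, -1, 8);
        int sequential = 0;
        for (int v : window) {
            sequential += v;
        }
        int parallel = twoPhaseReduce(window, 4, Integer::sum);
        System.out.println(sequential + " == " + parallel);             // 20 == 20
        System.out.println((-7 % 4) + " vs " + partitionKey(-7, 4));   // -3 vs 3
    }
}
```

A non-commutative function (for example, string concatenation) could give a different result depending on the order in which the partial results are combined.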
