flink-user mailing list archives

From Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
Subject An idea for a parallel AllWindowedStream
Date Wed, 09 Nov 2016 05:35:02 GMT

As a self-training exercise I've defined a class extending WindowedStream
to implement a proof of concept for a parallel version of
AllWindowedStream:

/**
 * Tries to create a parallel version of an AllWindowedStream for a DataStream
 * by creating a KeyedStream that uses as key the hash of each element modulo
 * a parallelism level.
 * This only makes sense for window assigners that ensure the subwindows will be
 * in sync, like time-based window assigners, and it is more stable with ingestion
 * and event time because the window alignment is more reliable.
 * This doesn't work for count or session window assigners.
 * Also note that elements from different partitions might get out of order due
 * to parallelism.
 */
public static class ParAllWindowedStream<T, W extends Window> extends
        WindowedStream<T, Integer, W> {
    private final transient WindowAssigner<Object, W> windowAssigner;

    public ParAllWindowedStream(DataStream<T> stream, final int parallelism,
                                WindowAssigner<Object, W> windowAssigner) {
        super(stream.keyBy(new KeySelector<T, Integer>() {
                            public Integer getKey(T value) throws Exception {
                                // floorMod keeps the key in [0, parallelism),
                                // even for negative hash codes
                                return Math.floorMod(value.hashCode(), parallelism);
                            }
                        }), windowAssigner);
        this.windowAssigner = windowAssigner;
    }

    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFun) {
        return super.reduce(reduceFun)      // reduce each subwindow
                .windowAll(windowAssigner)  // synchronize
                .reduce(reduceFun);         // sequential aggregate of the subwindow results
    }

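    // Cannot override apply() because we need an additional reduce function
    // of type R to recombine the result for each window
    // @Override
    public <R> SingleOutputStreamOperator<R> applyPar(
            ReduceFunction<T> reduceFunction,
            WindowFunction<T, R, Integer, W> function,
            ReduceFunction<R> reduceWindowsFunction) {
        return super.apply(reduceFunction, function)  // apply on each subwindow
                .windowAll(windowAssigner)              // synchronize
                .reduce(reduceWindowsFunction);         // recombine the subwindow results
    }
}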

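To illustrate the two-stage idea outside of Flink, here is a minimal plain-Java sketch: elements of one window are bucketed by hash modulo the parallelism, each bucket is reduced, and the partial results are reduced again. The class and method names (TwoStageReduceSketch, subKey, twoStageReduce) are made up for this illustration and are not part of any Flink API. It assumes the reduce function is associative and commutative, since bucketing changes the order in which elements are combined.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class TwoStageReduceSketch {

    // Hypothetical helper: bucket in [0, parallelism), also for negative hash codes.
    // Plain '%' would return a negative bucket for a negative hash code.
    static int subKey(Object value, int parallelism) {
        return Math.floorMod(value.hashCode(), parallelism);
    }

    // Stage 1: reduce the elements of each bucket (the "subwindows").
    // Stage 2: reduce the per-bucket partial results into one value.
    static <T> T twoStageReduce(List<T> window, int parallelism, BinaryOperator<T> f) {
        Map<Integer, T> partials = new HashMap<>();
        for (T value : window) {
            partials.merge(subKey(value, parallelism), value, f);
        }
        return partials.values().stream()
                .reduce(f)
                .orElseThrow(() -> new IllegalArgumentException("empty window"));
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(5, -3, 12, -8, 7, 0);
        System.out.println(-3 % 4);                                // -3
        System.out.println(subKey(-3, 4));                         // 1
        System.out.println(twoStageReduce(window, 4, Integer::sum)); // 13
        System.out.println(twoStageReduce(window, 4, Integer::max)); // 12
    }
}
```

For a sum or a max the result matches a sequential reduce over the whole window. A function that depends on element order (string concatenation, for example) would not be safe here.
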
Someone might find this interesting. I have a toy example program in
for the curious.

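On the caveat in the class comment about time-based assigners: a rough sketch of why the subwindows line up. For a tumbling time window the window start depends only on the element's timestamp and the window size, not on the key, so elements that land in different hash buckets are still assigned the same window bounds. A count window, by contrast, closes after N elements per key, so each bucket would close at a different moment. The helper below (tumblingWindowStart) is a simplified illustration that assumes non-negative timestamps and no window offset; it is not Flink's own code.

```java
public class WindowAlignmentSketch {

    // Start of the tumbling window that contains timestampMs.
    // Assumes timestampMs >= 0 and no window offset.
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows

        // Two elements that could sit in different hash buckets: the window
        // start is computed from the timestamp alone, so both buckets agree.
        System.out.println(tumblingWindowStart(12_300L, size)); // 10000
        System.out.println(tumblingWindowStart(14_999L, size)); // 10000

        // The next window starts exactly at the boundary.
        System.out.println(tumblingWindowStart(15_000L, size)); // 15000
    }
}
```
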