flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: TestStreamEnvironment: await last flush of processing time-based windows
Date Tue, 24 Jan 2017 16:28:19 GMT
Hi,
I'm afraid there is no way of making this work with the current
implementation. Especially getting this to work in a distributed setting
seems hard.

I'm very open to suggestions on this topic, though. :-)

Cheers,
Aljoscha

On Mon, 23 Jan 2017 at 23:19 Steven Ruppert <steven@fullcontact.com> wrote:

> Hi,
>
> I'm attempting to unit test Flink with the flink-test-utils support, on
> Flink 1.1.4. I've got basic flatMap stuff flowing through just fine,
> but when running any processing time-based windowing functions,
> `env.execute()` will return before any values are flushed out of the
> windows.
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.util.TestStreamEnvironment;
> import org.junit.Test;
>
> import java.util.concurrent.atomic.AtomicBoolean;
>
> import static org.junit.Assert.assertTrue;
>
> public class TestMinimal {
>     static AtomicBoolean sinked = new AtomicBoolean(false);
>     @Test
>     public void testThing() throws Exception {
>         StreamExecutionEnvironment env =
>             TestStreamEnvironment.getExecutionEnvironment();
>
>         env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 1))
>             .keyBy(0)
>             .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
>             .sum(1)
>             .addSink(new SinkFunction<Tuple2<String, Integer>>() {
>                 @Override
>                 public void invoke(Tuple2<String, Integer> value) throws Exception {
>                     sinked.set(true);
>                 }
>             });
>         env.execute();
>         // presumably once execute returns, all elements have passed through all operators.
>         assertTrue(sinked.get());
>     }
> }
>
> Is there a way to make this test pass?
>
> Using event time windows instead does seem to work, but processing
> time would be a little more convenient.
