From: Till Rohrmann
Date: Fri, 14 Sep 2018 17:41:39 +0200
Subject: Re: Unit / Integration Test Timer
To: Ashish Pokharel
Cc: flink-user@apache.org

Hi Ashish,

How do you make sure that all of your data is not consumed within a fraction of the 2 seconds? If the input is exhausted first, the job can finish before the timer is due. For this it would be better to use event time, which allows you to control how time passes. If you want to test a specific operator, you could try out the One/TwoInputStreamOperatorTestHarness.
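
To illustrate what "controlling how time passes" means in a test, here is a minimal plain-Java sketch. It is not Flink code and does not use the test harness API: `ManualTimerService` and `TimerDemo` are hypothetical names made up for this example, and the 5 ms figure for consuming the input is an assumption. The idea is only that the test, not the wall clock, decides when a registered timer becomes due.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a timer service with a test-controlled clock; not a Flink class. */
final class ManualTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<String> fired = new ArrayList<>();
    private long now = 0L;

    /** Registers a timer that is due once the clock reaches the given timestamp (ms). */
    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Moves the clock forward (never backward) and fires all due timers in timestamp order. */
    void advanceTo(long timestamp) {
        now = Math.max(now, timestamp);
        while (!timers.isEmpty() && timers.peek() <= now) {
            fired.add("timer@" + timers.poll());
        }
    }

    long currentTime() {
        return now;
    }

    List<String> firedTimers() {
        return fired;
    }
}

final class TimerDemo {
    public static void main(String[] args) {
        ManualTimerService service = new ManualTimerService();

        // An element arrives at t=0 and registers a 2-second timeout.
        service.registerTimer(service.currentTime() + 2_000L);

        // Assumed: the whole input is consumed within a few milliseconds.
        service.advanceTo(5L);
        System.out.println(service.firedTimers()); // prints []

        // The test explicitly moves time past the timeout, so the timer fires.
        service.advanceTo(2_000L);
        System.out.println(service.firedTimers()); // prints [timer@2000]
    }
}
```

With wall-clock time the first state is where a short bounded job ends. With a clock that the test advances itself, the second step is deterministic and needs no sleep.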

Cheers,
Till

On Fri, Sep 14, 2018 at 3:36 PM ashish pok <ashishpok@yahoo.com> wrote:
All,

Hopefully a quick one. I feel like I have seen this answered a few times before but can't find an appropriate example. I am trying to run a few tests where registered timeouts are invoked (snippet below). The simple example shown in the documentation for integration tests (using flink-test-utils) seems to complete even though timers are registered and have not been invoked.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();
    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
        .process(new TupleProcessFn())
        .keyBy(FactTuple::getKey)
        .process(new NormalizeDataProcessFn(2))
        .addSink(getSink());
    env.execute();

I have a 2-second processing-time timer registered. If I put a breakpoint in the first TupleProcessFn() after a few tuples are collected, I can see onTimer being invoked. So what is the trick here? I went as far as putting a MapFunction that sleeps after the second process function, to no avail.

Thanks,

Ashish