flink-user mailing list archives

From ashish pok <ashish...@yahoo.com>
Subject Re: Unit / Integration Test Timer
Date Fri, 14 Sep 2018 17:57:26 GMT
 Hi Till,
To answer your first question, I currently don't (and honestly I'm not sure how, other than
using a breakpoint in the IDE, or perhaps something like Mockito could do it). So did I interpret
it correctly: the execution env started using flink-test-utils will essentially tear down
once it consumes the last data point (i.e. the end of the collection I am passing), even though
there could still be active Timers registered?
Further, most of our pipelines use low-level process functions. We toyed around with
other windowing and session functions, but process functions gave us the most flexibility
(at least at this point, until we can revisit), and we generate keys for aggregation/windowing
somewhere upstream (say in a map, flatMap or another process function). This means some pipelines
are event / processing time agnostic in a sense, although technically within the process functions
we will have timers registered etc. This helped us with unbounded keys and with sensor data that could
potentially be backfilled (i.e. the watermarks passed long ago). I don't doubt a bit
that there are better solutions :)
With that background, I am not quite following your second note about event time and how
we can leverage it for testing. Our intent is to create sampled input from results and compare
the output from tests to those results (i.e. end-to-end integration tests) as part of our CI/CD. The normal
flow seems to work well; just getting the "negative" test cases of timeouts to run seems to be a mystery
right now :) So single-operator harnesses don't sound like the right approach. Let me know.

    On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann <trohrmann@apache.org> wrote:
 Hi Ashish,
how do you make sure that all of your data is not consumed within a fraction of the 2 seconds?
For this it would be better to use event time which allows you to control how time passes.
If you want to test a specific operator you could try out the One/TwoInputStreamOperatorTestHarness.
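
The point about event time can be illustrated with a toy model in plain Java. This is only a sketch and deliberately does not use Flink's API: `ManualTimerSketch`, `ManualClockTimers`, `advanceTo` and `run` are hypothetical names invented for illustration. It assumes each element registers a timeout a fixed interval after its own timestamp, and that time only moves when the test moves it, much like a watermark. If the input ends and nothing advances time any further, the timers stay pending; one final advance to the maximum timestamp fires all of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model only, NOT Flink's API: a timer queue driven by a manually advanced clock. */
public class ManualTimerSketch {

    /** Hypothetical stand-in for a timer service whose clock the test controls. */
    static final class ManualClockTimers {
        private final PriorityQueue<Long> pending = new PriorityQueue<>();
        private final List<Long> fired = new ArrayList<>();
        private long now = Long.MIN_VALUE;

        /** Register a timer for the given timestamp. */
        void register(long timestamp) {
            pending.add(timestamp);
        }

        /** Advance time (think: a watermark) and fire every timer at or before it. */
        void advanceTo(long newTime) {
            now = Math.max(now, newTime);
            while (!pending.isEmpty() && pending.peek() <= now) {
                fired.add(pending.poll());
            }
        }

        List<Long> fired() {
            return fired;
        }

        int pendingCount() {
            return pending.size();
        }
    }

    /**
     * Feeds element timestamps in order; each element registers a timeout
     * timeoutMs after its own timestamp. If flushAtEnd is true, time is
     * advanced to Long.MAX_VALUE once the input is exhausted.
     */
    static ManualClockTimers run(long[] eventTimestamps, long timeoutMs, boolean flushAtEnd) {
        ManualClockTimers timers = new ManualClockTimers();
        for (long ts : eventTimestamps) {
            timers.advanceTo(ts);
            timers.register(ts + timeoutMs);
        }
        if (flushAtEnd) {
            timers.advanceTo(Long.MAX_VALUE);
        }
        return timers;
    }

    public static void main(String[] args) {
        long[] input = {0L, 500L, 1000L};
        // The input ends before any 2000 ms timeout is reached: nothing fires.
        System.out.println("without final advance: " + run(input, 2000L, false).fired());
        // One final advance past every timer fires all of them, in timestamp order.
        System.out.println("with final advance:    " + run(input, 2000L, true).fired());
    }
}
```

The design choice here is that the test, not the wall clock, decides when a timeout elapses, so the "negative" timeout cases become deterministic instead of depending on how fast a small collection is consumed.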
On Fri, Sep 14, 2018 at 3:36 PM ashish pok <ashishpok@yahoo.com> wrote:

Hopefully a quick one. I feel like I have seen this answered a few times before but
can't find an appropriate example. I am trying to run a few tests where registered timeouts
are invoked (snippet below). The simple example shown in the documentation for integration tests
(using flink-test-utils) seems to complete even though Timers are registered and have not
been invoked.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();
    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
        .process(new TupleProcessFn())
        .keyBy(FactTuple::getKey)
        .process(new NormalizeDataProcessFn(2))

I have a 2-second processing-time timer registered. If I put a breakpoint in the first TupleProcessFn()
after a few Tuples are collected, I can see onTimer being invoked. So what is the trick here?
I went as far as putting in a MapFunction with a sleep after the second process function, to
no avail.