flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Avi Levi <avi.l...@bluevoyant.com>
Subject Re: ingesting time for TimeCharacteristic.IngestionTime on unit test
Date Sat, 23 Mar 2019 08:31:57 GMT
Any idea what should I do to overcome this?

On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <avi.levi@bluevoyant.com> wrote:

> Hi Andrey,
> I am testing a Filter operator that receives a key from the stream and
> checks if it is a new one or not. if it is new it keeps it in state and
> fire a timer all that is done using the ProcessFunction.
> The testing is using some CollectSink as described here
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing>
and
> the source is implementation of the SourceFunction that accepts a
> collection of values and adds it to ctx.collect .
> The ctx.timestamp() is null, BUT even if I set the timer to sometime in
> the future ctx.timerService.registerProcessingTimeTimer(currenttimestamp +
> x) the timer is fired immediately.
>
>
> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <andrey@ververica.com>
> wrote:
>
>> Hi Avi,
>>
>> what is the structure of your unit test? do you create some source and
>> then apply function or you test only ProcessFunction methods in isolation?
>> does ctx.timestamp() return zero or which value?
>>
>> Best,
>> Andrey
>>
>>
>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <avi.levi@bluevoyant.com> wrote:
>>
>>> Hi Andrey ,
>>> I'm using IngestionTime
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>
>>> This is my timer in the processElement:
>>>    val nextTime: Long = ctx.timestamp()  + daysInMilliseconds(14)
>>>    ctx.timerService.registerProcessingTimeTimer(nextTim)
>>>
>>> The problem is how do I use it in my unit tests ? since there is no
>>> IngestionTime and timers are fired immediately so the timers actions (such
>>> as state cleanup) are fired before time and causing the tests to fail .
>>>
>>>
>>>
>>>
>>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin <andrey@ververica.com>
>>> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> do you use processing time timer
>>>> (timerService().registerProcessingTimeTimer)?
>>>> why do you need ingestion time? do you
>>>> set TimeCharacteristic.IngestionTime?
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi <avi.levi@bluevoyant.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Our stream is not based on time sequence and we do not use time based
>>>>> operations. we do want to clean the state after x days hence we fire
timer
>>>>> event. My problem is that our unit test fires the event immediately (there
>>>>> is no ingestion time) how can I inject ingestion time ?
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>>

Mime
View raw message