kafka-dev mailing list archives

From "John Roesler" <vvcep...@apache.org>
Subject Re: Streams, Kafka windows
Date Thu, 16 Jan 2020 02:58:51 GMT
Hi Viktor,

I’m not sure why you get two identical outputs in response to a single record. Regardless,
since you say that you want to get a single, final result for the window and you expect multiple
inputs to the windows, you need Suppression.

My guess is that you just sent one record to try it out and didn’t see any output? This
is expected. Just as the window boundaries are defined by the timestamps of the records,
not by the current system time, suppression is governed by the timestamps of the records. I.e.,
a thirty-second window is not actually closed until you see a new record whose timestamp is
at or past the end of that window.

Maybe try just sending a sequence of updates with incrementing timestamps. If the first record
has timestamp T, then you should see an output when you pass in a record with timestamp T+30.
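
To make the timing concrete, here is a small plain-Java sketch (no Kafka dependency) of how a record timestamp maps to hopping windows and when those windows count as closed. It assumes windows are aligned to multiples of the advance interval and that a window closes once stream time reaches its end plus the grace period. The class and method names are hypothetical and only illustrate the idea; this is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowSketch {

    /** Start times of every hopping window [start, start + size) that contains the timestamp. */
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window start, aligned to a multiple of advanceMs, that still covers the timestamp.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    /** Assumption: a window is closed once stream time has reached window end + grace. */
    public static boolean isClosed(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        return streamTimeMs >= windowStartMs + sizeMs + graceMs;
    }

    public static void main(String[] args) {
        long size = 30_000L;    // 30-second windows, as in the quoted code
        long advance = 10_000L; // advancing by 10 seconds
        long grace = 0L;        // zero grace, for testing
        long t = 100_000L;      // timestamp T of the first record

        List<Long> starts = windowStartsFor(t, size, advance);
        System.out.println("windows containing T start at: " + starts);

        // Stream time only moves forward when later records arrive.
        for (long streamTime : new long[] {t, t + 10_000L, t + 30_000L}) {
            for (long start : starts) {
                System.out.println("streamTime=" + streamTime + " window=" + start
                        + " closed=" + isClosed(start, size, grace, streamTime));
            }
        }
    }
}
```

With T = 100 s, the record falls into the three windows starting at 80 s, 90 s, and 100 s. Under these assumptions the first of them closes once a record with timestamp 110 s arrives, and all three are closed by T+30 = 130 s. A single record lands in every overlapping window, so one input can produce several windowed results.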


Important note: there is a built-in grace period that delays the output of final results after
the window ends. For complicated reasons, the default is 24 hours! So you would actually not
see an output until you send a record with timestamp T+30+(24 hours)! I strongly recommend
you set the grace period on TimeWindows to zero for your testing. You can increase it later
if you want to tolerate some late-arriving records.
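
A minimal sketch of the zero-grace setting described above, assuming the Kafka Streams API of that period, where `TimeWindows.of(...)` is combined with `.grace(...)`. Later releases deprecate these in favour of factory methods such as `TimeWindows.ofSizeWithNoGrace(...)`, so check the version you are on. The class and method names are hypothetical, and the fragment needs the kafka-streams library on the classpath.

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

public class ZeroGraceWindows {

    /** Same 30-second window advancing by 10 seconds as in the quoted code, with no grace period. */
    static TimeWindows testWindows() {
        return TimeWindows.of(Duration.ofSeconds(30))
                .advanceBy(Duration.ofSeconds(10))
                // Assumption for testing only: no tolerance for late-arriving records.
                .grace(Duration.ZERO);
    }
}
```

Passing the result to `windowedBy(...)` in place of the original window definition should let suppression emit final results as soon as stream time passes each window's end.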

Thanks,
-John

On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> Hi,
> 
> My name is Viktor. I'm currently working with Kafka Streams and have
> several questions about Kafka that I cannot find answers to in the official
> docs.
> 
> 1. Why does the suppress functionality not work with hopping windows? How
> can I make it work?
> 
> Example of the code:
> 
> KStream<String, String> finalStream = source
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
>         .reduce((aggValue, newValue) -> newValue,
>                 Materialized.with(Serdes.String(), Serdes.String()))
>         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>         .toStream();
> 
> finalStream.print(Printed.toSysOut());
> finalStream.to(outputTopic);
> 
> After I run the code above, the output stream is empty. There were no
> errors/exceptions.
> NOTE: With a tumbling window the code works as expected.
> Maybe I am simply using it incorrectly?
> 
> 2. Why are there duplicates in the output stream with hopping windows
> (without suppress)?
> E.g., I send one record to the input KStream with a hopping window
> (duration=30s, advanceBy=2s) but get two identical records (duplicates) in the
> output KStream.
> Is that expected behavior? If so, how can I filter out or switch off these
> duplicates?
> 
> 3. Mainly I'm trying to solve this problem:
> I have kstream with events inside and events can be repeated (duplicates).
> In the output kstream I would like to receive only unique events for the
> last 24 hours (window duration) with 1 hour window overlay (window
> advanceBy).
> Could you recommend me any examples of code or docs please?
> I have already read official docs and examples but it was not enough to get
> full understanding of how I can achieve this.
> 
> Best regards,
> Viktor Markvardt
>
