kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Evgeny Veretennikov (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores
Date Thu, 06 Jul 2017 13:13:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076472#comment-16076472
] 

Evgeny Veretennikov commented on KAFKA-4468:
--------------------------------------------

Let me show you such example:

{code:java}
final Serde<Windowed<String>> windowedSerde = new WrapperSerde(
    new WindowedSerializer<>(new StringSerializer()),
    new WindowedDeserializer<>(new StringDeserializer())
);
final String topic = "name";
final RocksDBStore<Windowed<String>, String> store = new RocksDBStore<>(topic,
windowedSerde, Serdes.String());
final MockProcessorContext context = ...;
context.setRecordContext(...);
store.init(context, store);
store.put(new Windowed<>("key1", new TimeWindow(100, 123)), "value1");
store.put(new Windowed<>("key2", new TimeWindow(101, 456)), "value2");
final KeyValueIterator<Windowed<String>, String> all = store.all();
all.next(); // KeyValue([key1@100/9223372036854775807], value1)
all.next(); // KeyValue([key2@101/9223372036854775807], value2)
{code}

We are able to put in store two time windows with different window sizes. When we try to get
them back from store, we get two windows with proper begins, but broken ends ({{Long.MAX_VALUE}},
as in {{WindowedDeserializer}}). So, we are unable to calculate window end without saving
it in {{WindowSerializer}}.

Now it seems, that [~bbejeck] was actually correct about this:

{noformat}
Unless I'm missing something, this task implies we'll need to include the window_size (and
forgo the 8 bytes per key storage savings) on serialization with WindowedSerializer. As after
we've read it via the WindowedDeserializer we only have the key and the start timestamp and
don't have access to the original window_size to do the calculation.
{noformat}

> Correctly calculate the window end timestamp after read from state stores
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4468
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the start timestamp
of the window as part of the combo-key as (start-timestamp, key). The reason that we do not
add the end-timestamp as well is that we can always calculate it from the start timestamp
+ window_length, and hence we can save 8 bytes per key on the persistent KV store.
> However, after read it (via {{WindowedDeserializer}}) we do not set its end timestamp
correctly but just read it as an {{UnlimitedWindow}}. We should fix this by calculating its
end timestamp as mentioned above.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message