kafka-dev mailing list archives

From "Elias Levy (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
Date Mon, 26 Sep 2016 22:41:20 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524402#comment-15524402 ]

Elias Levy commented on KAFKA-4212:

The general use case is the joining of updates to two tables over a limited period of time.

Consider a hypothetical monitoring service that allows clients to query the status of
nodes.  The application may wish to inform the clients whenever the status of a node that
they have queried changes, but only if the client has queried the status during the past 24
hours and if the last status for a node is different from the last status the client received.

To do so the service can consume a stream of client node status queries with their responses
and node status updates.  From the stream of client node status queries the service would
maintain a cache of the last status for a node sent to a client such that entries expire
after 24 hours.  From the node status updates the service would maintain a mapping of node
to latest status.

When a client query is received, the service can check on the node status mapping to see if
there is a newer status, and if there is, generate a notification.  When a node status update
is received, the service can check the last status sent to clients in the cache and generate
a notification with the new status to all clients that previously queried for a node's status.
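
The two checks above can be sketched in plain Java.  This is only an in-memory illustration,
not Kafka Streams API: the class {{NodeStatusJoin}}, its method names, and the notification
string format are all hypothetical, and "newer" is simplified to "different from the last
status the client received".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of the two-sided join; not Kafka Streams API.
class NodeStatusJoin {
    static final long TTL_MS = 24L * 60 * 60 * 1000; // the 24-hour limit from the example

    // Last status handed to a client, plus the time of that client's query.
    static final class Sent {
        final String status;
        final long queriedAtMs;
        Sent(String status, long queriedAtMs) {
            this.status = status;
            this.queriedAtMs = queriedAtMs;
        }
    }

    // node id -> latest status
    private final Map<String, String> latestStatus = new HashMap<>();
    // node id -> client id -> last status sent to that client
    private final Map<String, Map<String, Sent>> lastSent = new HashMap<>();

    // Client query side: compare the response with the latest known status, then cache it.
    List<String> onClientQuery(String clientId, String nodeId, String statusSent, long nowMs) {
        List<String> notifications = new ArrayList<>();
        String latest = latestStatus.get(nodeId);
        String known = statusSent;
        if (latest != null && !latest.equals(statusSent)) {
            notifications.add(clientId + " <- " + nodeId + "=" + latest);
            known = latest; // the client has now been told the latest status
        }
        lastSent.computeIfAbsent(nodeId, k -> new HashMap<>())
                .put(clientId, new Sent(known, nowMs));
        return notifications;
    }

    // Node update side: notify every client that queried within the TTL and holds a stale status.
    List<String> onNodeStatus(String nodeId, String status, long nowMs) {
        latestStatus.put(nodeId, status);
        List<String> notifications = new ArrayList<>();
        Map<String, Sent> clients = lastSent.get(nodeId);
        if (clients == null) {
            return notifications;
        }
        Iterator<Map.Entry<String, Sent>> it = clients.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Sent> e = it.next();
            Sent sent = e.getValue();
            if (nowMs - sent.queriedAtMs > TTL_MS) {
                it.remove(); // the client's query is older than the TTL: expire it
            } else if (!sent.status.equals(status)) {
                notifications.add(e.getKey() + " <- " + nodeId + "=" + status);
                // Keep the original query time so a notification does not extend the TTL.
                e.setValue(new Sent(status, sent.queriedAtMs));
            }
        }
        return notifications;
    }
}
```

Here expiry happens lazily, when an update for the node arrives; a real store would need to
expire entries on its own so that state for quiet nodes does not grow without bound.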

As an optimization the mapping of nodes to latest status can also be a cache with a TTL, since
you don't need to keep the statuses of nodes that haven't changed in more than 24 hours,
as you'll never receive a delayed node status query to match it against.

Abstractly this is equivalent to a {{KTable}}-{{KTable}} inner join where entries in each
{{KTable}} expire after some TTL, and where one table has a composite primary key (node id
and client id on one {{KTable}} vs just node id on the other).

It could also be thought of as a windowed {{KTable}}-{{KTable}} join (although in such a case
records that fall outside the window would never be used and are just wasting space), or a
windowed {{KStream}}-{{KStream}} join of table updates where only the latest updated values
are used (i.e. discard updates in the window if there is a newer update).  Although, again,
these would be joins where the primary keys are not identical as one is a composite.

Alas, Kafka Streams does not support windowed {{KTable}}-{{KTable}} joins, TTL'ed {{KeyValueStore}}s,
or joins across {{KTable}}s and/or {{KStream}}s with different keys.

That said, the above service can be implemented by joining the client status query and node
status updates streams using custom processors and by abusing {{WindowStore}}.  {{WindowStore}}
can be used as a form of TTL'ed {{KeyValueStore}}: it drops old values that fall out of its
window, and you can iterate in reverse order and use only the latest value.  And since
it allows you to store multiple values for the same key (node id), you can record the node
status you handed out to clients (node id key; client id, status, and timestamp as value)
and then, when a node status update comes in and you perform the join, iterate over all of
them for a given node id, keeping only the latest one for each client id.
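
As a rough illustration of that access pattern, here is a plain Java sketch that stands in
for the window store with a sorted map per node id.  It does not use the {{WindowStore}}
API; the class {{WindowedTtlSketch}}, its methods, and the retention handling are assumptions
made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a window store used as a TTL cache; not Kafka Streams API.
class WindowedTtlSketch {
    // One status handed out to one client at a given time.
    static final class Sent {
        final String clientId;
        final String status;
        final long timestampMs;
        Sent(String clientId, String status, long timestampMs) {
            this.clientId = clientId;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    private final long retentionMs;
    // node id -> (timestamp -> values recorded at that timestamp), i.e. many values per key
    private final Map<String, TreeMap<Long, List<Sent>>> byNode = new HashMap<>();

    WindowedTtlSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Record the status handed out to a client for a node.
    void put(String nodeId, String clientId, String status, long timestampMs) {
        byNode.computeIfAbsent(nodeId, k -> new TreeMap<>())
              .computeIfAbsent(timestampMs, k -> new ArrayList<>())
              .add(new Sent(clientId, status, timestampMs));
    }

    // Drop values older than the retention period, then walk the rest newest-first,
    // keeping only the first (latest) value seen for each client id.
    Map<String, Sent> latestPerClient(String nodeId, long nowMs) {
        Map<String, Sent> latest = new LinkedHashMap<>();
        TreeMap<Long, List<Sent>> log = byNode.get(nodeId);
        if (log == null) {
            return latest;
        }
        log.headMap(nowMs - retentionMs, false).clear(); // crude analogue of segment dropping
        for (List<Sent> batch : log.descendingMap().values()) {
            for (int i = batch.size() - 1; i >= 0; i--) {
                Sent s = batch.get(i);
                latest.putIfAbsent(s.clientId, s);
            }
        }
        return latest;
    }
}
```

On a node status update, the join would call {{latestPerClient}} for that node id and emit
a notification to each client whose recorded status differs from the new one.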

> Add a key-value store that is a TTL persistent cache
> ----------------------------------------------------
>                 Key: KAFKA-4212
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4212
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions:
>            Reporter: Elias Levy
>            Assignee: Guozhang Wang
> Some jobs need to maintain as state a large set of key-values for some period of time,
> i.e. they need to maintain a TTL cache of values potentially larger than memory.
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  Neither
> is an exact fit for this use case.
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as required, but
> does not support expiration.  The TTL option of RocksDB is explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment dropping,
> but it stores multiple items per key, based on their timestamp.  This store can nonetheless
> be repurposed as a cache by fetching the items in reverse chronological order and returning
> the first item.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here we desire
> a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be useful to
> have an official and proper TTL cache API and implementation.
