kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS] KIP-349 Priorities for Source Topics
Date Mon, 17 Sep 2018 16:51:19 GMT
I am not sure if this feature would help with stream-table joins. Also
note that we recently merged a PR that improves the timestamp
synchronization of Kafka Streams -- this will vastly improve the guarantees.

What I don't understand:

> So table records that have been updated recently will not be read until the stream records reach or exceed that same timestamp.

Yes, this is on purpose / by design.

> and if they do it will be with old data

What do you mean by "old data"? By definition, the stream record will
join with a table that contains data up to the stream record's
timestamp. It does not make sense semantically to advance the table
beyond the stream record's timestamp: if you did, you would join with
"future data", which, from my point of view, is semantically incorrect.
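This timestamp-synchronization rule can be sketched with only the JDK. The example below is illustrative, not Kafka Streams code: for a single key, table updates are kept by timestamp, and a stream record joins the latest table version at or before its own timestamp, never a later ("future") one.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of timestamp-synchronized stream-table join semantics for one key:
// a stream record with timestamp ts joins the most recent table update whose
// timestamp is <= ts.
public class TimestampJoinSketch {
    public static void main(String[] args) {
        // Table updates for one key, indexed by timestamp.
        NavigableMap<Long, String> tableVersions = new TreeMap<>();
        tableVersions.put(10L, "v1");
        tableVersions.put(50L, "v2");  // a recent table update

        long streamRecordTs = 30L;     // stream record arrives with ts = 30

        // Pick the table version that was valid at the stream record's timestamp.
        Map.Entry<Long, String> entry = tableVersions.floorEntry(streamRecordTs);
        String joined = (entry == null) ? null : entry.getValue();

        // The stream record at ts=30 joins v1 (ts=10), not the future v2 (ts=50).
        System.out.println(joined);  // prints "v1"
    }
}
```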

Shameless plug: you might want to read
https://www.confluent.io/blog/streams-tables-two-sides-same-coin



-Matthias

On 9/17/18 8:23 AM, Thomas Becker wrote:
> For my part, a major use-case for this feature is stream-table joins. Currently, Kafka Streams does the wrong thing in some cases because the only message-choosing strategy available is timestamp-based. So table records that have been updated recently will not be read until the stream records reach or exceed that same timestamp. So there is no guarantee these records get joined at all, and if they do it will be with old data. I realize we're talking about the consumer here and not Streams specifically, but as it stands I can't even write a non-Streams application that does a join but prioritizes table-topic records over stream records without using multiple consumers.
> 
> On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:
> 
> Hi all,
> 
> 
> I agree that DISCUSS is more appropriate than VOTE at this point, since I don't remember the last discussion coming to a definite conclusion.
> 
> 
> I guess my concern is that this will add complexity and memory consumption on the server side. In the case of incremental fetch requests, we will have to track at least two extra bytes per partition, to know what the priority of each partition is within each active fetch session.
> 
> 
> It would be nice to hear more about the use-cases for this feature. I think Gwen asked about this earlier, and I don't remember reading a response. The fact that we're now talking about Samza interfaces is a bit of a red flag. After all, Samza didn't need partition priorities to do what it did. You can do a lot with muting partitions and using appropriate threading in your code.
> 
> 
> For example, you can hand data from a partition off to a work queue with a fixed size, which is handled by a separate service thread. If the queue gets full, you can mute the partition until some of the buffered data is processed. Kafka Streams uses a similar approach to avoid reading partition data that isn't immediately needed.
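The bounded-queue-plus-muting pattern described above can be sketched with only the JDK. In this simulation a boolean flag stands in for what a real client would do with KafkaConsumer.pause()/resume() on the partition; the queue capacity and record names are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the mute-when-full pattern: records go into a bounded work
// queue; when the queue is full the partition is "muted" (simulated here
// by a flag), and unmuted once the service thread drains some backlog.
public class MutingSketch {
    static final BlockingQueue<String> work = new ArrayBlockingQueue<>(3);
    static boolean muted = false;  // stands in for consumer.pause()/resume()

    static void onRecord(String record) {
        if (!work.offer(record)) {
            muted = true;          // queue full: stop fetching this partition
        }
    }

    static void drainOne() {
        work.poll();               // service thread processed one record
        if (muted && work.remainingCapacity() > 0) {
            muted = false;         // room again: resume fetching
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            onRecord("rec-" + i);  // capacity is 3, so the last two are rejected
        }
        System.out.println(muted); // prints "true": partition is muted
        drainOne();
        System.out.println(muted); // prints "false": resumed after draining
    }
}
```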
> 
> 
> There might be some use-cases that need priorities eventually, but I'm concerned that we're jumping the gun by trying to implement this before we know what they are.
> 
> 
> best,
> 
> Colin
> 
> 
> 
> On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:
> 
> 
> On 05.09.2018 02:38, nick@afshartous.com wrote:
> 
> 
> On Sep 4, 2018, at 4:20 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> 
> 
> what I meant is literally this interface:
> 
> 
> https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
> 
> Hi Jan,
> 
> 
> Thanks for the reply and I have a few questions.  This Samza doc
> 
> 
>    https://samza.apache.org/learn/documentation/0.14/container/streams.html
> 
> 
> indicates that the chooser is set via configuration. Are you suggesting adding a new configuration for Kafka? Seems like we could also have a method on KafkaConsumer
> 
> 
>      public void register(MessageChooser messageChooser)
> 
> I don't have strong opinions regarding this. I like configs; I also don't think it would be a problem to have both.
> 
> 
> 
> to make it more dynamic.
> 
> 
> Also, the Samza MessageChooser interface has method
> 
> 
>    /* Notify the chooser that a new envelope is available for processing. */
>
>    void update(IncomingMessageEnvelope envelope)
> 
> 
> and I’m wondering how this method would be translated to the Kafka API. In particular, what corresponds to IncomingMessageEnvelope?
> 
> I think Samza uses the envelope abstraction as they support other sources besides Kafka as well. They are more on the Spark end of things when it comes to different input types. I don't have strong opinions, but it feels like we wouldn't need such a thing in the Kafka consumer and could just use a regular ConsumerRecord or so.
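Jan's idea can be sketched with only the JDK. Everything below is hypothetical, not any real Kafka or Samza API: a plain Record class stands in for ConsumerRecord, and a MessageChooser-style hook decides which buffered record the consumer should hand out next, here preferring a "table" topic over a "stream" topic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a MessageChooser-style hook on the consumer,
// using a plain Record class in place of Samza's IncomingMessageEnvelope.
// The interface and class names are illustrative only.
public class ChooserSketch {
    static class Record {
        final String topic;
        final String value;
        Record(String topic, String value) { this.topic = topic; this.value = value; }
    }

    interface MessageChooser {
        void update(Record record);  // a new record is available
        Record choose();             // pick the next record, or null if none
    }

    // A chooser that always prefers the "table" topic over the "stream" topic.
    static class TablePriorityChooser implements MessageChooser {
        private final Deque<Record> table = new ArrayDeque<>();
        private final Deque<Record> stream = new ArrayDeque<>();

        public void update(Record r) {
            (r.topic.equals("table") ? table : stream).addLast(r);
        }

        public Record choose() {
            return table.isEmpty() ? stream.pollFirst() : table.pollFirst();
        }
    }

    public static void main(String[] args) {
        MessageChooser chooser = new TablePriorityChooser();
        chooser.update(new Record("stream", "click"));
        chooser.update(new Record("table", "profile"));
        System.out.println(chooser.choose().topic);  // prints "table"
    }
}
```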
> 
> 
> Best,
> 
> --
> 
>        Nick
> 
> 
> 
> 
> 
> 

