kafka-dev mailing list archives

From Gwen Shapira <g...@confluent.io>
Subject Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
Date Fri, 10 Feb 2017 22:33:33 GMT
Thank you so much for the detailed explanation!
I got the READ COMMITTED behavior. It is very odd that we will
routinely see parts of a committed transaction but not other parts.
But I understand the reasons.

Regarding transactional.id: I read the document but I'm still not 100%
clear on how unique transactional ids can happen in practice.
I'm sure you've thought of this, but I'd like to understand:
Right now, I'm writing a consume-process-produce app, I give it a
consumer group.id, and it is safe to deploy the app multiple times
with the same configuration - because all the instances will have the
same group.id. But it sounds like each instance will need its own
transactional.id, so it can commit its own transactions? Is the idea
that I'll read the configuration from a file and an admin will be
responsible to come up with unique transactional IDs while deploying?
This sounds a bit more complex than I'd like. Maybe you have a simple
solution I am not seeing...

Gwen




On Fri, Feb 10, 2017 at 12:25 AM, Jason Gustafson <jason@confluent.io> wrote:
> Hey Gwen,
>
> Thanks for the questions and comments. Responses below:
>
>> I'm not sure I'm clear on the expected behavior of READ_COMMITTED in
>> some interleaved cases:
>> * If a transaction starts, sends few messages and someone writes
>> non-transactional event into the same topic/partition, few more events
>> from that transaction and then a commit. I think the producer will
>> block until the commit, but I'm not sure in what order I'll see events
>> after that.
>>
>> * Same for transaction A starts, transactions B starts, transaction B
>> commits, transaction A commits... when will we unblock? and what will
>> we see?
>
>
> In READ_COMMITTED (as well as in READ_UNCOMMITTED), the messages are always
> returned in the order of their offsets. What controls the visibility of
> messages in READ_COMMITTED is the last stable offset (LSO). This is defined
> as the last offset in the log such that all messages with smaller offsets
> have been decided (either committed or aborted).
>
> Take the first case: non-transactional data interleaved with transactional
> data. Let N be a non-transactional message, T be a transactional message,
> and C be a commit marker. Suppose we have the following state of the log
> (the first row is the offsets, the second is the messages):
>
> 0, 1, 2
> N, T, N
>
> The LSO in this log is 1, which means the non-transactional message at
> offset 0 can be read by a consumer, but the one at offset 2 cannot because
> the transaction beginning at offset 1 has not completed. Later we get some
> more data:
>
> 0, 1, 2, 3, 4
> N, T, N, N, T
>
> The LSO still cannot advance because the transaction beginning at offset 1
> has not completed. Finally we get the commit marker:
>
> 0, 1, 2, 3, 4, 5
> N, T, N, N, T, C
>
> At this point, the LSO advances to 6, and all of the messages from offset 1
> to 5 are returned, in that order, to consumers.
>
> Now consider interleaved transactions. Let T1 be a transactional message
> from producer 1, and T2 be a transactional message from producer 2. Both
> producers begin a transaction:
>
> 0, 1, 2
> T1, T2, T1
>
> The LSO is 0 since the transaction starting from that offset has not been
> decided. Now suppose T1 commits.
>
> 0, 1, 2, 3
> T1, T2, T1, C1
>
> The LSO then advances to offset 1. The message at offset 0 becomes visible
> to consumers in READ_COMMITTED mode, but the remaining messages from the
> transaction are blocked until T2 is decided. Now T2 commits:
>
> 0, 1, 2, 3, 4, 5
> T1, T2, T1, C1, T2, C2
>
> The LSO advances to 6, and just like above, all of the data from offsets 1
> to 5 becomes visible.
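
A minimal sketch of the LSO rule walked through above, not the broker's implementation. It assumes a toy log encoded as strings ("N", "T1", "C1"); the abort marker "A" and the function names are illustrative. The LSO is taken to be the first offset of the earliest still-open transaction, or the log end offset if no transaction is open:

```python
from typing import Dict, List


def last_stable_offset(log: List[str]) -> int:
    """Return the LSO of a toy log.

    Each entry is "N" (non-transactional), "T<id>" (transactional data),
    "C<id>" (commit marker) or "A<id>" (abort marker, an assumed encoding).
    """
    first_open: Dict[str, int] = {}  # producer id -> first offset of its open transaction
    for offset, entry in enumerate(log):
        kind, producer = entry[0], entry[1:]
        if kind == "T":
            # Remember only where the open transaction started.
            first_open.setdefault(producer, offset)
        elif kind in ("C", "A"):
            # The transaction is decided, so it no longer holds the LSO back.
            first_open.pop(producer, None)
    return min(first_open.values(), default=len(log))


def visible_offsets(log: List[str]) -> List[int]:
    """Offsets a READ_COMMITTED consumer may fetch: everything below the LSO."""
    return list(range(last_stable_offset(log)))
```

On the logs in this message, the sketch gives an LSO of 1 for `["N", "T", "N"]`, 6 once the commit marker arrives, 0 for `["T1", "T2", "T1"]`, and 1 after C1 is appended. It does not model filtering of aborted data or of the markers themselves.
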
>
> This ordering is what we refer to as "offset order." We always return
> messages in the order of their offsets. Another interesting option to
> consider is "transaction order." Consider again the state after T1
> committed:
>
> 0, 1, 2, 3
> T1, T2, T1, C1
>
> In transaction order, all of the messages from T1 become visible
> immediately upon receiving the commit marker. Later when T2 commits, its
> messages will also become visible. The main advantage is lower latency,
> but it is quite a bit more complicated to implement (see the design
> document for the discussion). We went with the simpler approach and left
> this for possible future work.
>
>> I'm concerned about transactional.id uniqueness in real-world conditions.
>> When users were forced to manually define unique broker ids, we saw
>> lots of errors with "duplicate broker id" on startup show up on the
>> mailing list.
>> When we added a persistent (to filesystem) auto-generated broker ID,
>> the number of issues dropped dramatically.
>> I wonder if this makes sense here too.
>
>
> There has been a lot of discussion about this. I will link you to the
> section in the design document:
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.lizhp2urqn59
> .
>
>> It sounds like producers will now be idempotent by default. I.e. - if
>> we upgrade and do nothing different, we have idempotent producer? and
>> absolutely no way to get the old behavior back?
>
>
> This may have been a case of the KIP wiki getting out of sync with the
> design document. I've updated it to include the `enable.idempotence`
> producer config which was added previously to the design document. By
> default, idempotence will be disabled. Down the road, it may make sense to
> change that, but we're uncomfortable turning it on in the first version
> because of the subtle changes in behavior (in particular, the fact that we
> raise the fatal OutOfSequenceException in the case of message loss or PID
> expiration).
>
> Thanks,
> Jason
>
> On Thu, Feb 9, 2017 at 10:41 PM, Gwen Shapira <gwen@confluent.io> wrote:
>
>> It sounds like producers will now be idempotent by default. I.e. - if
>> we upgrade and do nothing different, we have idempotent producer? and
>> absolutely no way to get the old behavior back?
>>
>> I don't think anyone needs non-idempotent producer, just want to clarify :)
>>
>> On Tue, Feb 7, 2017 at 3:10 PM, Jason Gustafson <jason@confluent.io>
>> wrote:
>> > A couple follow-ups from Ismael's previous comments.
>> >
>> > 1. We have removed the individual message CRC field from the message
>> > format. Because this field can already change on the broker in some
>> > situations, we feel it is probably not wise to let clients depend on it,
>> > and removing it saves some space and the redundant verification on the
>> > broker. We have also proposed to deprecate the checksum() APIs which are
>> > exposed in RecordMetadata and ConsumerRecord.
>> > 2. We changed the message timestamp field to be variable length. The
>> > values are computed relative to the timestamp of the first message in
>> > the set. We considered my previous suggestion to make the first message
>> > timestamp absolute with later messages relative to their previous
>> > timestamp, but ultimately felt it was simpler to stick with the same
>> > pattern that is used for relative offsets. To enable this, we added an
>> > initial timestamp field to the message set.
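
The two timestamp layouts compared above can be sketched as follows. This is an illustration only: the function names are made up, and the zigzag varint encoding is an assumption (the thread says only that the field is variable length), used here to show why small deltas are cheap to store.

```python
from typing import List, Tuple


def deltas_from_first(timestamps: List[int]) -> Tuple[int, List[int]]:
    """Layout described in point 2: a base timestamp plus per-message offsets from it."""
    base = timestamps[0]
    return base, [t - base for t in timestamps]


def deltas_from_previous(timestamps: List[int]) -> List[int]:
    """Alternative considered: first value absolute, the rest relative to the previous one."""
    out = [timestamps[0]]
    out.extend(b - a for a, b in zip(timestamps, timestamps[1:]))
    return out


def zigzag_varint(n: int) -> bytes:
    """Encode a signed 64-bit integer as a zigzag varint (assumed encoding)."""
    z = (n << 1) ^ (n >> 63)  # map small negative and positive values to small unsigned ones
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)
```

For timestamps 500, 501, 499, `deltas_from_first` yields a base of 500 with deltas 0, 1, -1, while `deltas_from_previous` yields 500, 1, -2. Under the assumed encoding each of those small deltas fits in a single byte.
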
>> >
>> > Thanks,
>> > Jason
>> >
>> > On Mon, Feb 6, 2017 at 6:01 PM, Apurva Mehta <apurva@confluent.io>
>> wrote:
>> >
>> >> Hello,
>> >>
>> >> I have gone ahead and updated the KIP wiki with a summary of the
>> >> changes to the RPC protocol. The KIP wiki should now have _all_ the
>> >> public facing changes being proposed.
>> >>
>> >> The proposed changes were always in the Google doc, and now we are
>> >> simply making good on our promise to copy them over to the wiki since
>> >> the design is almost finalized.
>> >>
>> >> Thanks,
>> >> Apurva
>> >>
>> >> On Mon, Feb 6, 2017 at 4:02 PM, Jason Gustafson <jason@confluent.io>
>> >> wrote:
>> >>
>> >> > Hey Tom,
>> >> >
>> >> > Re: complexity. This is always a tradeoff with new features. The
>> >> > changes we've made during the design and review process have greatly
>> >> > simplified the implementation for clients, and especially for the
>> >> > consumer, but there is nontrivial work needed here to support
>> >> > transactions on the producer. I'm not sure how it could be otherwise
>> >> > and we've spent a ton of time thinking about this. It's also worth
>> >> > mentioning that there's a relatively low bar to support the
>> >> > idempotent producer while keeping the client thin (it requires
>> >> > support for one new request type and some sequence bookkeeping).
>> >> >
>> >> > Ultimately, we have to decide whether the improved semantics are
>> >> > worth the cost of the complexity. In my opinion, they are. The
>> >> > benefit of having exactly-once processing in streaming applications
>> >> > is huge. And to manage the complexity, we've intentionally used
>> >> > patterns that were already familiar in the codebase (e.g. our
>> >> > approach to maintaining transaction state through a coordinator is
>> >> > copied from how consumer offsets are managed). We've moved as much
>> >> > work from the clients to the broker as possible, and we have
>> >> > resisted at every turn complicating the client APIs even where it
>> >> > may have simplified some internals.
>> >> >
>> >> > -Jason
>> >> >
>> >> > On Mon, Feb 6, 2017 at 2:55 PM, Apurva Mehta <apurva@confluent.io> wrote:
>> >> >
>> >> > > Hi Tom,
>> >> > >
>> >> > > I updated the KIP with a note on our plans for performance testing:
>> >> > >
>> >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-Performance
>> >> > >
>> >> > > Thanks for pointing that out.
>> >> > >
>> >> > > Regards,
>> >> > > Apurva
>> >> > >
>> >> > >
>> >> > >
>> >> > >
>> >> > >
>> >> > > On Mon, Feb 6, 2017 at 7:40 AM, Tom Crayford <tcrayford@heroku.com> wrote:
>> >> > >
>> >> > > > I think the updated wiki page makes sense with respect to ACLs,
>> >> > > > there seems to be little potential for abuse there (other than
>> >> > > > the noted and known issues).
>> >> > > >
>> >> > > > I am going to note that this is a major complexity increase for
>> >> > > > Kafka, and that I'm concerned about performance impact (the JVM
>> >> > > > is quite… pedantic about method size, for example, and even
>> >> > > > adding conditionals to larger methods could impact this). The
>> >> > > > KIP doesn't note plans for performance testing.
>> >> > > >
>> >> > > > I'm also concerned about the impact on non-JVM client libraries -
>> >> > > > writing a client for Kafka is already a very complicated
>> >> > > > undertaking, and this adds to that complexity significantly.
>> >> > > >
>> >> > > > However, the approach seems ok enough. It does also violate the
>> >> > > > "Kafka has dumb brokers and smart clients" (which I realize is in
>> >> > > > direct contradiction of the previous statement about client
>> >> > > > implementation being hard). I'd love to see some discussion in
>> >> > > > either the doc or the wiki as to why much of this transactional
>> >> > > > work isn't a client-side part of Kafka Streams.
>> >> > > >
>> >> > > > On Sat, Feb 4, 2017 at 3:38 AM, Jason Gustafson <jason@confluent.io> wrote:
>> >> > > >
>> >> > > > > One additional note on the authorization. The WriteTxnMarker
>> >> > > > > API is restricted to inter-broker usage, so it requires Cluster
>> >> > > > > authorization (just like other inter-broker APIs). I've updated
>> >> > > > > the document and wiki to reflect this.
>> >> > > > >
>> >> > > > > Also, I have renamed GroupCoordinatorRequest to
>> >> > > > > FindCoordinatorRequest since there is no group for
>> >> > > > > transactional producers. Let me know if there are any concerns.
>> >> > > > >
>> >> > > > > -Jason
>> >> > > > >
>> >> > > > > On Fri, Feb 3, 2017 at 2:35 PM, Jason Gustafson <jason@confluent.io> wrote:
>> >> > > > >
>> >> > > > > > Hi Tom,
>> >> > > > > >
>> >> > > > > >> I said this in the voting thread, but can the authors include
>> >> > > > > >> a section about new ACLs if there are going to be ACLs for
>> >> > > > > >> TransactionalId. It's mentioned in the google doc, but I
>> >> > > > > >> think new ACLs should be in a KIP directly.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > We've updated the wiki. Can you take a look and let us know if
>> >> > > > > > you have additional concerns?
>> >> > > > > >
>> >> > > > > > Thanks,
>> >> > > > > > Jason
>> >> > > > > >
>> >> > > > > > On Fri, Feb 3, 2017 at 1:52 PM, Rajini Sivaram <rajinisivaram@gmail.com> wrote:
>> >> > > > > >
>> >> > > > > >> Hi Jason,
>> >> > > > > >>
>> >> > > > > >> Thank you for the responses. Agree that authorizing
>> >> > > > > >> transactional.id in the producer requests will be good enough
>> >> > > > > >> for version 1. And making it tighter in future based on
>> >> > > > > >> delegation tokens sounds good too.
>> >> > > > > >>
>> >> > > > > >> Regards,
>> >> > > > > >>
>> >> > > > > >> Rajini
>> >> > > > > >>
>> >> > > > > >>
>> >> > > > > >> On Fri, Feb 3, 2017 at 8:04 PM, Jason Gustafson <jason@confluent.io> wrote:
>> >> > > > > >>
>> >> > > > > >> > Hey Rajini,
>> >> > > > > >> >
>> >> > > > > >> > Thanks for the questions. Responses below:
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > > 1. Will the transaction coordinator check topic ACLs
>> >> > > > > >> > > based on the requesting client's credentials? Access to
>> >> > > > > >> > > transaction logs, topics being added for transaction etc?
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > Good question. I think it makes sense to check topic Write
>> >> > > > > >> > permission when adding partitions to the transaction. I'll
>> >> > > > > >> > add this to the document. Perhaps authorization to the
>> >> > > > > >> > transaction log itself, however, can be assumed from having
>> >> > > > > >> > access to the ProducerTransactionalId resource? This would
>> >> > > > > >> > be similar to how access to __consumer_offsets is assumed if
>> >> > > > > >> > the client has access to the Group resource.
>> >> > > > > >> >
>> >> > > > > >> > > 2. If I create a transactional produce request (by hand,
>> >> > > > > >> > > not using the producer API) with a random PID (random,
>> >> > > > > >> > > hence unlikely to be in use), will the broker append a
>> >> > > > > >> > > transactional message to the logs, preventing LSO from
>> >> > > > > >> > > moving forward? What validation will broker do for PIDs?
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > Yes, that is correct. Validation of the TransactionalId to
>> >> > > > > >> > PID binding is a known gap in the current proposal, and is
>> >> > > > > >> > discussed in the design document. Now that I'm thinking
>> >> > > > > >> > about it a bit more, I think there is a good case for
>> >> > > > > >> > including the TransactionalId in the ProduceRequest (I think
>> >> > > > > >> > Jun suggested this previously). Verifying it does not ensure
>> >> > > > > >> > that the included PID is correct, but it does ensure that
>> >> > > > > >> > the client is authorized to use transactions. If the client
>> >> > > > > >> > wanted to do an "endless transaction attack," having Write
>> >> > > > > >> > access to the topic and an authorized transactionalID is all
>> >> > > > > >> > they would need anyway even if we could authorize the PID
>> >> > > > > >> > itself. This seems like a worthwhile improvement.
>> >> > > > > >> >
>> >> > > > > >> > For future work, my half-baked idea to authorize the PID
>> >> > > > > >> > binding is to leverage the delegation work in KIP-48. When
>> >> > > > > >> > the PID is generated, we can give the producer a token which
>> >> > > > > >> > is then used in produce requests (say an hmac covering the
>> >> > > > > >> > TransactionalId and PID).
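
A rough sketch of what such a token could look like, using Python's standard `hmac` module. Nothing here comes from the KIP or from KIP-48: the function names, the shared broker secret, the SHA-256 choice, and the byte layout (length-prefixed id followed by a 64-bit PID) are all assumptions made for illustration.

```python
import hashlib
import hmac
import struct


def pid_token(secret: bytes, transactional_id: str, pid: int) -> bytes:
    """Compute an HMAC over the (TransactionalId, PID) pair (hypothetical layout)."""
    tid = transactional_id.encode("utf-8")
    # Length-prefix the id so that different (id, pid) pairs cannot
    # serialize to the same byte string.
    message = struct.pack(">H", len(tid)) + tid + struct.pack(">q", pid)
    return hmac.new(secret, message, hashlib.sha256).digest()


def verify_pid_token(secret: bytes, transactional_id: str, pid: int, token: bytes) -> bool:
    """Check a token presented with a produce request, in constant time."""
    expected = pid_token(secret, transactional_id, pid)
    return hmac.compare_digest(expected, token)
```

In this sketch the coordinator would hand out `pid_token(...)` when the PID is generated, and a broker holding the same secret would call `verify_pid_token(...)` on each produce request. A request carrying a guessed PID fails the check because its token was computed for a different pair.
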
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > > 3. Will every broker check that a client sending
>> >> > > > > >> > > transactional produce requests at least has write access
>> >> > > > > >> > > to transaction log topic since it is not validating
>> >> > > > > >> > > transactional.id (for every produce request)?
>> >> > > > > >> > >
>> >> > > > > >> > > 4. I understand that brokers cannot authorize the
>> >> > > > > >> > > transactional id for each produce request since requests
>> >> > > > > >> > > contain only the PID. But since there is a one-to-one
>> >> > > > > >> > > mapping between PID and transactional.id, and a
>> >> > > > > >> > > connection is never expected to change its
>> >> > > > > >> > > transactional.id, perhaps it is feasible to add
>> >> > > > > >> > > authorization and cache the results in the Session?
>> >> > > > > >> > > Perhaps not for version 1, but feels like it will be
>> >> > > > > >> > > good to close the security gap here. Obviously it would
>> >> > > > > >> > > be simpler if transactional.id was in the produce
>> >> > > > > >> > > request if the overhead was acceptable.
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > I think my response above addresses both of these. We should
>> >> > > > > >> > include the TransactionalId in the ProduceRequest. Of course
>> >> > > > > >> > it need not be included in the message format, so I'm not
>> >> > > > > >> > too concerned about the additional overhead it adds.
>> >> > > > > >> >
>> >> > > > > >> > Thanks,
>> >> > > > > >> > Jason
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > On Fri, Feb 3, 2017 at 6:52 AM, Ismael Juma <ismael@juma.me.uk> wrote:
>> >> > > > > >> >
>> >> > > > > >> > > Comments inline.
>> >> > > > > >> > >
>> >> > > > > >> > > On Thu, Feb 2, 2017 at 6:28 PM, Jason Gustafson <jason@confluent.io> wrote:
>> >> > > > > >> > >
>> >> > > > > >> > > > Took me a while to remember why we didn't do this. The
>> >> > > > > >> > > > timestamp that is included at the message set level is
>> >> > > > > >> > > > the max timestamp of all messages in the message set as
>> >> > > > > >> > > > is the case in the current message format (I will update
>> >> > > > > >> > > > the document to make this explicit). We could make the
>> >> > > > > >> > > > message timestamps relative to the max timestamp, but
>> >> > > > > >> > > > that makes serialization a bit awkward since the
>> >> > > > > >> > > > timestamps are not assumed to be increasing sequentially
>> >> > > > > >> > > > or monotonically. Once the messages in the message set
>> >> > > > > >> > > > had been determined, we would need to go back and adjust
>> >> > > > > >> > > > the relative timestamps.
>> >> > > > > >> > > >
>> >> > > > > >> > >
>> >> > > > > >> > > Yes, I thought this would be a bit tricky and hence why I
>> >> > > > > >> > > mentioned the option of adding a new field at the message
>> >> > > > > >> > > set level for the first timestamp even though that's not
>> >> > > > > >> > > ideal either.
>> >> > > > > >> > >
>> >> > > > > >> > > > Here's one idea. We let the timestamps in the messages
>> >> > > > > >> > > > be varints, but we make their values be relative to the
>> >> > > > > >> > > > timestamp of the previous message, with the timestamp of
>> >> > > > > >> > > > the first message being absolute. For example, if we had
>> >> > > > > >> > > > timestamps 500, 501, 499, then we would write 500 for
>> >> > > > > >> > > > the first message, 1 for the next, and -2 for the final
>> >> > > > > >> > > > message. Would that work? Let me think a bit about it
>> >> > > > > >> > > > and see if there are any problems.
>> >> > > > > >> > > >
>> >> > > > > >> > >
>> >> > > > > >> > > It's an interesting idea. Comparing to the option of
>> >> > > > > >> > > having the first timestamp in the message set, It's a
>> >> > > > > >> > > little more space efficient as we don't have both a full
>> >> > > > > >> > > timestamp in the message set _and_ a varint in the first
>> >> > > > > >> > > message (which would always be 0, so we avoid the extra
>> >> > > > > >> > > byte) and also the deltas could be a little smaller in
>> >> > > > > >> > > the common case. The main downside is that it introduces
>> >> > > > > >> > > a semantics inconsistency between the first message and
>> >> > > > > >> > > the rest. Not ideal, but maybe we can live with that.
>> >> > > > > >> > >
>> >> > > > > >> > > Ismael
>> >> > > > > >> > >
>> >> > > > > >> >
>> >> > > > > >>
>> >> > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>>
>>
>>
>> --
>> Gwen Shapira
>> Product Manager | Confluent
>> 650.450.2760 | @gwenshap
>> Follow us: Twitter | blog
>>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog
