kafka-dev mailing list archives

From radai <radai.rosenbl...@gmail.com>
Subject Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
Date Thu, 15 Dec 2016 23:07:07 GMT
Some clarifications on my alternative proposal:

TX msgs are written "sideways" to an ad-hoc transaction partition. This
partition can be replicated to followers, or kept as an in-memory buffer,
depending on the resilience guarantees you want to provide for TXs in case
of a broker crash.
On "commit", the partition leader broker (being the single point of
synchronization for the partition anyway) can atomically append the
contents of this TX "partition" onto the real target partition. This is the
point where the msgs get "real" offsets. There's some trickiness around not
exposing these offsets to any consumers until everything has been
replicated to followers, but we believe it's possible.
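The staged-partition mechanics can be sketched in a few lines (toy Python, not Kafka code; all names are illustrative):

```python
# Toy model of the "sideways" TX partition: messages produced under a TX
# go to a per-TX staging area and only receive real offsets when the
# partition leader atomically appends them to the target log on commit.

class PartitionLeader:
    def __init__(self):
        self.log = []        # the real target partition
        self.staged = {}     # tx_id -> messages staged "sideways"

    def begin(self, tx_id):
        self.staged[tx_id] = []

    def produce(self, tx_id, msg):
        # no offset assigned yet, so consumers cannot see the message
        self.staged[tx_id].append(msg)

    def commit(self, tx_id):
        # the leader is the single point of synchronization, so it can
        # append the whole TX in one step; offsets become "real" here
        base = len(self.log)
        self.log.extend(self.staged.pop(tx_id))
        return list(range(base, len(self.log)))

    def abort(self, tx_id):
        # aborted data never reaches the real log
        self.staged.pop(tx_id, None)
```

An aborted TX leaves no trace in the target log, which is what lets downstream consumers stay oblivious to TXs for free.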



On Thu, Dec 15, 2016 at 2:31 PM, radai <radai.rosenblatt@gmail.com> wrote:

> I can see several issues with the current proposal.
>
> Because messages, even if sent under a TX, are delivered directly to their
> destination partitions, downstream consumers need to be TX-aware. They can
> either:
>    1. Be oblivious to TXs. That means they will deliver "garbage" - msgs
> sent during eventually-aborted TXs.
>    2. "Opt in" - they would have to not deliver _ANY_ msg until they know
> the fate of all outstanding overlapping TXs. If I see msg A1 (in a TX),
> followed by B, which is not under any TX, I cannot deliver B until I know
> whether A1 was committed (or I violate ordering). This would require some
> sort of buffering on consumers. With a naive buffering impl I could DOS
> everyone on a topic - just start a TX on a very busy topic and keep it open
> as long as I can ...
>    3. Explode, if you're an old consumer that sees a control msg (what's
> your migration plan?)
>    4. Cross-cluster replication mechanisms either replicate the garbage or
> need to clean it up. There is more than one such mechanism (almost one per
> company, really :-) ), so lots of adjustments.
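The buffering problem in point 2 can be made concrete with a toy order-preserving consumer (illustrative Python, not a real Kafka client; all names are made up):

```python
# Toy model of point 2: a consumer that preserves ordering cannot deliver
# *any* message, transactional or not, while an earlier message belongs to
# a TX whose fate is unknown. One long-open TX stalls everything behind it.

class OrderPreservingConsumer:
    def __init__(self):
        self.buffer = []        # held-back messages, in arrival order
        self.open_txs = set()   # TXs whose fate is still unknown
        self.delivered = []

    def on_message(self, msg, tx_id=None):
        if tx_id is not None:
            self.open_txs.add(tx_id)
        self.buffer.append((msg, tx_id))
        self._drain()

    def on_tx_result(self, tx_id, committed):
        self.open_txs.discard(tx_id)
        if not committed:
            # drop the aborted TX's messages instead of delivering garbage
            self.buffer = [(m, t) for (m, t) in self.buffer if t != tx_id]
        self._drain()

    def _drain(self):
        # deliver from the head only while the head is not blocked by an
        # unresolved TX
        while self.buffer and self.buffer[0][1] not in self.open_txs:
            msg, _ = self.buffer.pop(0)
            self.delivered.append(msg)
```

The DOS scenario is exactly the unbounded growth of `buffer` while `open_txs` stays non-empty.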
>
> I think the end result could be better if ongoing TXs were treated as
> logically separate topic partitions and only atomically appended onto the
> target partitions on commit (meaning they are written to separate journal
> file(s) on the broker).
>
> Such a design would present a "clean" view to any downstream consumers -
> anything not committed won't even show up. Old consumers won't need to
> know about control msgs, there are no issues with unbounded msg buffering,
> and it is generally cleaner overall.
>
> There would need to be adjustments made to the watermark and
> follower-fetch logic, but some of us here have discussed this over lunch
> and we think it's doable.
>
>
> On Thu, Dec 15, 2016 at 1:08 AM, Rajini Sivaram <rsivaram@pivotal.io>
> wrote:
>
>> Hi Apurva,
>>
>> Thank you, makes sense.
>>
>> Rajini
>>
>> On Wed, Dec 14, 2016 at 7:36 PM, Apurva Mehta <apurva@confluent.io>
>> wrote:
>>
>> > Hi Rajini,
>> >
>> > I think my original response to your point 15 was not accurate. The
>> > regular definition of durability is that data, once committed, would
>> > never be lost. So it is not enough for only the control messages to be
>> > flushed before being acknowledged -- all the messages (and offset
>> > commits) which are part of the transaction would need to be flushed
>> > before being acknowledged as well.
>> >
>> > Otherwise, it is possible that if all replicas of a topic partition
>> > crash before the transactional messages are flushed, those messages
>> > will be lost even if the commit marker exists in the log. In this
>> > case, the transaction would be 'committed' with incomplete data.
>> >
>> > Right now, there isn't any config which will ensure that the flush to
>> > disk happens before the acknowledgement. We could add one in the
>> > future and get durability guarantees for Kafka transactions.
>> >
>> > I hope this clarifies the situation. The present KIP does not intend
>> > to add the aforementioned config, so even the control messages are
>> > susceptible to being lost if there is a simultaneous crash across all
>> > replicas. So transactions are only as durable as existing Kafka
>> > messages. We don't strengthen any durability guarantees as part of
>> > this KIP.
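The failure mode being described can be illustrated with a toy model (Python sketch; `flush` stands in for fsync, and the two logs stand in for two partitions touched by the same transaction):

```python
# Toy illustration: if acknowledgement does not wait for fsync, a crash of
# all replicas can lose acked transactional data while a commit marker
# that happened to reach disk survives, leaving a "committed" transaction
# with incomplete data.

class ReplicaLog:
    def __init__(self):
        self.page_cache = []   # appended (and acked) but not yet fsynced
        self.disk = []         # what survives a crash

    def append(self, record):
        self.page_cache.append(record)

    def flush(self):           # fsync
        self.disk.extend(self.page_cache)
        self.page_cache = []

    def crash_and_recover(self):
        self.page_cache = []   # unflushed data is gone
        return self.disk

data_partition = ReplicaLog()
marker_partition = ReplicaLog()
data_partition.append("tx1: payload")    # acked, never fsynced
marker_partition.append("tx1: COMMIT")   # commit marker
marker_partition.flush()                 # this log happened to reach disk
# simultaneous crash of all replicas:
recovered_data = data_partition.crash_and_recover()
recovered_marker = marker_partition.crash_and_recover()
```

The recovered state shows the marker without the payload, i.e. the 'committed with incomplete data' case above.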
>> >
>> > Thanks,
>> > Apurva
>> >
>> >
>> > On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram <rsivaram@pivotal.io>
>> > wrote:
>> >
>> > > Hi Apurva,
>> > >
>> > > Thank you for the answers. Just one follow-on.
>> > >
>> > > 15. Let me rephrase my original question. If all control messages
>> > > (messages to transaction logs and markers on user logs) were
>> > > acknowledged only after flushing the log segment, would transactions
>> > > become durable in the traditional sense (i.e. not restricted to
>> > > min.insync.replicas failures)? This is not a suggestion to update
>> > > the KIP. It seems to me that the design enables full durability, if
>> > > required in the future, with a rather non-intrusive change. I just
>> > > wanted to make sure I haven't missed anything fundamental that
>> > > prevents Kafka from doing this.
>> > >
>> > >
>> > >
>> > > On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin <ben@kirw.in> wrote:
>> > >
>> > > > Hi Apurva,
>> > > >
>> > > > Thanks for the detailed answers... and sorry for the late reply!
>> > > >
>> > > > It does sound like, if the input-partitions-to-app-id mapping never
>> > > > changes, the existing fencing mechanisms should prevent duplicates.
>> > > > Great! I'm a bit concerned the proposed API will be delicate to
>> > > > program against successfully -- even in the simple case, we need to
>> > > > create a new producer instance per input partition, and anything
>> > > > fancier is going to need its own implementation of the
>> > > > Streams/Samza-style 'task' idea -- but that may be fine for this
>> > > > sort of advanced feature.
>> > > >
>> > > > For the second question, I notice that Jason also elaborated on this
>> > > > downthread:
>> > > >
>> > > > > We also looked at removing the producer ID.
>> > > > > This was discussed somewhere above, but basically the idea is to
>> > > > > store the AppID in the message set header directly and avoid the
>> > > > > mapping to producer ID altogether. As long as batching isn't too
>> > > > > bad, the impact on total size may not be too bad, but we were
>> > > > > ultimately more comfortable with a fixed size ID.
>> > > >
>> > > > ...which suggests that the distinction is useful for performance,
>> > > > but not necessary for correctness, which makes good sense to me.
>> > > > (Would a 128-bit ID be a reasonable compromise? That's enough room
>> > > > for a UUID, or a reasonable hash of an arbitrary string, and has
>> > > > only a marginal increase in the message size.)
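For reference, 128 bits is exactly a UUID, and a name-based UUID gives a deterministic hash of an arbitrary app-id string (Python sketch; the namespace choice is purely illustrative):

```python
# Sketch of the 128-bit compromise: hash any app-id string down to a
# fixed-size 16-byte (128-bit) ID via a name-based (SHA-1, truncated)
# UUID. Deterministic, so the same app-id always maps to the same ID.
import uuid

# illustrative namespace; any fixed UUID would do
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "kafka.apache.org")

def app_id_to_128bit(app_id: str) -> bytes:
    return uuid.uuid5(NAMESPACE, app_id).bytes
```

This shows why 128 bits is "enough room": it accommodates both a random UUID and a collision-resistant hash of an unbounded string.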
>> > > >
>> > > > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <apurva@confluent.io>
>> > > wrote:
>> > > >
>> > > > > Hi Ben,
>> > > > >
>> > > > > Now, on to your first question of how to deal with consumer
>> > > > > rebalances. The short answer is that the application needs to
>> > > > > ensure that the assignment of input partitions to appId is
>> > > > > consistent across rebalances.
>> > > > >
>> > > > > Kafka Streams already ensures that the mapping of input
>> > > > > partitions to task IDs is invariant across rebalances by
>> > > > > implementing a custom sticky assignor. Other non-Streams apps can
>> > > > > trivially have one producer per input partition and have the
>> > > > > appId be the same as the partition number to achieve the same
>> > > > > effect.
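The one-producer-per-input-partition pattern is easy to sketch (toy Python; `FakeProducer` stands in for a real transactional producer, and the appId format is made up):

```python
# Sketch: derive the appId purely from the (topic, partition) pair, so
# the partition -> appId mapping is trivially invariant across rebalances,
# no matter which consumer instance ends up owning the partition.

class FakeProducer:
    def __init__(self, app_id):
        self.app_id = app_id

producers = {}

def producer_for(topic, partition):
    app_id = f"{topic}-{partition}"   # appId is a pure function of the partition
    if app_id not in producers:
        producers[app_id] = FakeProducer(app_id)
    return producers[app_id]
```

Because the appId depends on nothing but the partition, the fencing precondition ("same partition, same appId") holds by construction.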
>> > > > >
>> > > > > With this precondition in place, we can maintain transactions
>> > > > > across rebalances.
>> > > > >
>> > > > > Hope this answers your question.
>> > > > >
>> > > > > Thanks,
>> > > > > Apurva
>> > > > >
>> > > > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <ben@kirw.in> wrote:
>> > > > >
>> > > > > > Thanks for this! I'm looking forward to going through the full
>> > > > > > proposal in detail soon; a few early questions:
>> > > > > >
>> > > > > > First: what happens when a consumer rebalances in the middle
>> > > > > > of a transaction? The full documentation suggests that such a
>> > > > > > transaction ought to be rejected:
>> > > > > >
>> > > > > > > [...] if a rebalance has happened and this consumer
>> > > > > > > instance becomes a zombie, even if this offset message is
>> > > > > > > appended in the offset topic, the transaction will be
>> > > > > > > rejected later on when it tries to commit the transaction
>> > > > > > > via the EndTxnRequest.
>> > > > > >
>> > > > > > ...but it's unclear to me how we ensure that a transaction
>> > > > > > can't complete if a rebalance has happened. (It's quite
>> > > > > > possible I'm missing something obvious!)
>> > > > > >
>> > > > > > As a concrete example: suppose a process with PID 1 adds
>> > > > > > offsets for some partition to a transaction; a consumer
>> > > > > > rebalance happens that assigns the partition to a process with
>> > > > > > PID 2, which adds some offsets to its current transaction;
>> > > > > > both processes try to commit. Allowing both commits would
>> > > > > > cause the messages to be processed twice -- how is that
>> > > > > > avoided?
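The KIP's mechanism here is epoch-based fencing; a toy model of the idea (simplified Python, not the actual protocol):

```python
# Toy fencing model: re-registering for an appId bumps an epoch on the
# coordinator, and any commit carrying a stale epoch is rejected, so at
# most one of the two processes in the example above can commit.

class TransactionCoordinator:
    def __init__(self):
        self.epochs = {}   # app_id -> current epoch

    def register(self, app_id):
        # each new incarnation of the producer gets a higher epoch
        self.epochs[app_id] = self.epochs.get(app_id, -1) + 1
        return self.epochs[app_id]

    def commit(self, app_id, epoch):
        # a zombie holds an older epoch and is fenced off
        return epoch == self.epochs[app_id]
```

The older incarnation becomes a zombie the moment the newer one registers, so "both processes commit" cannot happen.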
>> > > > > >
>> > > > > > Second: App IDs normally map to a single PID. It seems like
>> > > > > > one could do away with the PID concept entirely, and just use
>> > > > > > App IDs in most places that require a PID. This feels like it
>> > > > > > would be significantly simpler, though it does increase the
>> > > > > > message size. Are there other reasons why the App ID / PID
>> > > > > > split is necessary?
>> > > > > >
>> > > > > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang
>> > > > > > <wangguoz@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi all,
>> > > > > > >
>> > > > > > > I have just created KIP-98 to enhance Kafka with exactly
>> > > > > > > once delivery semantics:
>> > > > > > >
>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>> > > > > > >
>> > > > > > > This KIP adds a transactional messaging mechanism along with
>> > > > > > > an idempotent producer implementation to make sure that 1)
>> > > > > > > duplicated messages sent from the same identified producer
>> > > > > > > can be detected on the broker side, and 2) a group of
>> > > > > > > messages sent within a transaction will atomically be either
>> > > > > > > reflected and fetchable to consumers or not, as a whole.
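The idempotent-producer half of point 1 boils down to per-producer sequence numbering on the broker; a toy sketch (the real design tracks sequences per PID and partition):

```python
# Toy model of broker-side dedup: the broker remembers the last sequence
# number accepted from each producer and rejects anything that is not the
# next expected sequence, so retries of already-acked sends are dropped.

class Broker:
    def __init__(self):
        self.log = []
        self.last_seq = {}   # producer_id -> last accepted sequence

    def append(self, producer_id, seq, msg):
        expected = self.last_seq.get(producer_id, -1) + 1
        if seq != expected:
            return False     # duplicate or out-of-order: rejected
        self.last_seq[producer_id] = seq
        self.log.append(msg)
        return True
```

A producer retrying after a lost acknowledgement resends the same sequence number, which the broker recognizes and drops.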
>> > > > > > >
>> > > > > > > The above wiki page provides a high-level view of the
>> > > > > > > proposed changes as well as the summarized guarantees. An
>> > > > > > > initial draft of the detailed implementation design is
>> > > > > > > described in this Google doc:
>> > > > > > >
>> > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
>> > > > > > >
>> > > > > > >
>> > > > > > > We would love to hear your comments and suggestions.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
