kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics
Date Wed, 15 Mar 2017 21:36:49 GMT
Hi,

I want to pick up this thread again. As there are some concerns about
the "producer per task" design, we wrote up an alternative "producer
per thread" design and discussed the pros and cons of both approaches:

https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE


Looking forward to your feedback.


-Matthias


On 3/10/17 3:24 AM, Damian Guy wrote:
> Hi Matthias,
> 
> Thanks for the response. I agree with you regarding the use of
> PartitionGrouper to reduce the number of tasks. It would be good to have an
> idea of any additional load on the brokers as we increase the number of
> tasks and therefore producers.
> 
> Thanks,
> Damian
> 
> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <matthias@confluent.io> wrote:
> 
>> Damian, Jun,
>>
>> Thanks for your input.
>>
>>
>> About Performance test:
>>
>> I can follow up with more performance tests using more partitions and
>> also collecting broker metrics.
>>
>> However, I want to highlight again that even if 1000+ partitions were
>> problematic, one could simply implement the PartitionGrouper interface and
>> reduce the number of tasks to 250 or 100... So I am not sure if we
>> should block this KIP, even if there might be some performance penalty
>> for currently single-partitioned tasks.
>>
>> About memory usage: JMX max-heap and max-off-heap reported 256MB and
>> 133MB for all experiments (thus I did not put them in the spreadsheet).
>> Thus, using 100 producers (each using a max of 32MB of memory) was not
>> an issue with regard to memory consumption. I did not track "current
>> heap/off-heap" memory, as this would require a more advanced test setup to
>> monitor it over time. If you think this is required, we can do
>> some tests though.
>>
>> However, as 256 MB was enough memory, and there are other components
>> next to the producers using memory, I don't expect a severely increased
>> memory usage. Producers allocate memory on demand, and if load is shared
>> over multiple producers, overall memory usage should stay about the same,
>> as each producer should allocate less memory.
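>>
>> Just to illustrate (all values below are made up), one could cap the
>> per-producer buffer via the Streams properties, since producer configs set
>> there are forwarded to the internal producers:
>>
>>     // imports: java.util.Properties, org.apache.kafka.streams.StreamsConfig,
>>     //          org.apache.kafka.clients.producer.ProducerConfig
>>     final Properties props = new Properties();
>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-benchmark");   // placeholder
>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
>>     // cap each internal producer's buffer so that many producers per
>>     // instance stay within a known memory budget (example value only)
>>     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 8 * 1024 * 1024L);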
>>
>>
>> About Batching:
>>
>> As you can see from the benchmarks (in the detailed view -- I also added
>> some graphs to the summary now), the average batch size decreases slightly
>> with an increased number of partitions. However, there is no big
>> difference between the "producer per thread" and "producer per task"
>> scenarios.
>>
>>
>> About acks:
>>
>> This is covered by KIP-98 already. If the idempotent producer is used, it's
>> required to set max.in.flight.requests.per.connection=1 and retries > 0
>> -- otherwise a config exception will be thrown. For transactions, it's
>> further required that acks=-1 to avoid a config exception.
>>
>> Other bits, like min.isr, replication.factor, etc. (i.e., all broker/topic
>> configs) are out of scope, and it's the user's responsibility to set those
>> values correctly to ensure transactionality and idempotency.
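>>
>> For reference, a minimal sketch of what this means for a plain producer,
>> using the config names proposed in KIP-98 (broker address, serializers,
>> and the transactional id below are placeholders):
>>
>>     import java.util.Properties;
>>     import org.apache.kafka.clients.producer.KafkaProducer;
>>     import org.apache.kafka.common.serialization.StringSerializer;
>>
>>     public class EosProducerConfigSketch {
>>         public static void main(final String[] args) {
>>             final Properties props = new Properties();
>>             props.put("bootstrap.servers", "broker:9092");                 // placeholder
>>             props.put("key.serializer", StringSerializer.class.getName());
>>             props.put("value.serializer", StringSerializer.class.getName());
>>             props.put("enable.idempotence", "true");
>>             props.put("max.in.flight.requests.per.connection", "1");       // required with idempotence
>>             props.put("retries", "3");                                     // must be > 0
>>             props.put("acks", "all");                                      // i.e. acks=-1, required for transactions
>>             props.put("transactional.id", "streams-task-0_0");             // placeholder per-task id
>>             try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>>                 // KIP-98 adds initTransactions()/beginTransaction()/commitTransaction();
>>                 // Streams would drive those on each task commit.
>>             }
>>         }
>>     }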
>>
>>
>>
>> -Matthias
>>
>>
>> On 3/7/17 9:32 AM, Jun Rao wrote:
>>> Hi, Guozhang,
>>>
>>> Thanks for the KIP. A couple of comments.
>>>
>>> 1. About the impact on producer batching. My understanding is that
>>> typically different sub-topologies in the same task are publishing to
>>> different topics. Since the producer batching happens at the
>>> topic/partition level, using a producer per task may not impact batching
>>> much.
>>>
>>> 2. When processing.guarantee is set to exactly_once, do we want to enforce
>>> acks to all in the producer? The default acks is 1 and may cause acked data
>>> to be lost later when the leader changes.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <damian.guy@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Thanks. The perf test is a good start but I don't think it goes far enough.
>>>> 100 partitions is not a lot. What happens when there are thousands of
>>>> partitions? What is the load on the brokers? How much more memory is used
>>>> by the Streams App etc?
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <matthias@confluent.io> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to give a first response:
>>>>>
>>>>>
>>>>>
>>>>> 1. Producer per task:
>>>>>
>>>>> First, we did some performance tests, indicating that the performance
>>>>> penalty is small. Please have a look here:
>>>>>
>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>>>>>
>>>>> For the test, we ran with a trunk version and a modified version that
>>>>> uses a producer per task (of course, no transactions, but at-least-once
>>>>> semantics). The scaling factor indicates the number of brokers and
>>>>> (single-threaded) Streams instances. We used SimpleBenchmark, which is
>>>>> part of the AK code base.
>>>>>
>>>>>
>>>>> Second, as the design is "producer per task" (and not "producer per
>>>>> partition"), it is possible to specify a custom PartitionGrouper that
>>>>> assigns multiple partitions to a single task. Thus, it allows reducing
>>>>> the number of tasks for scenarios with many partitions. Right now, this
>>>>> interface must be implemented solely by the user, but we could also add
>>>>> a new config parameter that specifies the max.number.of.tasks or
>>>>> partitions.per.task so that the user can configure this instead of
>>>>> implementing the interface.
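>>>>>
>>>>> Just as a rough sketch (assuming the current PartitionGrouper signature;
>>>>> the cap of 100 tasks is an arbitrary example), such a grouper could look
>>>>> like this and be plugged in via the existing partition.grouper config:
>>>>>
>>>>>     import java.util.HashMap;
>>>>>     import java.util.HashSet;
>>>>>     import java.util.List;
>>>>>     import java.util.Map;
>>>>>     import java.util.Set;
>>>>>     import org.apache.kafka.common.Cluster;
>>>>>     import org.apache.kafka.common.PartitionInfo;
>>>>>     import org.apache.kafka.common.TopicPartition;
>>>>>     import org.apache.kafka.streams.processor.PartitionGrouper;
>>>>>     import org.apache.kafka.streams.processor.TaskId;
>>>>>
>>>>>     // Caps the number of tasks per topic group by folding partition p into
>>>>>     // task (p % MAX_TASKS). Note: fewer tasks also means coarser state and
>>>>>     // changelog assignment, so this is not a drop-in change for stateful apps.
>>>>>     public class CappedPartitionGrouper implements PartitionGrouper {
>>>>>         private static final int MAX_TASKS = 100;  // arbitrary example cap
>>>>>
>>>>>         @Override
>>>>>         public Map<TaskId, Set<TopicPartition>> partitionGroups(final Map<Integer, Set<String>> topicGroups,
>>>>>                                                                 final Cluster metadata) {
>>>>>             final Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
>>>>>             for (final Map.Entry<Integer, Set<String>> group : topicGroups.entrySet()) {
>>>>>                 for (final String topic : group.getValue()) {
>>>>>                     final List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
>>>>>                     if (partitions == null) {
>>>>>                         continue;  // no metadata for this topic yet
>>>>>                     }
>>>>>                     for (final PartitionInfo partition : partitions) {
>>>>>                         final TaskId taskId = new TaskId(group.getKey(), partition.partition() % MAX_TASKS);
>>>>>                         Set<TopicPartition> assigned = groups.get(taskId);
>>>>>                         if (assigned == null) {
>>>>>                             assigned = new HashSet<>();
>>>>>                             groups.put(taskId, assigned);
>>>>>                         }
>>>>>                         assigned.add(new TopicPartition(topic, partition.partition()));
>>>>>                     }
>>>>>                 }
>>>>>             }
>>>>>             return groups;
>>>>>         }
>>>>>     }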
>>>>>
>>>>> Third, there is the idea of a "Producer Pool" that would allow sharing
>>>>> resources (network connections, memory, etc.) over multiple producers.
>>>>> This would allow separating multiple transactions at the producer level,
>>>>> while resources are shared. There is no detailed design document yet, and
>>>>> there would be a KIP for this feature.
>>>>>
>>>>> Thus, if there should be any performance problems for high scale
>>>>> scenarios, there are multiple ways to tackle them while keeping the
>>>>> "producer per task" design.
>>>>>
>>>>> Additionally, a "producer per thread" design would be way more complex
>>>>> and I summarized the issues in a separate document. I will share a link
>>>>> to the document soon.
>>>>>
>>>>>
>>>>>
>>>>> 2. StateStore recovery:
>>>>>
>>>>> In its first design, Streams EoS will not allow exploiting the
>>>>> improvements that are currently being added for 0.11. However, as 0.10.2
>>>>> faces the same issue of potentially long recovery, there is no
>>>>> regression in this regard. Thus, I see those improvements as
>>>>> orthogonal or as add-ons. Nevertheless, we should try to explore those
>>>>> options and, if possible, get them into 0.11 so that Streams with EoS
>>>>> gets the same improvements as the at-least-once scenario.
>>>>>
>>>>>
>>>>>
>>>>> 3. Caching:
>>>>>
>>>>> We might need to do some experiments to quantify the impact on caching.
>>>>> If it's severe, the suggested default commit interval of 100ms could
>>>>> also be increased. Also, EoS will not enforce any commit interval, but
>>>>> only change the default value. Thus, a user can freely trade off latency
>>>>> vs. the caching effect.
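>>>>>
>>>>> For example (values are arbitrary), the two knobs for that trade-off would
>>>>> be:
>>>>>
>>>>>     // imports: java.util.Properties, org.apache.kafka.streams.StreamsConfig
>>>>>     final Properties props = new Properties();
>>>>>     // larger commit interval => larger transactions and better caching,
>>>>>     // but higher end-to-end latency under exactly-once
>>>>>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
>>>>>     // record cache size per instance (KIP-63); flushed at the latest on commit
>>>>>     props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);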
>>>>>
>>>>> Last but not least, there is the idea to allow "read_uncommitted" for
>>>>> intermediate topics. This would be an advanced design for Streams EoS that
>>>>> allows downstream sub-topologies to read uncommitted data
>>>>> optimistically. In case of failure, a cascading abort of transactions
>>>>> would be required. This change will need another KIP.
>>>>>
>>>>>
>>>>>
>>>>> 4. Idempotent Producer:
>>>>>
>>>>> The transactional part automatically leverages the idempotent properties
>>>>> of the producer. Idempotency is a requirement:
>>>>>
>>>>>> Note that enable.idempotence must be enabled if a TransactionalId is
>>>>>> configured.
>>>>>
>>>>> See
>>>>>
>>>>>
>>>>> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>>>>>
>>>>> All idempotent retries are handled by the producer internally (with or
>>>>> without transactions) if enable.idempotence is set to true.
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
>>>>>> Another question:
>>>>>>
>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence guarantee
>>>>>> from KIP-98. It seems that only the transactional part is needed. Or is
>>>>>> the idempotence guarantee working behind the scenes and helping for some
>>>>>> scenarios for which it is not worthwhile aborting a transaction (e.g.,
>>>>>> retransmitting a record after a temporary network glitch)?
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <jay@confluent.io> wrote:
>>>>>>>
>>>>>>> I second the concern with the one producer per task approach. At a
>>>>>>> high level it seems to make sense, but I think Damian is exactly right
>>>>>>> that it cuts against the general design of the producer. Many people have
>>>>>>> high input partition counts and will have high task counts as a result. I
>>>>>>> think processing 1000 partitions should not be an unreasonable thing to
>>>>>>> want to do.
>>>>>>>
>>>>>>> The tricky bits will be:
>>>>>>>
>>>>>>>   - Reduced effectiveness of batching (or more latency and memory to get
>>>>>>>   equivalent batching). This doesn't show up in simple benchmarks because
>>>>>>>   much of the penalty is I/O and CPU on the broker and the additional threads
>>>>>>>   from all the producers can make a single-threaded benchmark seem faster.
>>>>>>>   - TCP connection explosion. We maintain one connection per broker. This
>>>>>>>   is already high since each app instance does this. This design though will
>>>>>>>   add an additional multiplicative factor based on the partition count of the
>>>>>>>   input.
>>>>>>>   - Connection and metadata request storms. When an instance with 1000
>>>>>>>   tasks starts up it is going to try to create many thousands of connections
>>>>>>>   and issue a thousand metadata requests all at once.
>>>>>>>   - Memory usage. We currently default to 64MB per producer. This can be
>>>>>>>   tuned down, but the fact that we are spreading the batching over more
>>>>>>>   producers will fundamentally mean we need a lot more memory to get good
>>>>>>>   perf and the memory usage will change as your task assignment changes so it
>>>>>>>   will be hard to set correctly unless it is done automatically.
>>>>>>>   - Metrics explosion (1000 producer instances, each with their own
>>>>>>>   metrics to monitor).
>>>>>>>   - Thread explosion, 1000 background threads, one per producer, each
>>>>>>>   sending data.
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <damian.guy@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> Thanks for the KIP! This is an important feature for Kafka Streams and
>>>>>>>> will help to unlock a bunch of use cases.
>>>>>>>>
>>>>>>>> I have some concerns/questions:
>>>>>>>>
>>>>>>>>   1. Producer per task: I'm worried about the overhead this is going to
>>>>>>>>   put on both the streams app and the Kafka Brokers. You can easily imagine
>>>>>>>>   an app consuming thousands of partitions. What load will this put on the
>>>>>>>>   brokers? Am I correct in assuming that there will be metadata requests per
>>>>>>>>   Producer? The memory overhead in the streams app will also increase fairly
>>>>>>>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
>>>>>>>>   2. State Store recovery: As we already know, restoring the entire
>>>>>>>>   changelog can take an extremely long time. Even with a fairly small dataset
>>>>>>>>   and an inappropriately tuned segment size, this can take way too long. My
>>>>>>>>   concern is that failures happen and then recovery takes "forever" and we
>>>>>>>>   end up in a situation where we need to change the max.poll.interval to be
>>>>>>>>   some very large number or else we end up in "rebalance hell". I don't think
>>>>>>>>   this provides a very good user experience. You mention RocksDB
>>>>>>>>   checkpointing in the doc - should we explore this idea some more? i.e.,
>>>>>>>>   understand the penalty for checkpointing. Maybe checkpoint every *n*
>>>>>>>>   commits?
>>>>>>>>   3. What does EoS mean for Caching? If we set the commit interval to
>>>>>>>>   100ms then the cache is not going to be very effective. Should it just be
>>>>>>>>   disabled?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Damian
>>>>>>>>
>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangguoz@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>>>>>>>> provide exactly-once processing semantics:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
>>>>>>>>>
>>>>>>>>> This KIP enables Streams users to optionally turn on exactly-once
>>>>>>>>> processing semantics without changing their app code at all by
>>>>>>>>> leveraging the transactional messaging features provided in KIP-98.
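>>>>>>>>>
>>>>>>>>> As a sketch of what "no app code changes" means, enabling it should boil
>>>>>>>>> down to one new config (name as currently proposed in the KIP; app id and
>>>>>>>>> brokers are placeholders):
>>>>>>>>>
>>>>>>>>>     // imports: java.util.Properties, org.apache.kafka.streams.StreamsConfig
>>>>>>>>>     final Properties props = new Properties();
>>>>>>>>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
>>>>>>>>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
>>>>>>>>>     // proposed new config; the default remains at-least-once
>>>>>>>>>     props.put("processing.guarantee", "exactly_once");
>>>>>>>>>     // topology definition and KafkaStreams startup stay unchanged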
>>>>>>>>>
>>>>>>>>> The above wiki page provides a high-level view of the proposed changes,
>>>>>>>>> while the detailed implementation design can be found in this Google doc:
>>>>>>>>>
>>>>>>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
>>>>>>>>>
>>>>>>>>> We would love to hear your comments and suggestions.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
> 

