kafka-dev mailing list archives

From "Jay Kreps (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer
Date Thu, 08 Jun 2017 21:41:18 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043494#comment-16043494 ]

Jay Kreps commented on KAFKA-1955:

I think the patch I submitted was kind of a cool hack, but after thinking about it I don't
think it is really what you want.

Here are the considerations I thought we should probably think through:
1. How will recovery work? The patch I gave didn't have a mechanism to recover from a failure.
In the case of an OS crash the OS gives very weak guarantees on what is on disk for any data
that hasn't been fsync'd. Not only can arbitrary bits of data be missing but it is even possible
with some FS configurations to get arbitrary corrupt blocks that haven't been zeroed yet.
I think to get this right you need a commit log and recovery procedure that verifies unsync'd
data on startup. I'm not 100% sure you can do this with just the buffer pool, though maybe
you can.
2. What are the ordering guarantees for buffered data?
3. How does this interact with transactions/EOS?
4. Should all writes go through the commit log, or should only failures be journaled? If you
journal all writes prior to sending to the server, that amounts to significant overhead and
leads to the possibility that logging or other I/O can slow you down. If you journal only
failures, your throughput may be very high in the non-failure scenario, and then when Kafka
goes down you suddenly start doing I/O, which is much slower, and your throughput drops
precipitously. Either may be okay but it is worth thinking through what the right behavior is.
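
To make point 1 concrete, here is a minimal sketch of what "verify unsync'd data on startup" could look like. It is not taken from the patch; the frame layout (length, CRC32, payload) and the names JournalSketch, append and recover are assumptions made for illustration. Each record carries a checksum, and recovery keeps records only up to the first frame that is empty, truncated, or fails its checksum.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical journal framing, for illustration only.
final class JournalSketch {
    // Assumed frame layout: [int length][long crc32][payload bytes]
    static final int HEADER_BYTES = Integer.BYTES + Long.BYTES;

    // Append one checksummed record at the buffer's current position.
    static void append(ByteBuffer log, byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        log.putInt(payload.length);
        log.putLong(crc.getValue());
        log.put(payload);
    }

    // Scan from offset 0 and return every record whose checksum verifies.
    // Scanning stops at the first frame that looks empty, truncated, or corrupt,
    // since nothing after that point can be trusted after a crash.
    static List<byte[]> recover(ByteBuffer log) {
        List<byte[]> records = new ArrayList<>();
        ByteBuffer view = log.duplicate();
        view.clear(); // position = 0, limit = capacity; contents are untouched
        while (view.remaining() >= HEADER_BYTES) {
            int length = view.getInt();
            long expected = view.getLong();
            // Zero-length frames are disallowed so a zeroed region is not
            // mistaken for a run of valid empty records.
            if (length <= 0 || length > view.remaining()) {
                break;
            }
            byte[] payload = new byte[length];
            view.get(payload);
            CRC32 crc = new CRC32();
            crc.update(payload, 0, length);
            if (crc.getValue() != expected) {
                break;
            }
            records.add(payload);
        }
        return records;
    }
}
```

A checksum only detects torn or garbage frames. It does not by itself answer the ordering question in point 2 or the duplicate and transaction questions in point 3.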

> Explore disk-based buffering in new Kafka Producer
> --------------------------------------------------
>                 Key: KAFKA-1955
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1955
>             Project: Kafka
>          Issue Type: Improvement
>          Components: producer 
>    Affects Versions:
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-1955.patch, KAFKA-1955-RABASED-TO-8th-AUG-2015.patch
> There are two approaches to using Kafka for capturing event data that has no other "source
of truth store":
> 1. Just write to Kafka and try hard to keep the Kafka cluster up as you would a database.
> 2. Write to some kind of local disk store and copy from that to Kafka.
> The cons of the second approach are the following:
> 1. You end up depending on disks on all the producer machines. If you have 10000 producers,
that is 10k places state is kept. These tend to fail a lot.
> 2. You can get data arbitrarily delayed.
> 3. You still don't tolerate hard outages since there is no replication in the producer.
> 4. This tends to make problems with duplicates more common in certain failure scenarios.
> There is one big pro, though: you don't have to keep Kafka running all the time.
> So far we have done nothing in Kafka to help support approach (2), but people have built
a lot of buffering things. It's not clear that this is necessarily bad.
> However implementing this in the new Kafka producer might actually be quite easy. Here
is an idea for how to do it. Implementation of this idea is probably pretty easy but it would
require some pretty thorough testing to see if it was a success.
> The new producer maintains a pool of ByteBuffer instances which it attempts to recycle
and uses to buffer and send messages. When unsent data is queuing waiting to be sent to the
cluster it is hanging out in this pool.
> One approach to implementing a disk-backed buffer would be to slightly generalize this
so that the buffer pool has the option to use a mmap'd file backend for its ByteBuffers.
When the BufferPool was created with a totalMemory setting of 1GB it would preallocate a 1GB
sparse file and memory map it, then chop the file into batchSize MappedByteBuffer pieces and
populate its buffer with those.
> Everything else would work normally except now all the buffered data would be disk backed
and in cases where there was significant backlog these would start to fill up and page out.
> We currently allow messages larger than batchSize and to handle these we do a one-off
allocation of the necessary size. We would have to disallow this when running in mmap mode.
However since the disk buffer will be really big this should not be a significant limitation
as the batch size can be pretty big.
> We would want to ensure that the pooling always gives out the most recently used ByteBuffer
(I think it does). This way under normal operation where requests are processed quickly a
given buffer would be reused many times before any physical disk write activity occurred.
> Note that although this lets the producer buffer very large amounts of data the buffer
isn't really fault-tolerant, since the ordering in the file isn't known so there is no easy
way to recover the producer's buffer in a failure. So the scope of this feature would just
be to provide a bigger buffer for short outages or latency spikes in the Kafka cluster during
which you would hope you don't also experience failures in your producer processes.
> To complete the feature we would need to:
> a. Get some unit tests that would cover disk-backed usage
> b. Do some manual performance testing of this usage and understand the impact on throughput.
> c. Do some manual testing of failure cases (e.g. if the broker goes down for 30 seconds
we should be able to keep taking writes) and observe how well the producer handles the catch
up time when it has a large backlog to get rid of.
> d. Add a new configuration for the producer to enable this, something like use.file.buffers=true/false.
> e. Add documentation that covers these new options.
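
For reference, the mmap-backed pool described in the quoted issue could be sketched roughly as below. This is not the attached patch and not the producer's actual BufferPool; the class MmapBufferPoolSketch and its methods are hypothetical, and it assumes totalMemory fits in one mapping (at most Integer.MAX_VALUE bytes). It preallocates the file, maps it once, chops the mapping into batchSize slices, and hands slices out in LIFO order so the most recently used buffer is reused first.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a file-backed buffer pool, for illustration only.
final class MmapBufferPoolSketch {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    MmapBufferPoolSketch(Path file, long totalMemory, int batchSize) throws IOException {
        if (totalMemory > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("sketch supports a single mapping of at most 2GB");
        }
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            // Extending the file without writing data; typically sparse, depending on the file system.
            raf.setLength(totalMemory);
            FileChannel channel = raf.getChannel();
            // The mapping stays valid after the channel is closed.
            MappedByteBuffer whole = channel.map(FileChannel.MapMode.READ_WRITE, 0, totalMemory);
            int count = (int) (totalMemory / batchSize);
            for (int i = 0; i < count; i++) {
                whole.clear();
                whole.position(i * batchSize);
                whole.limit((i + 1) * batchSize);
                free.addLast(whole.slice()); // each slice shares the mapped memory
            }
        }
    }

    // Returns the most recently released buffer, or null if the pool is exhausted.
    // A real pool would block or time out instead of returning null.
    ByteBuffer allocate() {
        return free.pollFirst();
    }

    // Return a buffer to the head of the free list so it is reused first (LIFO).
    void deallocate(ByteBuffer buffer) {
        buffer.clear();
        free.addFirst(buffer);
    }

    int available() {
        return free.size();
    }
}
```

With LIFO reuse, a producer that drains quickly keeps touching the same few pages, so most of the file is never dirtied. As the issue notes, nothing here records ordering, so the file contents cannot be replayed after a producer crash.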

This message was sent by Atlassian JIRA
