spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cody Koeninger <>
Subject Re: Upgrading to Kafka 0.9.x
Date Wed, 02 Mar 2016 19:17:23 GMT
Jay, thanks for the response.

Regarding the new consumer API for 0.9, I've been reading through the code
for it and thinking about how it fits in to the existing Spark integration.
So far I've seen some interesting challenges, and if you (or anyone else on
the dev list) have time to provide some hints, I'd appreciate it.

To recap how the existing Spark integration works (this is all using the
Kafka simple consumer api):

- Single driver node.  For each spark microbatch, it queries Kafka for the
offset high watermark for all topicpartitions of interest.  Creates one
spark partition for each topicpartition.  The partition doesn't have
messages, it just has a lower offset equal to the last consumed position,
upper offset equal to the high water mark. Sends those partitions to the

- Multiple worker nodes.  For each spark partition, it opens a simple
consumer, consumes from kafka the lower to the upper offset for a single
topicpartition, closes the consumer.

This is really blunt, but it actually works better than the integration
based on the older higher level consumer.  Churn of simple consumers on the
worker nodes in practice wasn't much of a problem (because the granularity
of microbatches is rarely under 1 second), so we don't even bother to cache
connections between batches.

The new consumer api presents some interesting challenges

- On the driver node, it would be desirable to be the single member of a
consumer group with dynamic topic subscription (so that users can take
advantage of topic patterns, etc).  Heartbeat happens only on a poll.  But
clearly the driver doesn't actually want to poll any messages, because that
load should be distributed to workers. I've seen KIP-41, which might help
if polling a single message is sufficiently lightweight.  In the meantime
the only things I can think of are trying to subclass to make a .heartbeat
method, or possibly setting max fetch bytes to a very low value.

- On the worker nodes, we aren't going to be able to get away with creating
and closing an instance of the new consumer every microbatch, since
prefetching, security, metadata requests all make that heavier weight than
a simple consumer.  The new consumer doesn't have a way to poll for only a
given number of messages (again KIP-41 would help here).  But the new
consumer also doesn't provide a way to poll for only a given
topicpartition, and the .pause method flushes fetch buffers so it's not an
option either.  I don't see a way to avoid caching one consumer per
topicpartition, which is probably less desirable than e.g. one consumer per

Any suggestions welcome, even if it's "why don't you go work on
implementing KIP-41", or "You're doing it wrong" :)


On Fri, Feb 26, 2016 at 1:36 PM, Jay Kreps <> wrote:

> Hey, yeah, we'd really like to make this work well for you guys.
> I think there are actually maybe two questions here:
> 1. How should this work in steady state?
> 2. Given that there was a major reworking of the kafka consumer java
> library for 0.9 how does that impact things right now? (
> )
> Quick recap of how we do compatibility, just so everyone is on the same
> page:
> 1. The protocol is versioned and the cluster supports multiple versions.
> 2. As we evolve Kafka we always continue to support older versions of the
> protocol an hence older clients continue to work with newer Kafka versions.
> 2. In general we don't try to have the clients support older versions of
> Kafka since, after all, the whole point of the new client is to add
> features which often require those features to be in the broker.
> So I think in steady state the answer is to choose a conservative version
> to build against and it's on us to keep that working as Kafka evolves. As
> always there will be some tradeoff between using the newest features and
> being compatible with old stuff.
> But that steady state question ignores the fact that we did a complete
> rewrite of the consumer in 0.9. The old consumer is still there, supported,
> and still works as before but the new consumer is the path forward and what
> we are adding features to. At some point you will want to migrate to this
> new api, which will be a non-trivial change to your code.
> This api has a couple of advantages for you guys (1) it supports security,
> (2) It allows low-level control over partition assignment and offsets
> without the crazy fiddliness of the old "simple consumer" api, (3) it no
> longer directly accesses ZK, (4) no scala dependency and no dependency on
> Kafka core. I think all four of these should be desirable for Spark et al.
> One thing we could discuss is the possibility of doing forwards and
> backwards compatibility in the clients. I'm not sure this would actually
> make things better, that would probably depend on the details of how it
> worked.
> -Jay
> On Fri, Feb 26, 2016 at 9:46 AM, Mark Grover <> wrote:
>> Hi Kafka devs,
>> I come to you with a dilemma and a request.
>> Based on what I understand, users of Kafka need to upgrade their brokers
>> to
>> Kafka 0.9.x first, before they upgrade their clients to Kafka 0.9.x.
>> However, that presents a problem to other projects that integrate with
>> Kafka (Spark, Flume, Storm, etc.). From here on, I will speak for Spark +
>> Kafka, since that's the one I am most familiar with.
>> In the light of compatibility (or the lack thereof) between 0.8.x and
>> 0.9.x, Spark is faced with a problem of what version(s) of Kafka to be
>> compatible with, and has 2 options (discussed in this PR
>> <>):
>> 1. We either upgrade to Kafka 0.9, dropping support for 0.8. Storm and
>> Flume are already on this path.
>> 2. We introduce complexity in our code to support both 0.8 and 0.9 for the
>> entire duration of our next major release (Apache Spark 2.x).
>> I'd love to hear your thoughts on which option, you recommend.
>> Long term, I'd really appreciate if Kafka could do something that doesn't
>> make Spark having to support two, or even more versions of Kafka. And, if
>> there is something that I, personally, and Spark project can do in your
>> next release candidate phase to make things easier, please do let us know.
>> Thanks!
>> Mark

View raw message