spark-dev mailing list archives

From "Hari Shreedharan" <>
Subject Re: Which committers care about Kafka?
Date Thu, 18 Dec 2014 21:27:00 GMT
I get what you are saying. But getting exactly-once right is an extremely hard problem, especially
in the presence of failures, because failures can happen in a bunch of places. For example, the
node can fail before the notification that the downstream store succeeded reaches the receiver
that updates the offsets. The store was successful, but the offsets were never updated, so the
messages get replayed and duplicates come in anyway. This is something worth discussing by
itself - but without uuids etc. this might not really be solved even when you think it is.
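
To make that failure case concrete, here is a minimal sketch in plain Python (no Spark or Kafka involved). Everything in it is hypothetical and only for illustration: the DownstreamStore class, the consume function, the "events" topic name, and the use of (topic, partition, offset) as a stand-in for a uuid. It shows the writes succeeding, the offset commit being lost, and the replay producing duplicates unless writes are keyed by a unique ID.

```python
class DownstreamStore:
    """Hypothetical in-memory stand-in for a downstream store."""

    def __init__(self):
        self.appended = []  # naive append-only writes, no deduplication
        self.rows = {}      # writes keyed by a unique message ID

    def append(self, payload):
        self.appended.append(payload)

    def upsert(self, message_id, payload):
        # Writing the same ID twice overwrites instead of duplicating.
        self.rows[message_id] = payload


def consume(messages, start_offset, store, crash_before_commit=False):
    """Write messages from start_offset onward and return the committed offset.

    With crash_before_commit=True the writes succeed, but the new offset is
    never recorded, so the old committed offset is returned unchanged.
    """
    for offset in range(start_offset, len(messages)):
        payload = messages[offset]
        store.append(payload)
        # (topic, partition, offset) plays the role of the uuid here.
        store.upsert(("events", 0, offset), payload)
    return start_offset if crash_before_commit else len(messages)


messages = ["a", "b", "c"]
store = DownstreamStore()
committed = 0

# First attempt: store writes succeed, node dies before the offset update.
committed = consume(messages, committed, store, crash_before_commit=True)
# Recovery: the same messages are read again from the old offset.
committed = consume(messages, committed, store)

print(len(store.appended))  # 6: the naive store holds every message twice
print(len(store.rows))      # 3: the keyed store absorbed the replay
```

This only covers the case where the downstream store can overwrite by key; a store that cannot do that would need some other way to detect the replay.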

Anyway, I will look at the links. I am also interested in all of the features you mentioned
(no HDFS WAL for Kafka, and once-only delivery), but I doubt the latter is really possible
to guarantee - though I really would love to have that!

Thanks, Hari

On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <> wrote:

> Thanks for the replies.
> Regarding skipping WAL, it's not just about optimization.  If you actually
> want exactly-once semantics, you need control of kafka offsets as well,
> including the ability to not use zookeeper as the system of record for
> offsets.  Kafka already is a reliable system that has strong ordering
> guarantees (within a partition) and does not mandate the use of zookeeper
> to store offsets.  I think there should be a spark api that acts as a very
> simple intermediary between Kafka and the user's choice of downstream store.
> Take a look at the links I posted - if there have already been 2 independent
> implementations of the idea, chances are it's something people need.
> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <> wrote:
>> Hi Cody,
>> I am an absolute +1 on SPARK-3146. I think we can implement something
>> pretty simple and lightweight for that one.
>> For the Kafka DStream skipping the WAL implementation - this is something
>> I discussed with TD a few weeks ago. Though it is a good idea to implement
>> this to avoid unnecessary HDFS writes, it is an optimization. For that
>> reason, we must be careful in the implementation. There are a couple of
>> things we need to ensure work properly - specifically ordering. To ensure we
>> pull messages from different topics and partitions in the same order after
>> failure, we’d still have to persist the metadata to HDFS (or some other
>> system) - this metadata must contain the order of messages consumed, so we
>> know how to re-read the messages. I am planning to explore this once I have
>> some time (probably in Jan). In addition, we must also ensure bucketing
>> functions work fine as well. I will file a placeholder jira for this one.
>> I also wrote an API to write data back to Kafka a while back -
>> . I am hoping that this will
>> get pulled in soon, as this is something I know people want. I am open to
>> feedback on that - anything that I can do to make it better.
>> Thanks,
>> Hari
>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <>
>> wrote:
>>> Hey Cody,
>>> Thanks for reaching out with this. The lead on streaming is TD - he is
>>> traveling this week though so I can respond a bit. To the high level
>>> point of whether Kafka is important - it definitely is. Something like
>>> 80% of Spark Streaming deployments (anecdotally) ingest data from
>>> Kafka. Also, good support for Kafka is something we generally want in
>>> Spark and not a library. In some cases IIRC there were user libraries
>>> that used unstable Kafka APIs and we were somewhat waiting on Kafka
>>> to stabilize them to merge things upstream. Otherwise users wouldn't
>>> be able to use newer Kafka versions. This is a high level impression
>>> only though, I haven't talked to TD about this recently so it's worth
>>> revisiting given the developments in Kafka.
>>> Please do bring things up like this on the dev list if there are
>>> blockers for your usage - thanks for pinging it.
>>> - Patrick
>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <>
>>> wrote:
>>> > Now that 1.2 is finalized... who are the go-to people to get some
>>> > long-standing Kafka related issues resolved?
>>> >
>>> > The existing api is not sufficiently safe nor flexible for our production
>>> > use. I don't think we're alone in this viewpoint, because I've seen
>>> > several different patches and libraries to fix the same things we've been
>>> > running into.
>>> >
>>> > Regarding flexibility
>>> >
>>> >
>>> >
>>> > has been outstanding since August, and IMHO an equivalent of this is
>>> > absolutely necessary. We wrote a similar patch ourselves, then found that
>>> > PR and have been running it in production. We wouldn't be able to get our
>>> > jobs done without it. It also allows users to solve a whole class of
>>> > problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).
>>> >
>>> > Regarding safety, I understand the motivation behind WriteAheadLog as a
>>> > general solution for streaming unreliable sources, but Kafka already is a
>>> > reliable source. I think there's a need for an api that treats it as
>>> > such. Even aside from the performance issues of duplicating the
>>> > write-ahead log in kafka into another write-ahead log in hdfs, I need
>>> > exactly-once semantics in the face of failure (I've had failures that
>>> > prevented reloading a spark streaming checkpoint, for instance).
>>> >
>>> > I've got an implementation I've been using
>>> >
>>> >
>>> > /src/main/scala/org/apache/spark/rdd/kafka
>>> >
>>> > Tresata has something similar at,
>>> > and I know there were earlier attempts based on Storm code.
>>> >
>>> > Trying to distribute these kinds of fixes as libraries rather than patches
>>> > to Spark is problematic, because large portions of the implementation are
>>> > private[spark].
>>> >
>>> > I'd like to help, but I need to know whose attention to get.