kafka-dev mailing list archives

From Jason Gustafson <ja...@confluent.io>
Subject Re: Kafka Consumer thoughts
Date Sat, 01 Aug 2015 00:44:00 GMT
Hi Jun,

This is still debatable, but I think it makes the most sense to keep
pause/resume independent of assignment. Otherwise we still get into the
weird ordering problems that we were trying to resolve before. To me,
pause/resume expresses clearly the intent to suppress consumption from a
set of partitions without changing assignment. And it seems intuitive that
you should be able to seek or get the position of a paused partition. This
is less clear to me if assign is used for pause as well.
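
To make the distinction concrete, here is a minimal sketch of pause/resume kept separate from assignment. It assumes a toy in-memory model: the class `PauseModel`, its method names, and the use of plain strings for partitions are all hypothetical and are not the consumer's actual API. It only illustrates that a paused partition stays assigned and can still be seeked.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model for illustration; not the real KafkaConsumer.
final class PauseModel {
    // Assigned partition name -> next position to fetch from.
    private final Map<String, Long> positions = new LinkedHashMap<>();
    // Subset of the assignment whose consumption is currently suppressed.
    private final Set<String> paused = new HashSet<>();

    // Replaces the full assignment; pause state of dropped partitions is forgotten.
    void assign(Collection<String> partitions) {
        positions.keySet().retainAll(partitions);
        paused.retainAll(partitions);
        for (String p : partitions) {
            positions.putIfAbsent(p, 0L);
        }
    }

    // Pausing and resuming never touch the assignment itself.
    void pause(String partition) {
        requireAssigned(partition);
        paused.add(partition);
    }

    void resume(String partition) {
        requireAssigned(partition);
        paused.remove(partition);
    }

    // Seeking and reading the position work whether or not the partition is paused.
    void seek(String partition, long offset) {
        requireAssigned(partition);
        positions.put(partition, offset);
    }

    long position(String partition) {
        requireAssigned(partition);
        return positions.get(partition);
    }

    List<String> assignment() {
        return new ArrayList<>(positions.keySet());
    }

    // Partitions the next fetch would read from: assigned and not paused.
    List<String> fetchable() {
        List<String> result = new ArrayList<>();
        for (String p : positions.keySet()) {
            if (!paused.contains(p)) {
                result.add(p);
            }
        }
        return result;
    }

    private void requireAssigned(String partition) {
        if (!positions.containsKey(partition)) {
            throw new IllegalStateException("not assigned: " + partition);
        }
    }
}
```

If pause were instead expressed through assign, the paused partition would drop out of `positions` and the seek would have nothing to act on, which is the ambiguity I would like to avoid.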


On Fri, Jul 31, 2015 at 4:55 PM, Jun Rao <jun@confluent.io> wrote:

> Jason,
> I guess that with the new setAssignment() api, we will also be getting
> rid of pause() and resume()?
> Thanks,
> Jun
> On Fri, Jul 31, 2015 at 11:29 AM, Jason Gustafson <jason@confluent.io>
> wrote:
>> I was thinking a little bit this morning about the subscription API and I
>> have a few ideas on how to address some of the concerns about intuitiveness
>> and exception handling.
>> 1. Split the current notion of topic/partition subscription into
>> subscription of topics and assignment of partitions. These concepts are
>> pretty fundamentally different and I think at least some of the confusion
>> about when subscriptions() can be used is caused by the fact that we
>> overload the term. If instead that method is renamed to assignment(), then
>> we are communicating to users that it is possible to have a subscription
>> without an active assignment, which is not obvious with the current API.
>> The code in fact already separates these concepts internally, so this would
>> just expose it to the user.
>> 2. Merge rebalance callback into a subscription callback and add a
>> way to handle errors. The consumer's current rebalance callback is
>> basically invoked when a subscription "succeeds," so it seems a little
>> weird to also provide a callback on subscription. Perhaps we can just take
>> the rebalance callback out of configuration and have the user provide it on
>> subscribe(). We can add a method to the callback to handle errors (e.g. for
>> non-existing topics). Since the callback is provided at subscribe time, it
>> should be clearer to the user that the assignment will not be ready
>> immediately when subscribe returns. It's also arguably a little more
>> natural to set this callback at subscription time rather than when the
>> consumer is constructed.
>> 3. Get rid of the additive subscribe methods and just use setSubscription
>> which would clear the old subscription. After you start providing callbacks
>> to subscribe, then the implementation starts to get tricky if each call to
>> subscribe provides a separate callback. Instead, as Jay suggested, we could
>> just provide a way to set the full list of subscriptions at once, and then
>> there is only one callback to maintain.
>> With these points, the API might look something like this:
>> void setSubscription(List<String> topics, RebalanceCallback callback);
>> void setAssignment(List<TopicPartition> partitions);
>> List<String> subscription();
>> List<TopicPartition> assignment();
>> interface RebalanceCallback {
>>   void onAssignment(List<TopicPartition> partitions);
>>   void onRevocation(List<TopicPartition> partitions);
>>   // handle non-existing topics, etc.
>>   void onError(Exception e);
>> }
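
A rough sketch of how a user might implement the callback proposed above. The `RebalanceCallback` interface is copied from the proposal; `TopicPartition` here is a minimal stand-in so the example compiles on its own, and `TrackingCallback` is a hypothetical implementation invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Minimal stand-in for the client's TopicPartition, so the sketch is self-contained.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// The callback interface as written in the proposal.
interface RebalanceCallback {
    void onAssignment(List<TopicPartition> partitions);
    void onRevocation(List<TopicPartition> partitions);
    // handle non-existing topics, etc.
    void onError(Exception e);
}

// Hypothetical user code: tracks which partitions are currently owned
// and records subscription errors instead of throwing them.
final class TrackingCallback implements RebalanceCallback {
    final Set<TopicPartition> owned = new LinkedHashSet<>();
    final List<Exception> errors = new ArrayList<>();

    @Override
    public void onAssignment(List<TopicPartition> partitions) {
        owned.addAll(partitions);
    }

    @Override
    public void onRevocation(List<TopicPartition> partitions) {
        owned.removeAll(partitions);
    }

    @Override
    public void onError(Exception e) {
        errors.add(e);
    }
}
```

Until `onAssignment` fires, `owned` is empty, which mirrors the point that a subscription can exist without an active assignment.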
>> Any thoughts?
>> -Jason
>> On Thu, Jul 30, 2015 at 11:59 AM, Jay Kreps <jay@confluent.io> wrote:
>>> Hey Becket,
>>> Yeah the high-level belief here is that it is possible to give something
>>> as high level as the existing "high level" consumer, but this is not likely
>>> to be the end-all be-all of high-level interfaces for processing streams of
>>> messages. For example neither of these interfaces handles the threading
>>> model for the processing, which obviously is a fairly low-level
>>> implementation detail left to the user in your proposal, the current code,
>>> as well as the existing scala consumer.
>>> There will be many of these: the full-fledged stream processing
>>> frameworks like Storm/Spark, scalaz streams, the RxJava stuff, a more
>>> traditional message queue like "processor" interface, not to mention the
>>> stuff we're trying to do with KIP-28. For these frameworks it will be quite
>>> weird to add a bunch of new threads since they will want to dictate the
>>> threading model.
>>> What will be a major failure though is if this client isn't low-level
>>> enough and we need to introduce another layer underneath. This would happen
>>> either because we dictate too much to make it usable for various
>>> applications, frameworks, or use cases. This is the concern with dictating
>>> threading and processing models.
>>> So to summarize the goal is to subsume the existing APIs, which I think
>>> we all agree this does, and be a foundation on which to build other
>>> abstractions.
>>> WRT KIP-28, I think it is quite general and if we do that right it will
>>> subsume a lot of the higher level processing and will give a full threaded
>>> processing model to the user.
>>> -Jay
>>> On Wed, Jul 29, 2015 at 6:25 PM, Jiangjie Qin <jqin@linkedin.com> wrote:
>>>> Thanks for the comments Jason and Jay.
>>>> Jason, I had the same concern for producer's callback as well before,
>>>> but it seems to be fine from some callbacks I wrote - user can always pass
>>>> in object in the constructor if necessary for synchronization.
>>>> Jay, I agree that the current API might be fine for people who want to
>>>> wrap it up. But I thought the new consumer was supposed to be a combination
>>>> of old high and low level consumer, which means it should be able to be
>>>> used as is, just like producer. If KafkaConsumer is designed to be wrapped
>>>> up for use, then the question becomes whether Kafka will provide a decent
>>>> wrapper or not? Neha mentioned that KIP-28 will address the users who only
>>>> care about data. Would that be the wrapper provided by Kafka? I am not sure
>>>> if that is sufficient though because the processor is highly abstracted,
>>>> and might only meet the static data stream requirement as I listed in the
>>>> grid. For users who need something from the other grids, are we going to
>>>> have another wrapper? Or are we expecting all users to write their own
>>>> wrapper for KafkaConsumer? Some other comments are in line.
>>>> Thanks,
>>>> Jiangjie (Becket) Qin
>>>> On Wed, Jul 29, 2015 at 3:16 PM, Jay Kreps <jay@confluent.io> wrote:
>>>>> Some comments on the proposal:
>>>>> I think we are conflating a number of things that should probably be
>>>>> addressed individually because they are unrelated. My past experience is
>>>>> that this always makes progress hard. The more we can pick apart these
>>>>> items the better:
>>>>>    1. threading model
>>>>>    2. blocking vs non-blocking semantics
>>>>>    3. missing apis
>>>>>    4. missing javadoc and other api surprises
>>>>>    5. Throwing exceptions.
>>>>> The missing APIs are getting added independently. Some like your
>>>>> proposed offsetByTime were things we agreed to hold off on for the first
>>>>> release and do when we'd thought it through. If there are uses for it now
>>>>> we can accelerate. I think each of these is really independent, we know
>>>>> there are things that need to be added but lumping them all into one
>>>>> discussion will be confusing.
>>>>> WRT throwing exceptions the policy is to throw exceptions that are
>>>>> unrecoverable and handle and log other exceptions that are transient. That
>>>>> policy makes sense if you go through the thought exercise of "what will the
>>>>> user do if I throw this exception to them" if they have no other rational
>>>>> response but to retry (and if failing to anticipate and retry with that
>>>>> exception will kill their program). You can argue whether the topic not
>>>>> existing is transient or not, unfortunately the way we did auto-creation
>>>>> makes it transient if you are in "auto create mode" and non-transient
>>>>> otherwise (ick!). In any case this is an orthogonal discussion to
>>>>> everything else. I think the policy is right and if we don't conform to it
>>>>> in some way that is really an independent bug/discussion.
>>>> Agreed, we can discuss them separately.
>>>>> I suggest we focus on threading and the current event-loop style of
>>>>> api design since I think that is really the crux.
>>>>> The analogy between the producer threading model and the consumer
>>>>> model actually doesn't work for me. The goal of the producer is actually to
>>>>> take requests from many many user threads and shove them into a single
>>>>> buffer for batching. So the threading model isn't the 1:1 threads you
>>>>> describe; it is N:1. The goal of the consumer is to support single-threaded
>>>>> processing. This is what drives the difference. Saying that the producer
>>>>> has N:1 threads, therefore the consumer should have 1:1 threads instead
>>>>> of just 1 thread, doesn't make sense any more than an analogy to the broker's
>>>>> threading model would--the problem we're solving is totally different.
>>>> I think the ultimate goal for both producer and consumer is still to allow
>>>> users to send/receive data in parallel. In producer we picked the solution
>>>> of one-producer-serving-multiple-threads, and in consumer we picked
>>>> multiple-single-threaded-consumers instead of
>>>> single-consumer-serving-multiple threads. And we believe people can always
>>>> implement the latter with the former. I think this is a reasonable
>>>> decision. However, there are also reasonable concerns over the
>>>> multiple-single-threaded-consumers solution which is that the single-thread
>>>> might have to be a dedicated polling thread in many cases which pushes user
>>>> towards the other solution - i.e. implementing a
>>>> single-thread-consumer-serving-multiple-threads wrapper. From what we hear,
>>>> it seems to be a quite common concern for most of the users we talked to.
>>>> Plus the adoption bar of the consumer will be much higher because users will
>>>> have to understand some of the details of things they don't care about, as
>>>> listed in the grid.
>>>> The analogy between producer/consumer is intended to show that a
>>>> separate polling thread will solve the concerns we have.
>>>>> I think ultimately though what you need to think about is, does an
>>>>> event loop style of API make sense? That is the source of all the issues
>>>>> you describe. This style of API is incredibly prevalent from unix select to
>>>>> GUIs to node.js. It's a great way to model multiple channels of messages
>>>>> coming in. It is a fantastic style for event processing. Programmers
>>>>> understand this style of api though I would agree it is unusual compared to
>>>>> blocking apis. But it is a single threaded processing model. The current
>>>>> approach is basically a pure event loop with some convenience methods that
>>>>> are effectively "poll until X is complete".
>>>>> I think basically all the confusion you are describing comes from not
>>>>> documenting/expecting an event loop. The "if you don't call poll nothing
>>>>> happens" point is basically this. It's an event loop. You have to loop. You
>>>>> can't not call poll. The docs don't cover this right now, perhaps. I think
>>>>> if they do it's not unreasonable behavior.
>>>> I'm not sure if I understand the event-loop correctly and honestly I
>>>> did not think about it clearly before. My understanding is that an
>>>> event-loop model means a single listener thread, but there can be multiple
>>>> event generator threads. The downside is that the listener thread has to be
>>>> fast and very careful about blocking. If we look at the consumer, in the
>>>> current model the caller thread itself acts as both event generator and
>>>> listener. As a generator, it generates different tasks by calling the
>>>> convenience methods. As a listener, it listens to the messages on broker
>>>> and also the tasks generated by itself. So in our proposal, we are not
>>>> changing the event-loop model here, just separating the event generator and
>>>> event listener. It looks to me that the underlying execution thread follows
>>>> the event-loop model, the special thing might be it is not only listening
>>>> to the messages from broker, but also listening to the tasks from the user
>>>> thread. This is essentially the thing a consumer has to do - interact with
>>>> both server and user.
>>>>> If we want to move away from an event loop I'm not sure *any* aspect
>>>>> of the current event loop style of api makes sense any more. I am not
>>>>> totally married to event loops, but I do think what we have gives an
>>>>> elegant way of implementing any higher level abstractions that would fully
>>>>> implement the user's parallelism model. I don't want to go rethink
>>>>> everything but I do think a half-way implementation that is event loop +
>>>>> background threads is likely going to be icky.
>>>> We brought up before changing consumer.poll() to
>>>> consumer.consume(), and did not do so simply because we wanted fewer
>>>> changes in the API... I might be crazy but can we think of the proposed model as
>>>> processing thread + event-loop instead, rather than event-loop + background
>>>> thread?
>>>>> WRT making it configurable whether liveness means "actually consuming"
>>>>> or "background thread running" I would suggest that that is really the
>>>>> worst outcome. These types of "modes" that are functionally totally
>>>>> different are just awful from a documentation, testing, usability, etc pov.
>>>>> I would strongly prefer we pick either of these, document it, and make it
>>>>> work well rather than trying to do both.
>>>> Previously I thought this was the major benefit we wanted from a single
>>>> threaded model, personally I don't have a strong preference on this. So I
>>>> am OK with either way.
>>>>> -Jay
>>>>> On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <neha@confluent.io>
>>>>> wrote:
>>>>>> Works now. Thanks Becket!
>>>>>> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <jqin@linkedin.com>
>>>>>> wrote:
>>>>>>> Ah... My bad, forgot to change the URL link for pictures.
>>>>>>> Thanks for the quick response, Neha. It should be fixed now, can you
>>>>>>> try again?
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <neha@confluent.io>
>>>>>>> wrote:
>>>>>>>> Thanks Becket. Quick comment - there seem to be a bunch of images
>>>>>>>> that the wiki refers to, but none loaded for me. Is it just me, or
>>>>>>>> can nobody see the pictures?
>>>>>>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <jqin@linkedin.com>
>>>>>>>> wrote:
>>>>>>>>> I agree with Ewen that it will be tricky for a single threaded model to
>>>>>>>>> implement the conventional semantics of async or Future. We just
>>>>>>>>> drafted the following wiki which explains our thoughts at LinkedIn on the
>>>>>>>>> new consumer API and threading model.
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>>>>>>>> We were trying to see:
>>>>>>>>> 1. If we can use some kind of methodology to help us think about
>>>>>>>>> what API we want to provide to user for different use cases.
>>>>>>>>> 2. What are the pros and cons of the current single threaded model. Is
>>>>>>>>> there a way that we can maintain the benefits while solving the issues we are
>>>>>>>>> facing now with the single threaded model.
>>>>>>>>> Thanks,
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <
>>>>>>>>> ewen@confluent.io> wrote:
>>>>>>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <
>>>>>>>>>> wangguoz@gmail.com> wrote:
>>>>>>>>>>> I think Ewen has proposed these APIs for using callbacks along
>>>>>>>>>>> with returning future in the commit calls, i.e. something similar to:
>>>>>>>>>>> public Future<Void> commit(ConsumerCommitCallback callback);
>>>>>>>>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets,
>>>>>>>>>>> ConsumerCommitCallback callback);
>>>>>>>>>>> At that time I was leaning towards not including the Future
>>>>>>>>>>> alongside the callback, mainly because of the implementation complexity
>>>>>>>>>>> I feel it could introduce along with the retry settings after looking
>>>>>>>>>>> through the code base. I would be happy to change my mind if we could propose
>>>>>>>>>>> a prototype implementation that is simple enough.
>>>>>>>>>> One of the reasons that interface ended up being difficult (or
>>>>>>>>>> maybe impossible) to make work reasonably is because the consumer was
>>>>>>>>>> thread-safe at the time. That made it impossible to know what should be
>>>>>>>>>> done when Future.get() is called -- should the implementation call poll()
>>>>>>>>>> itself, or would the fact that the user is calling get() imply that there's
>>>>>>>>>> a background thread running the poll() loop and we just need to wait for it?
>>>>>>>>>> The consumer is no longer thread safe, but I think the same
>>>>>>>>>> problem remains because the expectation with Futures is that they are
>>>>>>>>>> thread safe. Which means that even if the consumer isn't thread safe, I
>>>>>>>>>> would expect to be able to hand that Future off to some other thread, have
>>>>>>>>>> the second thread call get(), and then continue driving the poll loop in my
>>>>>>>>>> thread (which in turn would eventually resolve the Future).
>>>>>>>>>> I quite dislike the sync/async enum. While both operations commit
>>>>>>>>>> offsets, their semantics are so different that overloading a single method
>>>>>>>>>> with both is messy. That said, I don't think we should consider this an
>>>>>>>>>> inconsistency wrt the new producer API's use of Future because the two APIs
>>>>>>>>>> have a much more fundamental difference that justifies it: they have
>>>>>>>>>> completely different threading and execution models.
>>>>>>>>>> I think a Future-based API only makes sense if you can guarantee
>>>>>>>>>> the operations that Futures are waiting on will continue to make progress
>>>>>>>>>> regardless of what the thread using the Future does. The producer API makes
>>>>>>>>>> that work by processing asynchronous requests in a background thread. The
>>>>>>>>>> new consumer does not, and so it becomes difficult/impossible to implement
>>>>>>>>>> the Future correctly. (Or, you have to make assumptions which break other
>>>>>>>>>> use cases; if you want to support the simple use case of just making a
>>>>>>>>>> commit() synchronous by calling get(), the Future has to call poll()
>>>>>>>>>> internally; but if you do that, then if any user ever wants to add
>>>>>>>>>> synchronization to the consumer via some external mechanism, then the
>>>>>>>>>> implementation of the Future's get() method will not be subject to that
>>>>>>>>>> synchronization and things will break).
>>>>>>>>>> -Ewen
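
A small sketch of the progress problem described above, assuming a toy commit queue that is only drained from poll(). The class `PollDrivenCommitter` and its methods are hypothetical and do not reflect the consumer's implementation; `CompletableFuture` is the standard Java 8 class. The Future returned by `commitAsync()` cannot complete unless some thread keeps calling `poll()`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy: commits are queued and only complete when poll() runs.
final class PollDrivenCommitter {
    private final Queue<CompletableFuture<Void>> pending = new ArrayDeque<>();

    // Registers a commit; nothing happens until the event loop is driven.
    CompletableFuture<Void> commitAsync() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Stand-in for one event loop iteration: completes every queued commit.
    void poll() {
        CompletableFuture<Void> future;
        while ((future = pending.poll()) != null) {
            future.complete(null);
        }
    }

    // Returns true if the future completes within the given time, false on timeout.
    static boolean completesWithin(CompletableFuture<Void> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

A caller that invokes `commitAsync()` and then blocks in `get()` without a timeout on the same thread would wait forever here, since no other thread drives `poll()`. The producer avoids this only because its background thread plays that role.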
>>>>>>>>>>> Guozhang
>>>>>>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <
>>>>>>>>>>> neha@confluent.io> wrote:
>>>>>>>>>>>> Hey Adi,
>>>>>>>>>>>> When we designed the initial version, the producer API was
>>>>>>>>>>>> still changing. I thought about adding the Future and then just didn't get
>>>>>>>>>>>> to it. I agree that we should look into adding it for consistency.
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Neha
>>>>>>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <
>>>>>>>>>>>> aauradkar@linkedin.com> wrote:
>>>>>>>>>>>>> Great discussion everyone!
>>>>>>>>>>>>> One general comment on the sync/async API's on the new
>>>>>>>>>>>>> consumer. I think the producer tackles sync vs async API's
>>>>>>>>>>>>> well. For API's that can either be sync or async, can we simply return a
>>>>>>>>>>>>> future? That seems more elegant for the API's that make sense in
>>>>>>>>>>>>> both flavors. From the user's perspective, it is more consistent with the
>>>>>>>>>>>>> new producer. One easy example is the commit call with the CommitType
>>>>>>>>>>>>> enum: we can make that call always async and users can block on the future
>>>>>>>>>>>>> if they want to make sure their offsets are committed.
>>>>>>>>>>>>> Aditya
>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <
>>>>>>>>>>>>> okaraman@linkedin.com> wrote:
>>>>>>>>>>>>>> Thanks for the great responses, everyone!
>>>>>>>>>>>>>> To expand a tiny bit on my initial post: while I did bring up
>>>>>>>>>>>>>> old high level consumers, the teams we spoke to were actually not the types
>>>>>>>>>>>>>> of services that simply wanted an easy way to get ConsumerRecords. We spoke
>>>>>>>>>>>>>> to infrastructure teams that I would consider to be closer to the
>>>>>>>>>>>>>> "power-user" end of the spectrum and would want KafkaConsumer's level of
>>>>>>>>>>>>>> granularity. Some would use auto group management. Some would use explicit
>>>>>>>>>>>>>> group management. All of them would turn off auto offset commits. Yes, the
>>>>>>>>>>>>>> Samza team had prior experience with the old SimpleConsumer, but this is
>>>>>>>>>>>>>> the first kafka consumer being used by the Databus team. So I don't really
>>>>>>>>>>>>>> think the feedback received was about the simpler times or wanting
>>>>>>>>>>>>>> additional higher-level clients.
>>>>>>>>>>>>>> - Onur
>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <
>>>>>>>>>>>>>> jason@confluent.io> wrote:
>>>>>>>>>>>>>>> I think if we recommend a longer session timeout, then we
>>>>>>>>>>>>>>> should expose the heartbeat frequency in configuration since this generally
>>>>>>>>>>>>>>> controls how long normal rebalances will take. I think it's currently
>>>>>>>>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also be nice to
>>>>>>>>>>>>>>> have an explicit LeaveGroup request to implement clean shutdown of a
>>>>>>>>>>>>>>> consumer. Then the coordinator doesn't have to wait for the timeout to
>>>>>>>>>>>>>>> reassign partitions.
>>>>>>>>>>>>>>> -Jason
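
The arithmetic behind the heartbeat point above, as a short sketch. It assumes the ratio of 3 heartbeats per session timeout mentioned in the message; the class `HeartbeatMath` and its method names are hypothetical, not configuration the client exposes.

```java
// Hypothetical helper illustrating the fixed heartbeat ratio described above.
final class HeartbeatMath {
    // Ratio taken from the message: 3 heartbeats per session timeout.
    static final int HEARTBEATS_PER_SESSION_TIMEOUT = 3;

    // Interval between heartbeats implied by a given session timeout.
    static long heartbeatIntervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION_TIMEOUT;
    }

    // True once a full heartbeat interval has elapsed since the last heartbeat.
    static boolean heartbeatDue(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs >= heartbeatIntervalMs(sessionTimeoutMs);
    }
}
```

With a 60 second session timeout this gives a 20 second heartbeat interval, so raising the session timeout also stretches the interval unless the two are configurable separately.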
>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <jay@confluent.io
>>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>>> Hey Kartik,
>>>>>>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the
>>>>>>>>>>>>>>>> common case.
>>>>>>>>>>>>>>>> However there are two ways to avoid this:
>>>>>>>>>>>>>>>> 1. Default the timeout high
>>>>>>>>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>>>>>>>> When we were doing the consumer design we discussed this
>>>>>>>>>>>>>>>> tradeoff and I think the conclusion we came to was that defaulting to a
>>>>>>>>>>>>>>>> high timeout was actually better. This means it takes a little longer to
>>>>>>>>>>>>>>>> detect a failure, but usually that is not a big problem and people who want
>>>>>>>>>>>>>>>> faster failure detection can tune it down. This seemed better than having
>>>>>>>>>>>>>>>> the failure detection not really cover the consumption and just be a
>>>>>>>>>>>>>>>> background ping. The two reasons were (a) you still have the GC problem
>>>>>>>>>>>>>>>> even for the background thread, (b) consumption is in some sense a better
>>>>>>>>>>>>>>>> definition of an active healthy consumer and a lot of problems crop up when
>>>>>>>>>>>>>>>> you have an inactive consumer with an active background thread (as today).
>>>>>>>>>>>>>>>> When we had the discussion I think what we realized was
>>>>>>>>>>>>>>>> that most people who were worried about the timeout were imagining a very
>>>>>>>>>>>>>>>> low default (say, 500ms). But in fact just setting this to 60 seconds or
>>>>>>>>>>>>>>>> higher as a default would be okay, this adds to the failure detection time
>>>>>>>>>>>>>>>> but only apps that care about this need to tune. This should largely
>>>>>>>>>>>>>>>> eliminate false positives since after all if you disappear for 60 seconds
>>>>>>>>>>>>>>>> that actually starts to be more of a true positive, even if you come
>>>>>>>>>>>>>>>> back... :-)
>>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>>> kparamasivam@linkedin.com> wrote:
>>>>>>>>>>>>>>>>> adding the open source alias.  This email started off as a
>>>>>>>>>>>>>>>>> broader discussion around the new consumer.  I was zooming into only the
>>>>>>>>>>>>>>>>> aspect of poll() being the only mechanism for driving the heartbeats.
>>>>>>>>>>>>>>>>> Yes the lag is the effect of the problem (not the
>>>>>>>>>>>>>>>>> problem).  Monitoring the lag is important as it is the primary way to tell
>>>>>>>>>>>>>>>>> if the application is wedged.  There might be other metrics which can
>>>>>>>>>>>>>>>>> possibly capture the same essence. Yes the lag is at the consumer group
>>>>>>>>>>>>>>>>> level, but you can tell that one of the consumers is messed up if, for
>>>>>>>>>>>>>>>>> example, one of the partitions in the application starts generating lag
>>>>>>>>>>>>>>>>> while the others are fine.
>>>>>>>>>>>>>>>>> Monitoring aside, I think the main point of concern is
>>>>>>>>>>>>>>>>> that in the old consumer most customers don't have to worry about
>>>>>>>>>>>>>>>>> unnecessary rebalances and most of the things that they do in their app
>>>>>>>>>>>>>>>>> don't have an impact on the session timeout (i.e. the only thing that
>>>>>>>>>>>>>>>>> causes rebalances is when the GC is out of whack). For the handful of
>>>>>>>>>>>>>>>>> customers who are impacted by GC related rebalances, I would imagine that
>>>>>>>>>>>>>>>>> all of them would really want us to make the system more resilient.    I
>>>>>>>>>>>>>>>>> agree that the GC problem can't be solved easily in the java client,
>>>>>>>>>>>>>>>>> however it appears that now we would be expecting the consuming
>>>>>>>>>>>>>>>>> applications to be even more careful with ongoing tuning of the timeouts.
>>>>>>>>>>>>>>>>> At LinkedIn, we have seen that most kafka applications don't have much of a
>>>>>>>>>>>>>>>>> clue about configuring the timeouts and just end up calling the Kafka team
>>>>>>>>>>>>>>>>> when their application sees rebalances.
>>>>>>>>>>>>>>>>> The other side effect of poll driving the heartbeats is
>>>>>>>>>>>>>>>>> that we have to make sure that people don't set a poll timeout that is
>>>>>>>>>>>>>>>>> larger than the session timeout.   If we had a notion of implicit
>>>>>>>>>>>>>>>>> heartbeats then we could also automatically make this work for consumers by
>>>>>>>>>>>>>>>>> sending heartbeats at the appropriate interval even though the customers
>>>>>>>>>>>>>>>>> want to do a long poll.
>>>>>>>>>>>>>>>>> We could surely work around this in LinkedIn if either we
>>>>>>>>>>>>>>>>> have the Pause() api or an explicit HeartBeat() api on the consumer.
>>>>>>>>>>>>>>>>> Would love to hear how other people think about this
>>>>>>>>>>>>>>>>> subject ?
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <
>>>>>>>>>>>>>>>>> neha@confluent.io> wrote:
>>>>>>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is
>>>>>>>>>>>>>>>>>> that there are many ways the application's message processing could fail
>>>>>>>>>>>>>>>>>> and we wouldn't be able to model all of those in the consumer's failure
>>>>>>>>>>>>>>>>>> detection mechanism. So we should try to model as much of it as we can so
>>>>>>>>>>>>>>>>>> the consumer's failure detection is meaningful.
>>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag increases then for
>>>>>>>>>>>>>>>>>>> sure something is wrong.
>>>>>>>>>>>>>>>>>> The lag is merely the effect of the problem, not the
>>>>>>>>>>>>>>>>>> problem itself. Lag is also a consumer group level concept and the problem
>>>>>>>>>>>>>>>>>> we have is being able to detect failures at the level of individual
>>>>>>>>>>>>>>>>>> consumer instances.
>>>>>>>>>>>>>>>>>> As you pointed out, a consumer that calls poll() is a stronger
>>>>>>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The dilemma then is who
>>>>>>>>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but the application
>>>>>>>>>>>>>>>>>> owner can define what a "normal" processing latency is for their
>>>>>>>>>>>>>>>>>> application. Now the question is what's the easiest way for the user to
>>>>>>>>>>>>>>>>>> define this without having to tune and fine tune this too often. The
>>>>>>>>>>>>>>>>>> heartbeat interval certainly does not have to be
>>>>>>>>>>>>>>>>>> *exactly* 99tile of processing latency but could be in
>>>>>>>>>>>>>>>>>> the ballpark + an error delta. The error delta is the application owner's
>>>>>>>>>>>>>>>>>> acceptable risk threshold during which they would be ok if the application
>>>>>>>>>>>>>>>>>> remains part of the group despite being dead. It is ultimately a tradeoff
>>>>>>>>>>>>>>>>>> between operational ease and more accurate failure detection.
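
One way to read "99tile plus an error delta" as a calculation, sketched under assumptions: the class `TimeoutEstimate`, the nearest-rank percentile method, and the sample numbers are all hypothetical and only illustrate the idea of deriving a timeout from observed processing latencies.

```java
import java.util.Arrays;

// Hypothetical helper: derives a timeout from observed per-poll processing latencies.
final class TimeoutEstimate {
    // Nearest-rank percentile (pct in 1..100) over the observed samples.
    static long percentile(long[] samplesMs, int pct) {
        if (samplesMs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        if (pct < 1 || pct > 100) {
            throw new IllegalArgumentException("pct must be in 1..100");
        }
        long[] sorted = samplesMs.clone();
        Arrays.sort(sorted);
        // Integer ceiling of pct * n / 100 gives the 1-based rank.
        int rank = (pct * sorted.length + 99) / 100;
        return sorted[rank - 1];
    }

    // Observed 99th percentile plus the owner's acceptable error delta.
    static long suggestedTimeoutMs(long[] samplesMs, long errorDeltaMs) {
        return percentile(samplesMs, 99) + errorDeltaMs;
    }
}
```

The delta is the part only the application owner can choose: a larger value means fewer spurious rebalances, at the cost of a dead consumer holding its partitions for longer.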
>>>>>>>>>>>>>>>>>>> With quotas the write latencies to kafka could range from
>>>>>>>>>>>>>>>>>>> a few milliseconds all the way to tens of seconds.
>>>>>>>>>>>>>>>>>> This is actually no different from the GC problem. Most
>>>>>>>>>>>>>>>>>> of the time, the normal GC falls in the few ms range and there are
>>>>>>>>>>>>>>>>>> many applications even at LinkedIn for which the max GC falls in the
>>>>>>>>>>>>>>>>>> multiple seconds range. Note that it also can't be predicted, so has to be
>>>>>>>>>>>>>>>>>> an observed value. One way or the other, you have to observe what this
>>>>>>>>>>>>>>>>>> acceptable "max" is for your application and then set the appropriate
>>>>>>>>>>>>>>>>>> timeouts.
>>>>>>>>>>>>>>>>>> Since this is not something that can be automated, this
>>>>>>>>>>>>>>>>>> is a config that the application owner has to set based on the expected
>>>>>>>>>>>>>>>>>> behavior of their application. Not wanting to do that leads to ending up
>>>>>>>>>>>>>>>>>> with bad consumption semantics where the application process continues to
>>>>>>>>>>>>>>>>>> be part of a group owning partitions but not consuming since it has halted
>>>>>>>>>>>>>>>>>> due to a problem. The fact that the design requires them to express that in
>>>>>>>>>>>>>>>>>> poll() frequency or not doesn't change the fact that the application owner
>>>>>>>>>>>>>>>>>> has to go through the process of measuring and then defining this "max".
>>>>>>>>>>>>>>>>>> The reverse where they don't do this and the application
>>>>>>>>>>>>>>>>>> remains in the group despite being dead is super confusing and frustrating
>>>>>>>>>>>>>>>>>> too. So the due diligence up front is actually worth it. And as long as the
>>>>>>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it should be easy
>>>>>>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid or not and how
>>>>>>>>>>>>>>>>>> that should be tuned.
>>>>>>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source
>>>>>>>>>>>>>>>>>> that will hide this complexity and I agree that LI is unblocked since you
>>>>>>>>>>>>>>>>>> can do this in TrackerConsumer in the meantime.
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>>>>> kparamasivam@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier
>>>>>>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points
>>>>>>>>>>>>>>>>>>> you discuss are all very valid.
>>>>>>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue.
>>>>>>>>>>>>>>>>>>> However there is a smaller percentage of memory hungry apps that get hit
>>>>>>>>>>>>>>>>>>> by it.
>>>>>>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are
>>>>>>>>>>>>>>>>>>> healthy, the app might not be behaving correctly is also real.  If the app
>>>>>>>>>>>>>>>>>>> is calling poll() then the probability that the app is healthy is surely
>>>>>>>>>>>>>>>>>>> higher.  But this again isn't an absolute measure that the app is
>>>>>>>>>>>>>>>>>>> processing correctly.
>>>>>>>>>>>>>>>>>>> In other cases the app might have even died in which
>>>>>>>>>>>>>>>>>>> case this discussion is moot.
>>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag increases then for
>>>>>>>>>>>>>>>>>>> sure something is wrong.
>>>>>>>>>>>>>>>>>>> The proposal seems to be that the application needs to
>>>>>>>>>>>>>>>>>>> tune their session timeout based on the 99th percentile of the time they take to
>>>>>>>>>>>>>>>>>>> process events after every poll. This turns out to be a nontrivial thing
>>>>>>>>>>>>>>>>>>> for an application to do. To start with, when an application is new their
>>>>>>>>>>>>>>>>>>> data is going to be based on tests that they have done on synthetic data.
>>>>>>>>>>>>>>>>>>> This often doesn't represent what they will see in production. Once
>>>>>>>>>>>>>>>>>>> the app is in production their processing latencies will potentially vary
>>>>>>>>>>>>>>>>>>> over time. It is extremely unlikely that the application owner does a
>>>>>>>>>>>>>>>>>>> careful job of monitoring the 99th percentile of latencies over time and readjusts
>>>>>>>>>>>>>>>>>>> the settings. Often the latencies vary because of variance in other
>>>>>>>>>>>>>>>>>>> services that are called by the consumer as part of processing the events.
>>>>>>>>>>>>>>>>>>> Case in point would be a simple app which reads events
>>>>>>>>>>>>>>>>>>> and writes to Kafka. With quotas the write latencies to Kafka could range
>>>>>>>>>>>>>>>>>>> from a few milliseconds all the way to tens of seconds. As the scale of
>>>>>>>>>>>>>>>>>>> processing for an app increases, the app or that 'user' could now get
>>>>>>>>>>>>>>>>>>> quotaed. Instead of slowing down gracefully, unless the application owner
>>>>>>>>>>>>>>>>>>> has carefully tuned the timeout, we are now looking at a potential outage
>>>>>>>>>>>>>>>>>>> where the app could get hit by constant rebalances.
>>>>>>>>>>>>>>>>>>> If we expose the pause() API then it is possible for us
>>>>>>>>>>>>>>>>>>> to take care of this in the LinkedIn wrapper: we would keep
>>>>>>>>>>>>>>>>>>> calling poll on a separate thread periodically and enqueue the messages.
>>>>>>>>>>>>>>>>>>> When the queue is full we would call pause().
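[Editor's sketch, not part of the original message.] The wrapper idea described above (keep polling on a separate thread, buffer the messages, pause when the buffer fills) might look roughly like the following. `PausableSource` is a hypothetical stand-in interface, not the real KafkaConsumer API, and `BufferingPoller`, `CountingSource`, and `pump()` are names invented for illustration. The sketch assumes that a poll on a paused source returns no records but still counts as a poll, and it resumes once the buffer drains below capacity, a step the message does not spell out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a consumer with pause/resume; NOT the real KafkaConsumer API.
interface PausableSource<T> {
    List<T> poll();   // assumed to return no records while paused
    void pause();
    void resume();
}

// Buffers records between a polling thread and a (possibly slow) processing thread.
final class BufferingPoller<T> {
    private final PausableSource<T> source;
    private final Queue<T> buffer = new ArrayDeque<>();
    private final int capacity;   // soft bound: one batch may overshoot it
    private boolean paused = false;

    BufferingPoller(PausableSource<T> source, int capacity) {
        this.source = source;
        this.capacity = capacity;
    }

    // One iteration of the polling thread's loop. poll() is always called,
    // so liveness is maintained even while the processor is behind.
    synchronized void pump() {
        if (paused && buffer.size() < capacity) {
            source.resume();
            paused = false;
        }
        buffer.addAll(source.poll());
        if (!paused && buffer.size() >= capacity) {
            source.pause();
            paused = true;
        }
    }

    // Called by the processing thread; returns null when nothing is buffered.
    synchronized T take() { return buffer.poll(); }

    synchronized boolean isPaused() { return paused; }

    synchronized int buffered() { return buffer.size(); }
}

// Toy source for illustration only: emits batchSize sequential integers per poll unless paused.
final class CountingSource implements PausableSource<Integer> {
    private final int batchSize;
    private int next = 0;
    private boolean paused = false;
    int polls = 0;   // how many times poll() was called, paused or not

    CountingSource(int batchSize) { this.batchSize = batchSize; }

    @Override public List<Integer> poll() {
        polls++;
        List<Integer> batch = new ArrayList<>();
        if (!paused) {
            for (int i = 0; i < batchSize; i++) batch.add(next++);
        }
        return batch;
    }

    @Override public void pause() { paused = true; }

    @Override public void resume() { paused = false; }
}
```

In a real wrapper a background thread would call `pump()` in a loop while the application thread calls `take()`. The trade-off is the one debated in this thread: the consumer stays in the group even when processing has stalled.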
>>>>>>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I
>>>>>>>>>>>>>>>>>>> think it is vastly better if we address this in the API as every major
>>>>>>>>>>>>>>>>>>> customer will eventually be pained by it.
>>>>>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <
>>>>>>>>>>>>>>>>>>> jay@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges
>>>>>>>>>>>>>>>>>>> and now is definitely the time to clean them up.
>>>>>>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or
>>>>>>>>>>>>>>>>>>> undergo a big api redesign at this point beyond the group management stuff
>>>>>>>>>>>>>>>>>>> we've discussed in the context of Samza/copycat which is already a big
>>>>>>>>>>>>>>>>>>> effort.
>>>>>>>>>>>>>>>>>>> Overall I agree that we have done a poor job of
>>>>>>>>>>>>>>>>>>> documenting which apis block and which don't, and people are surprised
>>>>>>>>>>>>>>>>>>> when we haven't labeled something that is unintuitive. But the
>>>>>>>>>>>>>>>>>>> overall style of poll/select-based apis is quite common in programming
>>>>>>>>>>>>>>>>>>> going back to unix select so I don't think it's beyond people if explained
>>>>>>>>>>>>>>>>>>> well (after all we need to mix sync and async apis and if we don't say
>>>>>>>>>>>>>>>>>>> which is which any scheme will be confusing).
>>>>>>>>>>>>>>>>>>> For what it's worth the experience with this api has
>>>>>>>>>>>>>>>>>>> actually been about 1000x better than the issues people had around
>>>>>>>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking iterator,
>>>>>>>>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, etc. have all
>>>>>>>>>>>>>>>>>>> caused endless amounts of anger. Not to mention that that client
>>>>>>>>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people want to try to
>>>>>>>>>>>>>>>>>>> use it for (plus I regularly hear people tell me they've heard not to use
>>>>>>>>>>>>>>>>>>> it at all for various reasons ranging from data loss to lack of features).
>>>>>>>>>>>>>>>>>>> It's important to have that context when people need to switch and they say
>>>>>>>>>>>>>>>>>>> "oh the old way was so simple and the new way complex!" :-)
>>>>>>>>>>>>>>>>>>> Let me give some context related to your points, based
>>>>>>>>>>>>>>>>>>> on our previous discussions:
>>>>>>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>>>>>>>>>>>>> The motivation for avoiding additional threading was
>>>>>>>>>>>>>>>>>>> two-fold. First this client is really intended to be the lowest level
>>>>>>>>>>>>>>>>>>> client. There are many, many possible higher level processing abstractions.
>>>>>>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level client was that
>>>>>>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with things that are
>>>>>>>>>>>>>>>>>>> different in each use case like the appropriate threading model. If you do
>>>>>>>>>>>>>>>>>>> this you need to also maintain a thread free low-level consumer api for
>>>>>>>>>>>>>>>>>>> people to get around whatever you have done.
>>>>>>>>>>>>>>>>>>> The second reason was that the internal threading in the
>>>>>>>>>>>>>>>>>>> client became quite complex. The answer with threading is always that "it
>>>>>>>>>>>>>>>>>>> won't be complex this time", but it always is.
>>>>>>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside to
>>>>>>>>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the application must
>>>>>>>>>>>>>>>>>>> regularly consume to be considered an active consumer. This allows the
>>>>>>>>>>>>>>>>>>> possibility of false positive failure detections. However it's important to
>>>>>>>>>>>>>>>>>>> understand the downside of the alternative. If you do background polling a
>>>>>>>>>>>>>>>>>>> consumer is considered active as long as it isn't shut down. This leads to
>>>>>>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming because they have
>>>>>>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming partitions and
>>>>>>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far worse. If you allow
>>>>>>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and knows they aren't
>>>>>>>>>>>>>>>>>>> consuming frequently enough to be considered active but if you allow false
>>>>>>>>>>>>>>>>>>> negatives you end up having weeks go by before someone notices that a
>>>>>>>>>>>>>>>>>>> partition has been unconsumed the whole time at which point the data is
>>>>>>>>>>>>>>>>>>> gone. Plus of course even if you do this you still have regular false
>>>>>>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed this in some depth
>>>>>>>>>>>>>>>>>>> at the time and decided that it is better to have the liveness notion tied
>>>>>>>>>>>>>>>>>>> to *actual* consumption which is the actual definition
>>>>>>>>>>>>>>>>>>> of liveness.
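[Editor's sketch, not part of the original message.] The contract described above, where a consumer counts as active only if it keeps polling, can be modeled as a small session tracker. This is a toy model under stated assumptions, not Kafka's coordinator code: `PollLivenessTracker`, `recordPoll`, and `expire` are invented names, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of "liveness tied to poll": a consumer is active only while its
// most recent poll is no older than the session timeout.
final class PollLivenessTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastPollMs = new HashMap<>();

    PollLivenessTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Called whenever a consumer polls; this doubles as its heartbeat.
    void recordPoll(String consumerId, long nowMs) {
        lastPollMs.put(consumerId, nowMs);
    }

    boolean isActive(String consumerId) {
        return lastPollMs.containsKey(consumerId);
    }

    // Removes and returns every consumer whose last poll is older than the timeout.
    // In the scenario discussed here, each expiry would trigger a rebalance.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        lastPollMs.entrySet().removeIf(entry -> {
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                expired.add(entry.getKey());
                return true;
            }
            return false;
        });
        return expired;
    }
}
```

A consumer that is alive but spends longer than the timeout processing one batch is expired here exactly like a dead one, which is the false positive under discussion. A background heartbeat would avoid that, but it would also keep a stalled consumer active, which is the false negative.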
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <
>>>>>>>>>>>>>>>>>>> okaraman@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>>>>>>>> There has recently been a lot of open source activity
>>>>>>>>>>>>>>>>>>>> regarding the new KafkaConsumer:
>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3CCAAUywg_PWbS3HsEVNp5RCCMPvQBaAmaP+ZgN8Fh+WOeLvt_hFA@mail.gmail.com%3E
>>>>>>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus,
>>>>>>>>>>>>>>>>>>>> Samza, and some other teams and we got similar feedback.
>>>>>>>>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>>>>>>>>    1. The current behavior is not intuitive. For example,
>>>>>>>>>>>>>>>>>>>>    KafkaConsumer.poll drives everything. The other methods like subscribe,
>>>>>>>>>>>>>>>>>>>>    unsubscribe, seek, commit(async) don’t do anything without a
>>>>>>>>>>>>>>>>>>>>    KafkaConsumer.poll call.
>>>>>>>>>>>>>>>>>>>>    2. The semantics of a commit() call should be
>>>>>>>>>>>>>>>>>>>>    consistent between sync and async operations. Currently, sync commit is a
>>>>>>>>>>>>>>>>>>>>    blocking call which actually sends out an OffsetCommitRequest and waits for
>>>>>>>>>>>>>>>>>>>>    the response upon the user’s KafkaConsumer.commit call. However, the async
>>>>>>>>>>>>>>>>>>>>    commit is a nonblocking call which just queues up the OffsetCommitRequest.
>>>>>>>>>>>>>>>>>>>>    The request itself is later sent out in the next poll. The teams we talked
>>>>>>>>>>>>>>>>>>>>    to found this misleading.
>>>>>>>>>>>>>>>>>>>>    3. Heartbeats are dependent on user application
>>>>>>>>>>>>>>>>>>>>    behavior (i.e. user applications calling poll). This can be a big problem
>>>>>>>>>>>>>>>>>>>>    as we don’t control how different applications behave. For example, we
>>>>>>>>>>>>>>>>>>>>    might have an application which reads from Kafka and writes to Espresso. If
>>>>>>>>>>>>>>>>>>>>    Espresso is slow for whatever reason, then rebalances could happen.
>>>>>>>>>>>>>>>>>>>> Generally speaking, we feel that the current
>>>>>>>>>>>>>>>>>>>> KafkaConsumer API design is more of a wrapping around the old simple
>>>>>>>>>>>>>>>>>>>> consumer, i.e. in the old consumer we ask users to deal with raw protocols and
>>>>>>>>>>>>>>>>>>>> error handling while in KafkaConsumer we do that for users. However, for
>>>>>>>>>>>>>>>>>>>> old high level consumer users (which are the majority of users), the
>>>>>>>>>>>>>>>>>>>> experience is a noticeable regression. The old high level consumer
>>>>>>>>>>>>>>>>>>>> interface is simple and easy to use for end users, while KafkaConsumer
>>>>>>>>>>>>>>>>>>>> requires users to be aware of many underlying details and is becoming
>>>>>>>>>>>>>>>>>>>> prohibitive for users to adopt. This is hinted at by the javadoc growing
>>>>>>>>>>>>>>>>>>>> bigger and bigger.
>>>>>>>>>>>>>>>>>>>> We think it's getting to the point where we should take
>>>>>>>>>>>>>>>>>>>> a step back and look at the big picture.
>>>>>>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's
>>>>>>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called by the user
>>>>>>>>>>>>>>>>>>>> which pretty much drives everything:
>>>>>>>>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic
>>>>>>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance)
>>>>>>>>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the
>>>>>>>>>>>>>>>>>>>> end user, this ends up making us educate users on NIO and the consequences
>>>>>>>>>>>>>>>>>>>> of not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>>>>>>>>> - callbacks won't fire
>>>>>>>>>>>>>>>>>>>> More generally speaking, there are many surprises with
>>>>>>>>>>>>>>>>>>>> the current KafkaConsumer implementation.
>>>>>>>>>>>>>>>>>>>> Here's what we consider to be the goals of
>>>>>>>>>>>>>>>>>>>> KafkaConsumer:
>>>>>>>>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume
>>>>>>>>>>>>>>>>>>>> messages
>>>>>>>>>>>>>>>>>>>> - a way to subscribe to topics (auto group management)
>>>>>>>>>>>>>>>>>>>> or partitions (explicit group management)
>>>>>>>>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>>>>>>>> The last point is the big one that we think we aren't
>>>>>>>>>>>>>>>>>>>> hitting. We think the most important example is that there should be no
>>>>>>>>>>>>>>>>>>>> requirement from the end user to consistently call KafkaConsumer.poll in order
>>>>>>>>>>>>>>>>>>>> for all of the above tasks to happen. We think it would be better to split
>>>>>>>>>>>>>>>>>>>> those tasks into tasks that should not rely on KafkaConsumer.poll and tasks
>>>>>>>>>>>>>>>>>>>> that should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>>>>>>>>> - commits
>>>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>>>>>>>> This would help reduce the number of surprises to the
>>>>>>>>>>>>>>>>>>>> end user.
>>>>>>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to
>>>>>>>>>>>>>>>>>>>> you guys early next week. We’d like to meet up with you at LinkedIn on *July
>>>>>>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it
>>>>>>>>>>>>>>>>>>>> to open source.
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>> --
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Neha
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>> --
>>>>>>>>>> Thanks,
>>>>>>>>>> Ewen
>>>>>>>> --
>>>>>>>> Thanks,
>>>>>>>> Neha
>>>>>> --
>>>>>> Thanks,
>>>>>> Neha
