kafka-dev mailing list archives

From Gabor Somogyi <gabor.g.somo...@gmail.com>
Subject Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer
Date Tue, 13 Aug 2019 09:52:25 GMT
I've had concerns about calling AdminClient.listTopics because on big
clusters I've seen OOMs caused by too many TopicPartitions.
On the other hand, this problem already exists in the current implementation
because, as Colin said, the Consumer does the same on the client side. All in
all, this part is fine.

I've checked all the actual use cases on the Spark side which have to be
covered, and it looks doable.
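
For reference, here's a minimal sketch of the AdminClient-based enumeration
(not the actual Spark code; the broker address is a placeholder and exception
handling is elided). Materializing every TopicPartition of a big cluster at
once is exactly where the memory pressure would come from:

    import java.util.*;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        // One metadata round trip for all topic names ...
        Set<String> topics = admin.listTopics().names().get();
        // ... and one more to expand them into TopicPartitions.
        List<TopicPartition> partitions = new ArrayList<>();
        admin.describeTopics(topics).all().get().forEach((topic, desc) ->
            desc.partitions().forEach(info ->
                partitions.add(new TopicPartition(topic, info.partition()))));
    }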


On Tue, Aug 13, 2019 at 6:01 AM Jungtaek Lim <kabhwan@gmail.com> wrote:

> So, overall, AdminClient covers what's necessary to retrieve up-to-date
> topic-partitions, whereas KIP-396 will cover what's necessary to retrieve
> offsets (EARLIEST, LATEST, by timestamp) per partition.
>
> Gabor, could you please add your input if I'm missing something? I'd like
> to double-check this.
>
> Assuming I'm not missing something, what would be the preferred next
> action? Personally I'd keep this KIP as it is until KIP-396 passes its vote
> (the vote for KIP-396 opened in January and still hasn't passed - 7 months
> - which worries me a bit about whether it's going to pass or not), but I
> also respect the lifecycle of KIPs in the Kafka community.
>
> On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <kabhwan@gmail.com> wrote:
>
> >
> >
> > On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmccabe@apache.org> wrote:
> >
> >> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> >> > Thanks for the feedback, Colin and Matthias.
> >> >
> >> > I agree with you regarding getting topics and partitions via
> >> > AdminClient; I'm just curious how much the overhead would be. Would
> >> > it be lighter, or heavier? We may not want to list topics at regular
> >> > intervals - in the plan phase we want to know up-to-date information
> >> > so that the calculation from Spark itself makes sense.
> >>
> >> It would be lighter. The consumer will periodically refresh metadata for
> >> any topic you are subscribed to. AdminClient doesn’t have the concept of
> >> subscriptions, and won’t refresh topic metadata until you request it.
> >>
> >
> > Sounds great! Happy to hear about that.
> >
> >
> >>
> >> >
> >> > On the other hand, I'm not seeing any information regarding offsets
> >> > in the current AdminClient, which is also one of the reasons we
> >> > leverage the consumer and call poll(0). Colin, as you mentioned there
> >> > are KIPs addressing this - could you point to those KIPs so that we
> >> > can see whether they would work for our case? Without support for
> >> > this we cannot replace our usage of consumer/poll with AdminClient.
> >>
> >> KIP-396 is the one for listing offsets in AdminClient.
> >>
> >
> > KIP-396 seems to fit Spark's needs for getting offset information, even
> > by timestamp. Thanks!
> > I wish there were a way to get a range of (EARLIEST, LATEST) in one
> > call, but it's not a big deal as it just requires two calls.
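> >
> > As an illustration only - assuming KIP-396 lands roughly as currently
> > proposed (the names below come from the KIP draft and could still
> > change; `admin` and `partitions` are assumed from context), the two
> > calls might look like:
> >
> >     Map<TopicPartition, OffsetSpec> earliestSpec = new HashMap<>();
> >     Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
> >     for (TopicPartition tp : partitions) {
> >         earliestSpec.put(tp, OffsetSpec.earliest());
> >         latestSpec.put(tp, OffsetSpec.latest());
> >     }
> >     // Two round trips: one for EARLIEST, one for LATEST.
> >     Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliest =
> >         admin.listOffsets(earliestSpec).all().get();
> >     Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
> >         admin.listOffsets(latestSpec).all().get();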
> >
> > >
> >> > ps. IMHO it would be helpful if there were an overloaded
> >> > `listTopics` which accepts a regex, the same as consumer subscription
> >> > via pattern. We would like to provide the same behavior Kafka
> >> > basically provides as a source.
> >>
> >> We don’t have a regex listTopics at the moment, though we could add
> >> this. Currently, the regex is done on the client side anyway (although
> >> we’d really like to change this in the future). So just listing
> >> everything and filtering locally would give the same performance and
> >> behavior as the Consumer.
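> >>
> >> For illustration, the client-side equivalent would just be a filter
> >> over the full listing (a sketch, not an existing overload; `admin` is
> >> assumed from context):
> >>
> >>     Pattern pattern = Pattern.compile("my-topics-.*");
> >>     Set<String> matched = admin.listTopics().names().get().stream()
> >>         .filter(name -> pattern.matcher(name).matches())
> >>         .collect(Collectors.toSet());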
> >>
> >
> > I see. Good to know the regex is done on the client side - I've just
> > searched the code, and it applies the filter to all topics retrieved
> > from the metadata fetch. So there would be mostly no difference here.
> > Thanks for confirming.
> >
> >
> >>
> >> best,
> >> Colin
> >>
> >> >
> >> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matthias@confluent.io> wrote:
> >> >
> >> > > Thanks for the details Jungtaek!
> >> > >
> >> > > I tend to agree with Colin that using the AdminClient seems to be
> >> > > the better choice.
> >> > >
> >> > > You can get all topics via `listTopics()` (and you can refresh this
> >> > > information at regular intervals) and match any pattern against the
> >> > > list of available topics in the driver.
> >> > >
> >> > > As you use `assignment()` and store offsets in the Spark
> >> > > checkpoint, it seems that using consumer group management is not a
> >> > > good fit for the use case.
> >> > >
> >> > >
> >> > > Thoughts?
> >> > >
> >> > >
> >> > >
> >> > > -Matthias
> >> > >
> >> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
> >> > > > Hi,
> >> > > >
> >> > > > If there’s no need to consume records in the Spark driver, then
> >> > > > the Consumer is probably the wrong thing to use. Instead, Spark
> >> > > > should use AdminClient to find out what partitions exist and
> >> > > > where, manage their offsets, and so on. There are some KIPs under
> >> > > > discussion now that would add the necessary APIs for managing
> >> > > > offsets.
> >> > > >
> >> > > > Best,
> >> > > > Colin
> >> > > >
> >> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> >> > > >> My feeling is that I didn't explain the use case for Spark
> >> > > >> properly and hence failed to explain the needs. Sorry about this.
> >> > > >>
> >> > > >> Spark leverages a single instance of KafkaConsumer in the driver,
> >> > > >> which is the only one registered in the consumer group. This is
> >> > > >> used in the plan phase of each micro-batch to calculate the
> >> > > >> overall topicpartitions with their offset ranges for the batch,
> >> > > >> and to split and assign (topicpartition, fromOffset, untilOffset)
> >> > > >> to each input partition. After the planning is done and tasks are
> >> > > >> distributed to executors, a consumer per input partition is
> >> > > >> initialized on some executor (assigned to the single
> >> > > >> topicpartition) and pulls the actual records. (Pooling consumers
> >> > > >> is applied, for sure.) As the plan phase only determines the
> >> > > >> overall topicpartitions and offset ranges to process, Spark is
> >> > > >> never interested in pulling records on the driver side.
> >> > > >>
> >> > > >> Spark mainly leverages poll(0) to get the latest assigned
> >> > > >> partitions and adopt the changes or validate the expectation.
> >> > > >> That's not the only use case for poll(0). Spark also seeks the
> >> > > >> offset per topicpartition to the earliest or the latest, or to a
> >> > > >> specific one (either provided by the end user or the last
> >> > > >> committed offset) so that Spark can get the actual offset or
> >> > > >> validate the provided offset. According to the javadoc (if I
> >> > > >> understand correctly), to get the offset immediately it seems to
> >> > > >> be required to call `poll` or `position`.
> >> > > >>
> >> > > >> The way Spark interacts with Kafka in this plan phase in the
> >> > > >> driver is synchronous, as the phase should finish ASAP to run the
> >> > > >> next phase. Registering a ConsumerRebalanceListener and tracking
> >> > > >> the changes would require some asynchronous handling, which
> >> > > >> sounds like it adds unnecessary complexity. Spark may be OK with
> >> > > >> dealing synchronously with a timeout (that's what the methods in
> >> > > >> KafkaConsumer have been providing - they're not asynchronous, at
> >> > > >> least for callers), but dealing with asynchrony is another level
> >> > > >> of concern. I can see the benefit where a continuous thread runs
> >> > > >> and the consumer is busy with something continuously, relying on
> >> > > >> the listener to hear the news about reassignment. Unfortunately
> >> > > >> that's not the case here.
> >> > > >>
> >> > > >> Unit tests in Spark have similar needs: it looks like Kafka test
> >> > > >> code also leverages `updateAssignmentMetadataIfNeeded` and
> >> > > >> `poll(0)` in many places where a blocking (+timeout) call is
> >> > > >> preferred - so I can see the similar needs there as well.
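> >> > > >>
> >> > > >> To make the pattern concrete, here is a rough sketch of what the
> >> > > >> driver side does today (simplified, not the literal
> >> > > >> KafkaOffsetReader code):
> >> > > >>
> >> > > >>     consumer.poll(0);  // the hack: wanted only for its
> >> > > >>                        // assignment-metadata side effect
> >> > > >>     Set<TopicPartition> assigned = consumer.assignment();
> >> > > >>     consumer.seekToEnd(assigned);
> >> > > >>     Map<TopicPartition, Long> latest = new HashMap<>();
> >> > > >>     for (TopicPartition tp : assigned) {
> >> > > >>         // position() blocks and resolves the pending seek
> >> > > >>         latest.put(tp, consumer.position(tp));
> >> > > >>     }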
> >> > > >>
> >> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somogyi@gmail.com> wrote:
> >> > > >>
> >> > > >>> Hi Guys,
> >> > > >>>
> >> > > >>> Please see the actual implementation; pretty sure it explains
> >> > > >>> the situation well:
> >> > > >>>
> >> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> >> > > >>>
> >> > > >>> To answer one question/assumption which popped up from all of
> >> > > >>> you: Spark not only uses KafkaConsumer#subscribe but pattern
> >> > > >>> subscribe + KafkaConsumer#assign as well.
> >> > > >>> Please see here:
> >> > > >>>
> >> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> >> > > >>>
> >> > > >>> BR,
> >> > > >>> G
> >> > > >>>
> >> > > >>>
> >> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.duggana@gmail.com> wrote:
> >> > > >>>
> >> > > >>>> Hi Jungtaek,
> >> > > >>>> Thanks for the KIP. I have a couple of questions here.
> >> > > >>>> Isn't Spark using Kafka's consumer group management across
> >> > > >>>> multiple consumers?
> >> > > >>>>
> >> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> >> > > >>>> ConsumerRebalanceListener listener) only to get all the topics
> >> > > >>>> for a pattern-based subscription, while Spark manually assigns
> >> > > >>>> those topic-partitions across consumers on workers?
> >> > > >>>>
> >> > > >>>> Thanks,
> >> > > >>>> Satish.
> >> > > >>>>
> >> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matthias@confluent.io> wrote:
> >> > > >>>>
> >> > > >>>>> I am not sure if I fully understand yet.
> >> > > >>>>>
> >> > > >>>>> The fact that Spark does not store offsets in Kafka but as
> >> > > >>>>> part of its own checkpoint mechanism seems to be orthogonal.
> >> > > >>>>> Maybe I am missing something here.
> >> > > >>>>>
> >> > > >>>>> As you are using subscribe(), you use Kafka's consumer group
> >> > > >>>>> mechanism, which takes care of the assignment of partitions to
> >> > > >>>>> clients within the group. Therefore, I am not sure what you
> >> > > >>>>> mean by:
> >> > > >>>>>
> >> > > >>>>>> which Spark needs to know to coordinate multiple consumers
> >> > > >>>>>> to pull correctly.
> >> > > >>>>>
> >> > > >>>>> Multiple thoughts that may help:
> >> > > >>>>>
> >> > > >>>>> - if Spark needs more control over the partition assignment,
> >> > > >>>>> you can provide a custom `ConsumerPartitionAssignor` (via the
> >> > > >>>>> consumer configuration)
> >> > > >>>>>
> >> > > >>>>> - you may also want to register a `ConsumerRebalanceListener`
> >> > > >>>>> via `subscribe()` to get informed when the group rebalances
> >> > > >>>>> (see the sketch below)
> >> > > >>>>>
> >> > > >>>>> As you pointed out, with pattern subscription metadata can
> >> > > >>>>> change if topics are added/deleted. However, each metadata
> >> > > >>>>> change will trigger a rebalance, and thus you would get
> >> > > >>>>> corresponding calls to your rebalance listener to learn about
> >> > > >>>>> it and react accordingly.
> >> > > >>>>>
> >> > > >>>>> Maybe you can explain why neither of these approaches works
> >> > > >>>>> and what gap the new API would close?
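> >> > > >>>>>
> >> > > >>>>> E.g., registering the listener could look roughly like this
> >> > > >>>>> (just a sketch, with a placeholder pattern):
> >> > > >>>>>
> >> > > >>>>>     consumer.subscribe(Pattern.compile("topic-.*"),
> >> > > >>>>>         new ConsumerRebalanceListener() {
> >> > > >>>>>             @Override
> >> > > >>>>>             public void onPartitionsRevoked(Collection<TopicPartition> parts) {}
> >> > > >>>>>             @Override
> >> > > >>>>>             public void onPartitionsAssigned(Collection<TopicPartition> parts) {
> >> > > >>>>>                 // react here to rebalances triggered by metadata changes
> >> > > >>>>>             }
> >> > > >>>>>         });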
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>> -Matthias
> >> > > >>>>>
> >> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> >> > > >>>>>> Let me elaborate my explanation a bit more. Here we talk
> >> > > >>>>>> about Apache Spark, but this applies to everything which
> >> > > >>>>>> wants to control the offsets of Kafka consumers.
> >> > > >>>>>>
> >> > > >>>>>> Spark is managing the committed offsets and the offsets which
> >> > > >>>>>> should be polled next - topics and partitions as well. This
> >> > > >>>>>> is required as Spark itself has its own general checkpoint
> >> > > >>>>>> mechanism and Kafka is just one of its sources/sinks (though
> >> > > >>>>>> it's considered very important).
> >> > > >>>>>>
> >> > > >>>>>> To pull records from Kafka, Spark tells Kafka which topics
> >> > > >>>>>> and partitions it wants to subscribe to (and does seek and
> >> > > >>>>>> poll), but Spark can also provide "patterns" of topics, and
> >> > > >>>>>> the subscription can change on the Kafka side (topics
> >> > > >>>>>> added/dropped, partitions added), which Spark needs to know
> >> > > >>>>>> to coordinate multiple consumers to pull correctly.
> >> > > >>>>>>
> >> > > >>>>>> It looks like assignment() doesn't update the assignment
> >> > > >>>>>> information in the consumer; it just returns the known one.
> >> > > >>>>>> There's only one known approach to do this, calling `poll`,
> >> > > >>>>>> but Spark is not interested in the returned records, so
> >> > > >>>>>> there's a need for the hack `poll(0)`, and Kafka deprecated
> >> > > >>>>>> that API. This KIP proposes to support this as an official
> >> > > >>>>>> approach.
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabhwan@gmail.com> wrote:
> >> > > >>>>>>
> >> > > >>>>>>> Sorry, I didn't recognize you're also asking it here as
> >> > > >>>>>>> well. I'm in favor of describing it in this discussion
> >> > > >>>>>>> thread so the discussion itself can go forward. So, copying
> >> > > >>>>>>> my answer here:
> >> > > >>>>>>>
> >> > > >>>>>>> We have some use cases in which we don't just rely on
> >> > > >>>>>>> everything the Kafka consumer provides. We want to know the
> >> > > >>>>>>> current assignment on this consumer, and to get the latest
> >> > > >>>>>>> assignment, we called the hack `poll(0)`.
> >> > > >>>>>>>
> >> > > >>>>>>> That said, we don't want to pull any records here, and if
> >> > > >>>>>>> I'm not missing something, there's no way to accomplish
> >> > > >>>>>>> this. Please guide me if I'm missing something.
> >> > > >>>>>>>
> >> > > >>>>>>> Thanks,
> >> > > >>>>>>> Jungtaek Lim (HeartSaVioR)
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matthias@confluent.io> wrote:
> >> > > >>>>>>>
> >> > > >>>>>>>> Thanks for the KIP.
> >> > > >>>>>>>>
> >> > > >>>>>>>> Can you elaborate a little bit more on the use case for
> >> > > >>>>>>>> this feature? Why would a consumer need to update its
> >> > > >>>>>>>> metadata explicitly?
> >> > > >>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>> -Matthias
> >> > > >>>>>>>>
> >> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> >> > > >>>>>>>>> Hi devs,
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> I'd like to initiate discussion around KIP-505, exposing a
> >> > > >>>>>>>>> new public method to only update assignment metadata in
> >> > > >>>>>>>>> the consumer.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> `poll(0)` has been misused: according to the Kafka docs it
> >> > > >>>>>>>>> doesn't guarantee that it won't pull any records, and the
> >> > > >>>>>>>>> new method `poll(Duration)` doesn't have the same
> >> > > >>>>>>>>> semantics, so I would like to propose a new public API
> >> > > >>>>>>>>> which does only the desired behavior.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
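> >> > > >>>>>>>>>
> >> > > >>>>>>>>> For context, the pattern callers rely on today looks
> >> > > >>>>>>>>> roughly like this (a sketch; `consumer` and `topics` are
> >> > > >>>>>>>>> assumed from context):
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>     consumer.subscribe(topics);
> >> > > >>>>>>>>>     consumer.poll(0);  // deprecated; called only for its
> >> > > >>>>>>>>>                        // metadata-update side effect, yet
> >> > > >>>>>>>>>                        // records may still be returned
> >> > > >>>>>>>>>                        // and silently dropped
> >> > > >>>>>>>>>     Set<TopicPartition> assigned = consumer.assignment();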
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Please feel free to suggest any improvements to the
> >> > > >>>>>>>>> proposal, as I'm new to the Kafka community and may not
> >> > > >>>>>>>>> catch the preferences (like TimeoutException vs boolean,
> >> > > >>>>>>>>> etc.) of the Kafka project.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Thanks in advance!
> >> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
> >> > > >>>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>> --
> >> > > >>>>>>> Name : Jungtaek Lim
> >> > > >>>>>>> Blog : http://medium.com/@heartsavior
> >> > > >>>>>>> Twitter : http://twitter.com/heartsavior
> >> > > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > > >>
> >> > > >> --
> >> > > >> Name : Jungtaek Lim
> >> > > >> Blog : http://medium.com/@heartsavior
> >> > > >> Twitter : http://twitter.com/heartsavior
> >> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> >> > > >>
> >> > > >
> >> > >
> >> > >
> >> >
> >> > --
> >> > Name : Jungtaek Lim
> >> > Blog : http://medium.com/@heartsavior
> >> > Twitter : http://twitter.com/heartsavior
> >> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >> >
> >>
> >
> >
> > --
> > Name : Jungtaek Lim
> > Blog : http://medium.com/@heartsavior
> > Twitter : http://twitter.com/heartsavior
> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >
>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>
