kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
Date Tue, 28 Mar 2017 00:33:14 GMT
Mickael,

Sorry for the late review of the KIP. I'm +1 on the proposed change as
well. Just a few minor comments on the wiki itself:

1. By the "MemoryPool" are you referring to a new class impl or to reusing "
org.apache.kafka.clients.producer.internals.BufferPool"? I assume it was
the latter case, and if yes, could you update the wiki page to make it
clear?

2. I think it is sufficient to add the priority to KafkaChannel class, but
not needed in Node (but one may need to add this parameter to Selector#
connect). Could you point me to which usage of Node needs to access the
priority?


Guozhang


On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <mickael.maison@gmail.com>
wrote:

> Thanks Jason for the feedback! Yes it makes sense to always use the
> MemoryPool is we can. I've updated the KIP with the suggestion
>
> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <jason@confluent.io>
> wrote:
> > Just a minor comment. The KIP suggests that coordinator responses are
> > always allocated outside of the memory pool, but maybe we can reserve
> that
> > capability for only when the pool does not have enough space? It seems a
> > little nicer to use the pool if we can. If that seems reasonable, I'm +1
> on
> > the KIP. Thanks for the effort!
> >
> > -Jason
> >
> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> mickael.maison@gmail.com>
> > wrote:
> >
> >> Yes I agree, having a generic flag is more future proof.
> >> I'll update the KIP in the coming days.
> >>
> >> Thanks
> >>
> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <jason@confluent.io>
> >> wrote:
> >> > Hey Mickael,
> >> >
> >> > The suggestion to add something to Node makes sense. I could imagine
> for
> >> > example adding a flag to indicate that the connection has a higher
> >> > "priority," meaning that we can allocate outside of the memory pool if
> >> > necessary. That would still be generic even if the only use case is
> the
> >> > consumer coordinator. We might also face a similar problem when the
> >> > producer is sending requests to the transaction coordinator for
> KIP-98.
> >> > What do you think?
> >> >
> >> > Thanks,
> >> > Jason
> >> >
> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> >> mickael.maison@gmail.com>
> >> > wrote:
> >> >
> >> >> Apologies for the late response.
> >> >>
> >> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator
> >> >> connection is "tagged" with a different id, so we could retrieve it
> in
> >> >> NetworkReceive to make the distinction.
> >> >> However, currently the coordinator connection are made different by
> >> using:
> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> for the Node id.
> >> >>
> >> >> So to identify Coordinator connections, we'd have to check that the
> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a
> bit
> >> >> hacky ...
> >> >>
> >> >> Maybe we could add a constructor to Node that allows to pass in a
> >> >> sourceId String. That way we could make all the coordinator
> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> >> example).
> >> >> What do you think ?
> >> >>
> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> jason@confluent.io>
> >> >> wrote:
> >> >> > Good point. The consumer does use a separate connection to the
> >> >> coordinator,
> >> >> > so perhaps the connection itself could be tagged for normal heap
> >> >> allocation?
> >> >> >
> >> >> > -Jason
> >> >> >
> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> >> >> onurkaraman.apache@gmail.com
> >> >> >> wrote:
> >> >> >
> >> >> >> I only did a quick scan but I wanted to point out what I think
is
> an
> >> >> >> incorrect assumption in the KIP's caveats:
> >> >> >> "
> >> >> >> There is a risk using the MemoryPool that, after we fill up
the
> >> memory
> >> >> with
> >> >> >> fetch data, we can starve the coordinator's connection
> >> >> >> ...
> >> >> >> To alleviate this issue, only messages larger than 1Kb will
be
> >> >> allocated in
> >> >> >> the MemoryPool. Smaller messages will be allocated directly
on the
> >> Heap
> >> >> >> like before. This allows group/heartbeat messages to avoid
being
> >> >> delayed if
> >> >> >> the MemoryPool fills up.
> >> >> >> "
> >> >> >>
> >> >> >> So it sounds like there's an incorrect assumption that responses
> from
> >> >> the
> >> >> >> coordinator will always be small (< 1Kb as mentioned in
the
> caveat).
> >> >> There
> >> >> >> are now a handful of request types between clients and the
> >> coordinator:
> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
> >> OffsetFetch,
> >> >> >> ListGroups, DescribeGroups}. While true (at least today) for
> >> >> >> HeartbeatResponse and a few others, I don't think we can assume
> >> >> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse,
and
> >> >> >> OffsetFetchResponse will be small, as they are effectively
> bounded by
> >> >> the
> >> >> >> max message size allowed by the broker for the __consumer_offsets
> >> topic
> >> >> >> which by default is 1MB.
> >> >> >>
> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> >> >> mickael.maison@gmail.com>
> >> >> >> wrote:
> >> >> >>
> >> >> >> > I've updated the KIP to address all the comments raised
here and
> >> from
> >> >> >> > the "DISCUSS" thread.
> >> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >
> >> >> >> > Now, I'd like to restart the vote.
> >> >> >> >
> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> >> >> > <rajinisivaram@googlemail.com> wrote:
> >> >> >> > > Hi Mickael,
> >> >> >> > >
> >> >> >> > > I am +1 on the overall approach of this KIP, but
have a
> couple of
> >> >> >> > comments
> >> >> >> > > (sorry, should have brought them up on the discuss
thread
> >> earlier):
> >> >> >> > >
> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137>
is
> >> implemented?
> >> >> At
> >> >> >> > the
> >> >> >> > > moment, coordinator shares the same NetworkClient
(and hence
> the
> >> >> same
> >> >> >> > > Selector) with consumer connections used for fetching
records.
> >> Since
> >> >> >> > > freeing of memory relies on consuming applications
invoking
> >> poll()
> >> >> >> after
> >> >> >> > > processing previous records and potentially after
committing
> >> >> offsets,
> >> >> >> it
> >> >> >> > > will be good to ensure that coordinator is not blocked
for
> read
> >> by
> >> >> >> fetch
> >> >> >> > > responses. This may be simpler once coordinator
has its own
> >> >> Selector.
> >> >> >> > >
> >> >> >> > > 2. The KIP says: *Once messages are returned to
the user,
> >> messages
> >> >> are
> >> >> >> > > deleted from the MemoryPool so new messages can
be stored.*
> >> >> >> > > Can you expand that a bit? I am assuming that partial
buffers
> >> never
> >> >> get
> >> >> >> > > freed when some messages are returned to the user
since the
> >> >> consumer is
> >> >> >> > > still holding a reference to the buffer. Would buffers
be
> freed
> >> when
> >> >> >> > > fetches for all the partitions in a response are
parsed, but
> >> perhaps
> >> >> >> not
> >> >> >> > > yet returned to the user (i.e., is the memory freed
when a
> >> >> reference to
> >> >> >> > the
> >> >> >> > > response buffer is no longer required)? It will
be good to
> >> document
> >> >> the
> >> >> >> > > (approximate) maximum memory requirement for the
> non-compressed
> >> >> case.
> >> >> >> > There
> >> >> >> > > is data read from the socket, cached in the Fetcher
and (as
> Radai
> >> >> has
> >> >> >> > > pointed out), the records still with the user application.
> >> >> >> > >
> >> >> >> > >
> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> >> radai.rosenblatt@gmail.com>
> >> >> >> > wrote:
> >> >> >> > >
> >> >> >> > >> +1 (non-binding).
> >> >> >> > >>
> >> >> >> > >> small nit pick - just because you returned a
response to user
> >> >> doesnt
> >> >> >> > mean
> >> >> >> > >> the memory id no longer used. for some cases
the actual
> "point
> >> of
> >> >> >> > >> termination" may be the deserializer (really
impl-dependant),
> >> but
> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit
dispose()
> >> call
> >> >> on
> >> >> >> > >> responses (with the addition that getting the
next batch of
> data
> >> >> from
> >> >> >> a
> >> >> >> > >> consumer automatically disposes the previous
results)
> >> >> >> > >>
> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar
<
> >> ECOMAR@uk.ibm.com>
> >> >> >> > wrote:
> >> >> >> > >>
> >> >> >> > >> > +1 (non binding)
> >> >> >> > >> > --------------------------------------------------
> >> >> >> > >> > Edoardo Comar
> >> >> >> > >> > IBM MessageHub
> >> >> >> > >> > ecomar@uk.ibm.com
> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> >> > >> >
> >> >> >> > >> > IBM United Kingdom Limited Registered in
England and Wales
> >> with
> >> >> >> number
> >> >> >> > >> > 741598 Registered office: PO Box 41, North
Harbour,
> >> Portsmouth,
> >> >> >> Hants.
> >> >> >> > >> PO6
> >> >> >> > >> > 3AU
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> > From:   Mickael Maison <mickael.maison@gmail.com>
> >> >> >> > >> > To:     dev@kafka.apache.org
> >> >> >> > >> > Date:   05/12/2016 14:38
> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch
memory usage in
> the
> >> >> >> > consumer
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> > Hi all,
> >> >> >> > >> >
> >> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> > Thank you
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> > Unless stated otherwise above:
> >> >> >> > >> > IBM United Kingdom Limited - Registered
in England and
> Wales
> >> with
> >> >> >> > number
> >> >> >> > >> > 741598.
> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
Portsmouth,
> >> >> Hampshire
> >> >> >> PO6
> >> >> >> > >> 3AU
> >> >> >> > >> >
> >> >> >> > >>
> >> >> >> > >
> >> >> >> > >
> >> >> >> > >
> >> >> >> > > --
> >> >> >> > > Regards,
> >> >> >> > >
> >> >> >> > > Rajini
> >> >> >> >
> >> >> >>
> >> >>
> >>
>



-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message