kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mickael Maison <mickael.mai...@gmail.com>
Subject Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
Date Tue, 28 Feb 2017 18:09:58 GMT
Yes I agree, having a generic flag is more future proof.
I'll update the KIP in the coming days.

Thanks

On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <jason@confluent.io> wrote:
> Hey Mickael,
>
> The suggestion to add something to Node makes sense. I could imagine for
> example adding a flag to indicate that the connection has a higher
> "priority," meaning that we can allocate outside of the memory pool if
> necessary. That would still be generic even if the only use case is the
> consumer coordinator. We might also face a similar problem when the
> producer is sending requests to the transaction coordinator for KIP-98.
> What do you think?
>
> Thanks,
> Jason
>
> On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <mickael.maison@gmail.com>
> wrote:
>
>> Apologies for the late response.
>>
>> Thanks Jason for the suggestion. Yes you are right, the Coordinator
>> connection is "tagged" with a different id, so we could retrieve it in
>> NetworkReceive to make the distinction.
>> However, currently the coordinator connection are made different by using:
>> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> for the Node id.
>>
>> So to identify Coordinator connections, we'd have to check that the
>> NetworkReceive source is a value near Integer.MAX_VALUE which is a bit
>> hacky ...
>>
>> Maybe we could add a constructor to Node that allows to pass in a
>> sourceId String. That way we could make all the coordinator
>> connections explicit (by setting it to "Coordinator-[ID]" for
>> example).
>> What do you think ?
>>
>> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <jason@confluent.io>
>> wrote:
>> > Good point. The consumer does use a separate connection to the
>> coordinator,
>> > so perhaps the connection itself could be tagged for normal heap
>> allocation?
>> >
>> > -Jason
>> >
>> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> onurkaraman.apache@gmail.com
>> >> wrote:
>> >
>> >> I only did a quick scan but I wanted to point out what I think is an
>> >> incorrect assumption in the KIP's caveats:
>> >> "
>> >> There is a risk using the MemoryPool that, after we fill up the memory
>> with
>> >> fetch data, we can starve the coordinator's connection
>> >> ...
>> >> To alleviate this issue, only messages larger than 1Kb will be
>> allocated in
>> >> the MemoryPool. Smaller messages will be allocated directly on the Heap
>> >> like before. This allows group/heartbeat messages to avoid being
>> delayed if
>> >> the MemoryPool fills up.
>> >> "
>> >>
>> >> So it sounds like there's an incorrect assumption that responses from
>> the
>> >> coordinator will always be small (< 1Kb as mentioned in the caveat).
>> There
>> >> are now a handful of request types between clients and the coordinator:
>> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
>> >> ListGroups, DescribeGroups}. While true (at least today) for
>> >> HeartbeatResponse and a few others, I don't think we can assume
>> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
>> >> OffsetFetchResponse will be small, as they are effectively bounded by
>> the
>> >> max message size allowed by the broker for the __consumer_offsets topic
>> >> which by default is 1MB.
>> >>
>> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
>> mickael.maison@gmail.com>
>> >> wrote:
>> >>
>> >> > I've updated the KIP to address all the comments raised here and from
>> >> > the "DISCUSS" thread.
>> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >
>> >> > Now, I'd like to restart the vote.
>> >> >
>> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> >> > <rajinisivaram@googlemail.com> wrote:
>> >> > > Hi Mickael,
>> >> > >
>> >> > > I am +1 on the overall approach of this KIP, but have a couple
of
>> >> > comments
>> >> > > (sorry, should have brought them up on the discuss thread earlier):
>> >> > >
>> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented?
>> At
>> >> > the
>> >> > > moment, coordinator shares the same NetworkClient (and hence the
>> same
>> >> > > Selector) with consumer connections used for fetching records.
Since
>> >> > > freeing of memory relies on consuming applications invoking poll()
>> >> after
>> >> > > processing previous records and potentially after committing
>> offsets,
>> >> it
>> >> > > will be good to ensure that coordinator is not blocked for read
by
>> >> fetch
>> >> > > responses. This may be simpler once coordinator has its own
>> Selector.
>> >> > >
>> >> > > 2. The KIP says: *Once messages are returned to the user, messages
>> are
>> >> > > deleted from the MemoryPool so new messages can be stored.*
>> >> > > Can you expand that a bit? I am assuming that partial buffers
never
>> get
>> >> > > freed when some messages are returned to the user since the
>> consumer is
>> >> > > still holding a reference to the buffer. Would buffers be freed
when
>> >> > > fetches for all the partitions in a response are parsed, but perhaps
>> >> not
>> >> > > yet returned to the user (i.e., is the memory freed when a
>> reference to
>> >> > the
>> >> > > response buffer is no longer required)? It will be good to document
>> the
>> >> > > (approximate) maximum memory requirement for the non-compressed
>> case.
>> >> > There
>> >> > > is data read from the socket, cached in the Fetcher and (as Radai
>> has
>> >> > > pointed out), the records still with the user application.
>> >> > >
>> >> > >
>> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <radai.rosenblatt@gmail.com>
>> >> > wrote:
>> >> > >
>> >> > >> +1 (non-binding).
>> >> > >>
>> >> > >> small nit pick - just because you returned a response to user
>> doesnt
>> >> > mean
>> >> > >> the memory id no longer used. for some cases the actual "point
of
>> >> > >> termination" may be the deserializer (really impl-dependant),
but
>> >> > >> generally, wouldnt it be "nice" to have an explicit dispose()
call
>> on
>> >> > >> responses (with the addition that getting the next batch of
data
>> from
>> >> a
>> >> > >> consumer automatically disposes the previous results)
>> >> > >>
>> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <ECOMAR@uk.ibm.com>
>> >> > wrote:
>> >> > >>
>> >> > >> > +1 (non binding)
>> >> > >> > --------------------------------------------------
>> >> > >> > Edoardo Comar
>> >> > >> > IBM MessageHub
>> >> > >> > ecomar@uk.ibm.com
>> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >> > >> >
>> >> > >> > IBM United Kingdom Limited Registered in England and
Wales with
>> >> number
>> >> > >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
>> >> Hants.
>> >> > >> PO6
>> >> > >> > 3AU
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> > From:   Mickael Maison <mickael.maison@gmail.com>
>> >> > >> > To:     dev@kafka.apache.org
>> >> > >> > Date:   05/12/2016 14:38
>> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage
in the
>> >> > consumer
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> > Hi all,
>> >> > >> >
>> >> > >> > I'd like to start the vote for KIP-81:
>> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> > >> >
>> >> > >> >
>> >> > >> > Thank you
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> > Unless stated otherwise above:
>> >> > >> > IBM United Kingdom Limited - Registered in England and
Wales with
>> >> > number
>> >> > >> > 741598.
>> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth,
>> Hampshire
>> >> PO6
>> >> > >> 3AU
>> >> > >> >
>> >> > >>
>> >> > >
>> >> > >
>> >> > >
>> >> > > --
>> >> > > Regards,
>> >> > >
>> >> > > Rajini
>> >> >
>> >>
>>

Mime
View raw message