kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Becket Qin <becket....@gmail.com>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Sun, 03 Dec 2017 07:21:57 GMT
Thanks for the explanation, Colin. A few more questions.

>The session epoch is not complex.  It's just a number which increments
>on each incremental fetch.  The session epoch is also useful for
>debugging-- it allows you to match up requests and responses when
>looking at log files.

Currently each request in Kafka has a correlation id to help match the
requests and responses. Is epoch doing something differently?

>Unfortunately, this doesn't work.  Imagine the client misses an
>increment fetch response about a partition.  And then the partition is
>never updated after that.  The client has no way to know about the
>partition, since it won't be included in any future incremental fetch
>responses.  And there are no offsets to compare, since the partition is
>simply omitted from the response.

I am curious about in which situation would the follower miss a response of
a partition. If the entire FetchResponse is lost (e.g. timeout), the
follower would disconnect and retry. That will result in sending a full
FetchRequest. If there is an error such as NotLeaderForPartition is
returned for some partitions, the follower can always send a full
FetchRequest. Is there a scenario that only some of the partitions in a
FetchResponse is lost?

Thanks,

Jiangjie (Becket) Qin


On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe <cmccabe@apache.org> wrote:

> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe <cmccabe@apache.org>
> wrote:
> >
> > > On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > Hey Colin,
> > > >
> > > > Thanks much for the update. I have a few questions below:
> > > >
> > > > 1. I am not very sure that we need Fetch Session Epoch. It seems that
> > > > Fetch
> > > > Session Epoch is only needed to help leader distinguish between "a
> full
> > > > fetch request" and "a full fetch request and request a new
> incremental
> > > > fetch session". Alternatively, follower can also indicate "a full
> fetch
> > > > request and request a new incremental fetch session" by setting Fetch
> > > > Session ID to -1 without using Fetch Session Epoch. Does this make
> sense?
> > >
> > > Hi Dong,
> > >
> > > The fetch session epoch is very important for ensuring correctness.  It
> > > prevents corrupted or incomplete fetch data due to network reordering
> or
> > > loss.
> > >
> > > For example, consider a scenario where the follower sends a fetch
> > > request to the leader.  The leader responds, but the response is lost
> > > because of network problems which affected the TCP session.  In that
> > > case, the follower must establish a new TCP session and re-send the
> > > incremental fetch request.  But the leader does not know that the
> > > follower didn't receive the previous incremental fetch response.  It is
> > > only the incremental fetch epoch which lets the leader know that it
> > > needs to resend that data, and not data which comes afterwards.
> > >
> > > You could construct similar scenarios with message reordering,
> > > duplication, etc.  Basically, this is a stateful protocol on an
> > > unreliable network, and you need to know whether the follower got the
> > > previous data you sent before you move on.  And you need to handle
> > > issues like duplicated or delayed requests.  These issues do not affect
> > > the full fetch request, because it is not stateful-- any full fetch
> > > request can be understood and properly responded to in isolation.
> > >
> >
> > Thanks for the explanation. This makes sense. On the other hand I would
> > be interested in learning more about whether Becket's solution can help
> > simplify the protocol by not having the echo field and whether that is
> > worth doing.
>
> Hi Dong,
>
> I commented about this in the other thread.  A solution which doesn't
> maintain session information doesn't work here.
>
> >
> >
> >
> > >
> > > >
> > > > 2. It is said that Incremental FetchRequest will include partitions
> whose
> > > > fetch offset or maximum number of fetch bytes has been changed. If
> > > > follower's logStartOffet of a partition has changed, should this
> > > > partition also be included in the next FetchRequest to the leader?
> > > Otherwise, it
> > > > may affect the handling of DeleteRecordsRequest because leader may
> not
> > > know
> > > > the corresponding data has been deleted on the follower.
> > >
> > > Yeah, the follower should include the partition if the logStartOffset
> > > has changed.  That should be spelled out on the KIP.  Fixed.
> > >
> > > >
> > > > 3. In the section "Per-Partition Data", a partition is not considered
> > > > dirty if its log start offset has changed. Later in the section
> > > "FetchRequest
> > > > Changes", it is said that incremental fetch responses will include a
> > > > partition if its logStartOffset has changed. It seems inconsistent.
> Can
> > > > you update the KIP to clarify it?
> > > >
> > >
> > > In the "Per-Partition Data" section, it does say that logStartOffset
> > > changes make a partition dirty, though, right?  The first bullet point
> > > is:
> > >
> > > > * The LogCleaner deletes messages, and this changes the log start
> offset
> > > of the partition on the leader., or
> > >
> >
> > Ah I see. I think I didn't notice this because statement assumes that the
> > LogStartOffset in the leader only changes due to LogCleaner. In fact the
> > LogStartOffset can change on the leader due to either log retention and
> > DeleteRecordsRequest. I haven't verified whether LogCleaner can change
> > LogStartOffset though. It may be a bit better to just say that a
> > partition is considered dirty if LogStartOffset changes.
>
> I agree.  It should be straightforward to just resend the partition if
> logStartOffset changes.
>
> >
> >
> > >
> > > > 4. In "Fetch Session Caching" section, it is said that each broker
> has a
> > > > limited number of slots. How is this number determined? Does this
> require
> > > > a new broker config for this number?
> > >
> > > Good point.  I added two broker configuration parameters to control
> this
> > > number.
> > >
> >
> > I am curious to see whether we can avoid some of these new configs. For
> > example, incremental.fetch.session.cache.slots.per.broker is probably
> not
> > necessary because if a leader knows that a FetchRequest comes from a
> > follower, we probably want the leader to always cache the information
> > from that follower. Does this make sense?
>
> Yeah, maybe we can avoid having
> incremental.fetch.session.cache.slots.per.broker.
>
> >
> > Maybe we can discuss the config later after there is agreement on how the
> > protocol would look like.
> >
> >
> > >
> > > > What is the error code if broker does
> > > > not have new log for the incoming FetchRequest?
> > >
> > > Hmm, is there a typo in this question?  Maybe you meant to ask what
> > > happens if there is no new cache slot for the incoming FetchRequest?
> > > That's not an error-- the incremental fetch session ID just gets set to
> > > 0, indicating no incremental fetch session was created.
> > >
> >
> > Yeah there is a typo. You have answered my question.
> >
> >
> > >
> > > >
> > > > 5. Can you clarify what happens if follower adds a partition to the
> > > > ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does leader
> > > > needs to generate a new session for this ReplicaFetcherThread or
> does it
> > > re-use
> > > > the existing session?  If it uses a new session, is the old session
> > > > actively deleted from the slot?
> > >
> > > The basic idea is that you can't make changes, except by sending a full
> > > fetch request.  However, perhaps we can allow the client to re-use its
> > > existing session ID.  If the client sets sessionId = id, epoch = 0, it
> > > could re-initialize the session.
> > >
> >
> > Yeah I agree with the basic idea. We probably want to understand more
> > detail about how this works later.
>
> Sounds good.  I updated the KIP with this information.  A
> re-initialization should be exactly the same as an initialization,
> except that it reuses an existing ID.
>
> best,
> Colin
>
>
> >
> >
> > >
> > > >
> > > >
> > > > BTW, I think it may be useful if the KIP can include the example
> workflow
> > > > of how this feature will be used in case of partition change and so
> on.
> > >
> > > Yeah, that might help.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe <cmccabe@apache.org>
> > > > wrote:
> > > >
> > > > > I updated the KIP with the ideas we've been discussing.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > > > On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > > > > Hi Colin, thank you  for this KIP, it can become a really
> useful
> > > thing.
> > > > > > >
> > > > > > > I just scanned through the discussion so far and wanted
to
> start a
> > > > > > > thread to make as decision about keeping the
> > > > > > > cache with the Connection / Session or having some sort
of UUID
> > > indN
> > > > > exed
> > > > > > > global Map.
> > > > > > >
> > > > > > > Sorry if that has been settled already and I missed it.
In this
> > > case
> > > > > > > could anyone point me to the discussion?
> > > > > >
> > > > > > Hi Jan,
> > > > > >
> > > > > > I don't think anyone has discussed the idea of tying the cache
> to an
> > > > > > individual TCP session yet.  I agree that since the cache is
> > > intended to
> > > > > > be used only by a single follower or client, it's an interesting
> > > thing
> > > > > > to think about.
> > > > > >
> > > > > > I guess the obvious disadvantage is that whenever your TCP
> session
> > > > > > drops, you have to make a full fetch request rather than an
> > > incremental
> > > > > > one.  It's not clear to me how often this happens in practice
--
> it
> > > > > > probably depends a lot on the quality of the network.  From
a
> code
> > > > > > perspective, it might also be a bit difficult to access data
> > > associated
> > > > > > with the Session from classes like KafkaApis (although we could
> > > refactor
> > > > > > it to make this easier).
> > > > > >
> > > > > > It's also clear that even if we tie the cache to the session,
we
> > > still
> > > > > > have to have limits on the number of caches we're willing to
> create.
> > > > > > And probably we should reserve some cache slots for each
> follower, so
> > > > > > that clients don't take all of them.
> > > > > >
> > > > > > >
> > > > > > > Id rather see a protocol in which the client is hinting
the
> broker
> > > > > that,
> > > > > > > he is going to use the feature instead of a client
> > > > > > > realizing that the broker just offered the feature (regardless
> of
> > > > > > > protocol version which should only indicate that the feature
> > > > > > > would be usable).
> > > > > >
> > > > > > Hmm.  I'm not sure what you mean by "hinting."  I do think that
> the
> > > > > > server should have the option of not accepting incremental
> requests
> > > from
> > > > > > specific clients, in order to save memory space.
> > > > > >
> > > > > > > This seems to work better with a per
> > > > > > > connection/session attached Metadata than with a Map and
could
> > > allow
> > > > > for
> > > > > > > easier client implementations.
> > > > > > > It would also make Client-side code easier as there wouldn't
> be any
> > > > > > > Cache-miss error Messages to handle.
> > > > > >
> > > > > > It is nice not to have to handle cache-miss responses, I agree.
> > > > > > However, TCP sessions aren't exposed to most of our client-side
> code.
> > > > > > For example, when the Producer creates a message and hands it
> off to
> > > the
> > > > > > NetworkClient, the NC will transparently re-connect and re-send
a
> > > > > > message if the first send failed.  The higher-level code will
> not be
> > > > > > informed about whether the TCP session was re-established,
> whether an
> > > > > > existing TCP session was used, and so on.  So overall I would
> still
> > > lean
> > > > > > towards not coupling this to the TCP session...
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > > >
> > > > > > >   Thank you again for the KIP. And again, if this was clarified
> > > already
> > > > > > > please drop me a hint where I could read about it.
> > > > > > >
> > > > > > > Best Jan
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I created a KIP to improve the scalability and latency
of
> > > > > FetchRequest:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
> > > > > Partition+Scalability
> > > > > > > >
> > > > > > > > Please take a look.
> > > > > > > >
> > > > > > > > cheers,
> > > > > > > > Colin
> > > > > > >
> > > > >
> > >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message