kafka-dev mailing list archives

From Bill Bejeck <bbej...@gmail.com>
Subject Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback
Date Wed, 14 Jun 2017 21:39:10 GMT
Guozhang,

Thanks for the comments.

1.  As for the granularity, I agree that having one global
StateRestoreListener could be restrictive.  But I think it's important to
have a "setStateRestoreListener" on KafkaStreams, as this allows users to
define an anonymous instance that has access to local scope for reporting
purposes.  This is similar to the pattern we use for
KafkaStreams.setStateListener.

As an alternative, what if we add a method named "getStateStoreListener"
to the BatchingStateRestoreCallback interface?  Then, in an abstract
adapter class, we return null from getStateStoreListener.  But if users
want to supply a different StateRestoreListener strategy per callback,
they would simply override the method to return an actual instance.
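
To make the alternative concrete, here is a rough sketch only, not the
KIP's actual API.  The interfaces are local stand-ins so the example
compiles without Kafka on the classpath: Map.Entry replaces KeyValue, the
listener has a single illustrative method, and the class names
AbstractBatchingRestoreCallback, LoggingRestoreCallback, and
ListenerLookupDemo are hypothetical.  The fallback to a global listener in
resolve() is also an assumption about how the library could combine the
two granularities.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Stand-in for the listener under discussion; only one method is needed here.
interface StateRestoreListener {
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
}

// Stand-in for the proposed callback interface (Map.Entry replaces KeyValue).
interface BatchingStateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);

    // Proposed addition: a per-callback listener, or null for none.
    StateRestoreListener getStateStoreListener();
}

// Adapter: by default there is no per-callback listener.
abstract class AbstractBatchingRestoreCallback implements BatchingStateRestoreCallback {
    @Override
    public StateRestoreListener getStateStoreListener() {
        return null;
    }
}

// A callback that wants its own reporting overrides the method.
class LoggingRestoreCallback extends AbstractBatchingRestoreCallback {
    final List<String> log = new ArrayList<>();

    @Override
    public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
        // A real callback would write the records into its store here.
    }

    @Override
    public StateRestoreListener getStateStoreListener() {
        return (store, endOffset, count) ->
            log.add(store + " restored " + count + " records up to offset " + endOffset);
    }
}

class ListenerLookupDemo {
    // Assumed lookup order: per-callback listener first, then the global one.
    static StateRestoreListener resolve(BatchingStateRestoreCallback callback,
                                        StateRestoreListener globalListener) {
        StateRestoreListener perCallback = callback.getStateStoreListener();
        return perCallback != null ? perCallback : globalListener;
    }

    public static void main(String[] args) {
        LoggingRestoreCallback callback = new LoggingRestoreCallback();
        StateRestoreListener global = (store, endOffset, count) -> { };
        resolve(callback, global).onBatchRestored("counts-store", 999L, 1000L);
        System.out.println(callback.log);
    }
}
```

With this shape, callbacks that extend the adapter and do nothing else
keep using whatever listener was set on KafkaStreams, while a callback
that overrides the method gets its own reporting.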

WDYT?

2.  I'll make the required updates to pass in the ending offset at the
start as well as the actual name of the state store.
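
As an illustration of why the ending offset at the start matters, here is
a minimal sketch, assuming a listener that receives the store name, start
offset, and end offset up front.  The method names onRestoreStart and
onBatchRestored come from this thread; onRestoreEnd, the exact parameter
lists, ProgressListener, and RestoreProgressDemo are hypothetical.  The
sketch also assumes an exclusive end offset and contiguous offsets (no
gaps from compaction), which a real changelog does not guarantee.  It
replays the example from earlier in the thread: 5000 records restored
with MAX_POLL_RECORDS set to 1000.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener shape; names and parameters are not final.
interface StateRestoreListener {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Records the percentage restored after every batch.
class ProgressListener implements StateRestoreListener {
    final List<Integer> percentages = new ArrayList<>();
    private long startOffset;
    private long endOffset;

    @Override
    public void onRestoreStart(String storeName, long startOffset, long endOffset) {
        // Without endOffset here, the denominator below would be unknown.
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        long total = endOffset - startOffset;
        long done = batchEndOffset + 1 - startOffset;
        percentages.add(total == 0 ? 100 : (int) (100 * done / total));
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        // Nothing to do in this sketch.
    }
}

class RestoreProgressDemo {
    // Simulates restoring `records` records in batches of at most `maxPollRecords`.
    static ProgressListener simulate(long records, long maxPollRecords) {
        ProgressListener listener = new ProgressListener();
        listener.onRestoreStart("counts-store", 0L, records);
        long restored = 0;
        while (restored < records) {
            long batch = Math.min(maxPollRecords, records - restored);
            restored += batch;
            // The offset of the last record in this batch is restored - 1.
            listener.onBatchRestored("counts-store", restored - 1, batch);
        }
        listener.onRestoreEnd("counts-store", restored);
        return listener;
    }

    public static void main(String[] args) {
        // Five batches, so five progress readings: [20, 40, 60, 80, 100]
        System.out.println(simulate(5000, 1000).percentages);
    }
}
```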

Bill


On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Thanks Bill for the updated wiki. I have a couple more comments:
>
> 1. Setting StateRestoreListener at the KafkaStreams granularity may not be
> sufficient, as in the listener callback we do not know which store it is
> restoring right now: if the topic is a changelog topic, then from the
> `TopicPartition` we may be able to infer the state store name, but if the
> topic is a source topic read as a KTable, then we may not know which store
> it is restoring; plus, forcing users to infer the state store name
> from the topic partition name would not be intuitive either. Also, for
> different stores the listener may be implemented differently, and setting a
> global listener would force users to branch on the topic-partition names,
> similar to what we did in the global timestamp extractor. On the other
> hand, I also agree that setting the listener at the per-store granularity
> may be a bit cumbersome, since if users want to override it on a specific
> store we would need to expose some APIs, maybe at StateStoreSupplier. So I
> would love to hear other people's opinions.
>
> If we think that differently implemented restoring callbacks may be less
> common, then I'd suggest at least replacing the `TopicPartition` parameter
> with the `String` store name and the `TaskId`?
>
> 2. I think we can pass in the `long endOffset` in the `onRestoreStart`
> function as well, as we will have read the endOffset already by then;
> otherwise users would still not be able to track the restoration progress
> (e.g., what percentage has been restored so far, to estimate how much
> longer they still need to wait).
>
>
> Guozhang
>
>
>
> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbejeck@gmail.com> wrote:
>
> > Eno,
> >
> > Thanks for the comments.
> >
> > 1. As for having both restore and restoreAll, I kept the restore method
> > for backward compatibility, as that is what is used by current
> > implementing classes. However, as I think about it, it makes things
> > cleaner to have a single restore method taking a collection. I'll wait
> > for others to weigh in, but I'm leaning towards having a single restore
> > method.
> >
> > 2. The "onBatchRestored" method is for keeping track of the restore
> > process as we load records from each poll request.
> >
> >    For example, if the changelog contained 5000 records and
> > MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get
> > called 5 times, each time with the ending offset of the last record in
> > the batch and the count of the batch.  I'll update the KIP to add
> > comments above the interface methods.
> >
> >
> > Thanks,
> > Bill
> >
> >
> > On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <eno.thereska@gmail.com>
> > wrote:
> >
> > > Thanks Bill,
> > >
> > > A couple of questions:
> > >
> > >
> > > 1. Why do we need both restore and restoreAll? Why can't we just have
> > > one that takes a collection (i.e., restoreAll)? Are there cases when
> > > people want to restore one at a time? In that case, they could still
> > > use restoreAll with just 1 record in the collection, right?
> > >
> > > 2. I don't quite get "onBatchRestored". Could you put a small comment
> > > on top of all three methods? An example might help here.
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > > On 8 Jun 2017, at 18:05, Bill Bejeck <bbejeck@gmail.com> wrote:
> > > >
> > > > Guozhang, Damian thanks for the comments.
> > > >
> > > > Giving developers the ability to hook into StateStore recovery phases
> > > > was part of my original intent. However, the state the KIP is in now
> > > > won't provide this functionality.
> > > >
> > > > As a result, I'll be doing a significant revision of KIP-167.  I'll be
> > > > sure to incorporate all your comments in the new revision.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <damian.guy@gmail.com> wrote:
> > > >
> > > >> I'm largely in agreement with what Guozhang has suggested, i.e.,
> > > >> StateRestoreContext shouldn't have any setters on it and also needs
> > > >> to have the end offset available so that people can use it to derive
> > > >> progress. Slightly different, maybe the StateRestoreContext interface
> > > >> could be:
> > > >>
> > > >> long beginOffset()
> > > >> long endOffset()
> > > >> long currentOffset()
> > > >>
> > > >> One further thing: this currently doesn't give developers the
> > > >> ability to hook into this information if they are using the built-in
> > > >> StateStores. Is this something we should be considering?
> > > >>
> > > >>
> > > >> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wangguoz@gmail.com> wrote:
> > > >>
> > > >>> Thanks for the updated KIP, Bill. I have a couple of comments:
> > > >>>
> > > >>> 1) I'm assuming beginRestore / endRestore is called only once per
> > > >>> store throughout the whole restoration process, and restoreAll is
> > > >>> called per batch. In that case, I feel we can pass the
> > > >>> StateRestoreContext as a second parameter in restoreAll and in
> > > >>> endRestore as well, and let the library set the corresponding values
> > > >>> and only let users read them (since the collection of key-value
> > > >>> pairs does not contain offset information anyway, users cannot
> > > >>> really set the offset). The "lastOffsetRestored" would be the
> > > >>> starting offset when called on `beginRestore`.
> > > >>>
> > > >>> 2) Users who want to implement their own batch restoration callbacks
> > > >>> would now need to implement both `restore` and `restoreAll`, while
> > > >>> they either let `restoreAll` call `restore` or implement the logic
> > > >>> in `restoreAll` only and never call `restore`. Maybe we can provide
> > > >>> two abstract impls of BatchingStateRestoreCallback which do
> > > >>> beginRestore / endRestore as no-ops, with one callback implementing
> > > >>> `restoreAll` to call abstract `restore`, while the other implements
> > > >>> `restore` to throw a "not supported" exception and keeps `restoreAll`
> > > >>> abstract.
> > > >>>
> > > >>> 3) I think we can also return the "offset limit" in
> > > >>> StateRestoreContext, which is important for users to track the
> > > >>> restoration progress, since otherwise they could not tell what
> > > >>> percentage of the restoration has completed.  I.e.:
> > > >>>
> > > >>> public interface BatchingStateRestoreCallback extends StateRestoreCallback {
> > > >>>
> > > >>>   void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
> > > >>>                   StateRestoreContext restoreContext);
> > > >>>
> > > >>>   void beginRestore(StateRestoreContext restoreContext);
> > > >>>
> > > >>>   void endRestore(StateRestoreContext restoreContext);
> > > >>> }
> > > >>>
> > > >>> public interface StateRestoreContext {
> > > >>>
> > > >>>   long lastOffsetRestored();
> > > >>>
> > > >>>   long endOffsetToRestore();
> > > >>>
> > > >>>   int numberRestored();
> > > >>> }
> > > >>>
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbejeck@gmail.com> wrote:
> > > >>>
> > > >>>> Guozhang, Matthias,
> > > >>>>
> > > >>>> Thanks for the comments.  I have updated the KIP (JIRA title and
> > > >>>> description as well).
> > > >>>>
> > > >>>> I had thought about introducing a separate interface altogether, but
> > > >>>> extending the current one makes more sense.
> > > >>>>
> > > >>>> As for intermediate callbacks based on time or number of records, I
> > > >>>> think the latest update to the KIP addresses this point of querying
> > > >>>> for intermediate results, but it would be per batch restored.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Bill
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <jim@jagunet.com> wrote:
> > > >>>>
> > > >>>>>
> > > >>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> > > >>>>>>
> > > >>>>>> With regard to backward compatibility, we should not change the
> > > >>>>>> current interface, but add a new interface that extends the
> > > >>>>>> current one.
> > > >>>>>>
> > > >>>>>
> > > >>>>> ++1
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> -- Guozhang
> > > >>>
> > > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
