kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback
Date Thu, 22 Jun 2017 02:13:12 GMT
Bill,

I'm wondering why we need the `StateRestoreNotification` while still having
`StateRestoreListener`. Couldn't the above setup be achieved with just
`StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems the
latter can subsume any use cases intended for the former API.

Guozhang

On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbejeck@gmail.com> wrote:

> I'm going to update the KIP with a new interface StateRestoreNotification
> containing two methods, startRestore and endRestore.
>
> While the naming is very similar to methods already proposed on the
> StateRestoreListener, the intent of these methods is not user
> notification of restore status.  Instead, these new methods are for internal
> use by the state store to perform any required setup and teardown work due
> to a batch restoration process.
>
> Here's one current use case: when using RocksDB we should optimize for a
> bulk load by setting Options.prepareForBulkLoad().
>
>    1. If the database has already been opened, we'll need to close it, set
>    the "prepareForBulkLoad" option, and re-open the database.
>    2. Once the restore is completed, we'll need to close and re-open the
>    database with the "prepareForBulkLoad" option turned off.
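A minimal sketch of the setup/teardown sequence above, assuming a hypothetical `StateRestoreNotification` interface with the `startRestore()` / `endRestore()` methods named in this message. It has no RocksDB dependency: `BulkLoadStoreSketch` is a hypothetical stand-in that only records whether it was reopened in bulk-load mode, which is the point where a real RocksDB-backed store would build its options with `prepareForBulkLoad()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface, named after the proposal in this message:
// internal hooks run once before and once after a batch restoration.
interface StateRestoreNotification {
    void startRestore();
    void endRestore();
}

// Hypothetical store stand-in; it only tracks open/close transitions.
class BulkLoadStoreSketch implements StateRestoreNotification {
    boolean isOpen = false;
    boolean bulkLoadMode = false;
    final List<String> events = new ArrayList<>();

    void open(boolean bulkLoad) {
        // A real RocksDB store would create its Options here and, when
        // bulkLoad is true, apply the bulk-load tuning before opening.
        bulkLoadMode = bulkLoad;
        isOpen = true;
        events.add(bulkLoad ? "open(bulk)" : "open(normal)");
    }

    void close() {
        isOpen = false;
        events.add("close");
    }

    // Step 1: if already open, close and reopen with bulk-load tuning.
    @Override
    public void startRestore() {
        if (isOpen) {
            close();
        }
        open(true);
    }

    // Step 2: once restored, close and reopen with normal options.
    @Override
    public void endRestore() {
        close();
        open(false);
    }
}
```

Keeping these hooks in their own interface, as argued below, means any store can opt in without depending on which restore callback it uses.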
>
> While we are mentioning the RocksDB use case above, the addition of this
> interface is not tied to any particular implementation of a persistent
> state store.
>
> Additionally, a separate interface is needed so that any user can implement
> the state restore notification feature regardless of the state restore
> callback used.
>
> I'll also remove the "getStateRestoreListener" method and stick with the
> notion of a "global" restore listener for now.
>
> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbejeck@gmail.com> wrote:
>
> > Yes it is, more of an oversight on my part, I'll remove it from the KIP.
> >
> >
> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >
> >> Hi,
> >>
> >> I think for now it's good enough to start with a single global restore
> >> listener. We can incrementally improve this later on if required. Of
> >> course, if it's easy to do right away we can also be more fine-grained.
> >> But for KTable, we might want to add this after getting rid of all the
> >> overloads we have atm.
> >>
> >> One question: what is the purpose of parameter "endOffset" in
> >> #onRestoreEnd() -- isn't this the same value as provided in
> >> #onRestoreStart() ?
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
> >> > Thinking about it more, the custom StateRestoreListener approach of
> >> > having a get method on the interface will really only work for custom
> >> > state stores.
> >> >
> >> > So we'll need to provide another way for users to set behavior with
> >> > provided state stores.  The only option that comes to mind now is also
> >> > adding a parameter to the StateStoreSupplier.
> >> >
> >> >
> >> > Bill
> >> >
> >> >
> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbejeck@gmail.com> wrote:
> >> >
> >> >> Guozhang,
> >> >>
> >> >> Thanks for the comments.
> >> >>
> >> >> 1.  As for the granularity, I agree that having one global
> >> >> StateRestoreListener could be restrictive.  But I think it's important
> >> >> to have a "setStateRestoreListener" on KafkaStreams, as this allows
> >> >> users to define an anonymous instance that has access to local scope
> >> >> for reporting purposes.  This is similar to the pattern we use for
> >> >> KafkaStreams.setStateListener.
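To illustrate the "anonymous instance with access to local scope" pattern, here is a sketch that assumes a hypothetical listener shape (store name and end offset passed at start, as point 2 below proposes) and a hypothetical `StreamsSketch` class standing in for KafkaStreams. None of these types are the real Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener shape loosely following the methods in this thread.
interface StateRestoreListener {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Hypothetical stand-in for KafkaStreams holding one global listener.
class StreamsSketch {
    private StateRestoreListener restoreListener;

    void setStateRestoreListener(StateRestoreListener listener) {
        this.restoreListener = listener;
    }

    // Simulates restoring one store in a single batch; assumes contiguous
    // offsets and an inclusive end offset.
    void simulateRestore(String store, long start, long end) {
        long count = end - start + 1;
        restoreListener.onRestoreStart(store, start, end);
        restoreListener.onBatchRestored(store, end, count);
        restoreListener.onRestoreEnd(store, count);
    }
}

class ReportingExample {
    static List<String> run() {
        // Local state the anonymous listener below writes into.
        final List<String> report = new ArrayList<>();
        StreamsSketch streams = new StreamsSketch();
        streams.setStateRestoreListener(new StateRestoreListener() {
            @Override
            public void onRestoreStart(String storeName, long startOffset, long endOffset) {
                report.add("start " + storeName + " " + startOffset + ".." + endOffset);
            }

            @Override
            public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
                report.add("batch " + storeName + " " + numRestored);
            }

            @Override
            public void onRestoreEnd(String storeName, long totalRestored) {
                report.add("end " + storeName + " " + totalRestored);
            }
        });
        streams.simulateRestore("counts-store", 0L, 99L);
        return report;
    }
}
```

The store name `counts-store` is a placeholder. Because the listener closes over `report`, the reporting code needs no extra wiring, which is the convenience argued for above.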
> >> >>
> >> >> As an alternative, what if we add a method to the
> >> >> BatchingStateRestoreCallback interface named "getStateStoreListener"?
> >> >> Then in an abstract adapter class we return null from
> >> >> getStateStoreListener.  But if users want to supply a different
> >> >> StateRestoreListener strategy per callback, they would simply override
> >> >> the method to return an actual instance.
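A sketch of the adapter idea, using local stand-in types (not the Kafka jars) and `Map.Entry` in place of Kafka's `KeyValue`. The fallback rule in `ListenerResolution` (per-callback listener if non-null, else the global one) is an assumption about how the library might use `getStateStoreListener`.

```java
import java.util.Collection;
import java.util.Map;

// Local stand-ins; these are NOT the real Kafka Streams types.
interface RestoreListenerSketch {
    void onRestoreEnd(String storeName, long totalRestored);
}

interface BatchingRestoreCallbackSketch {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);

    // Proposed hook: a per-callback listener, or null to use the global one.
    RestoreListenerSketch getStateStoreListener();
}

// Abstract adapter: by default there is no per-callback listener.
abstract class BatchingRestoreCallbackAdapter implements BatchingRestoreCallbackSketch {
    @Override
    public RestoreListenerSketch getStateStoreListener() {
        return null;
    }
}

class ListenerResolution {
    // Hypothetical library-side choice of which listener to notify.
    static RestoreListenerSketch resolve(BatchingRestoreCallbackSketch callback,
                                         RestoreListenerSketch global) {
        RestoreListenerSketch perCallback = callback.getStateStoreListener();
        return perCallback != null ? perCallback : global;
    }
}
```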
> >> >>
> >> >> WDYT?
> >> >>
> >> >> 2.  I'll make the required updates to pass in the ending offset at the
> >> >> start as well as the actual name of the state store.
> >> >>
> >> >> Bill
> >> >>
> >> >>
> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >> >>
> >> >>> Thanks Bill for the updated wiki. I have a couple more comments:
> >> >>>
> >> >>> 1. Setting the StateRestoreListener at the KafkaStreams granularity may
> >> >>> not be sufficient, as in the listener callback we do not know which store
> >> >>> is being restored right now: if the topic is a changelog topic then from
> >> >>> the `TopicPartition` we may be able to infer the state store name, but if
> >> >>> the topic is the source topic read as a KTable then we may not know which
> >> >>> store is being restored; plus, forcing users to infer the state store
> >> >>> name from the topic partition name would not be intuitive either. Also,
> >> >>> for different stores the listener may be implemented differently, and
> >> >>> setting a global listener would force users to branch on the
> >> >>> topic-partition names, similarly to what we did in the global timestamp
> >> >>> extractor. On the other hand, I also agree that setting the listener at
> >> >>> the per-store granularity may be a bit cumbersome, since if users want to
> >> >>> override it for a specific store we would need to expose some APIs, maybe
> >> >>> at StateStoreSupplier. So I would love to hear other people's opinions.
> >> >>>
> >> >>> If we think that differently implemented restore callbacks may be less
> >> >>> common, then I'd suggest at least replacing the `TopicPartition`
> >> >>> parameter with the `String` store name and the `TaskId`.
> >> >>>
> >> >>> 2. I think we can pass in the `long endOffset` in the `onRestoreStart`
> >> >>> function as well, as we will have read the endOffset already by then;
> >> >>> otherwise users still will not be able to track the restoration progress
> >> >>> (e.g. what percentage has been restored so far, to estimate how
> >> >>> long I still need to wait).
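With the end offset available at start, progress and a rough time estimate follow directly. A minimal sketch, assuming a hypothetical `RestoreProgress` class fed by the restore callbacks; it treats offsets as contiguous positions, so the figures are approximate for compacted topics, and whether the end offset is inclusive is left open in this thread.

```java
// Hypothetical progress tracker; not part of any Kafka API.
class RestoreProgress {
    private long startOffset;
    private long endOffset;
    private long restoredUpTo;

    // The end offset is known up front, which is why it is passed here.
    void onRestoreStart(long startOffset, long endOffset) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.restoredUpTo = startOffset;
    }

    void onBatchRestored(long batchEndOffset) {
        this.restoredUpTo = batchEndOffset;
    }

    // Percentage of the offset range covered so far.
    double percentComplete() {
        long total = endOffset - startOffset;
        if (total <= 0) {
            return 100.0;
        }
        return 100.0 * (restoredUpTo - startOffset) / total;
    }

    // Linear estimate of remaining time from the elapsed time so far;
    // returns -1 while nothing has been restored yet.
    long estimateRemainingMillis(long elapsedMillis) {
        long done = restoredUpTo - startOffset;
        if (done <= 0) {
            return -1;
        }
        long remaining = endOffset - restoredUpTo;
        return elapsedMillis * remaining / done;
    }
}
```

For example, with a range of 0 to 5000 and progress at 2500, `percentComplete()` is 50.0, and if that took 1000 ms the linear estimate for the rest is another 1000 ms.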
> >> >>>
> >> >>>
> >> >>> Guozhang
> >> >>>
> >> >>>
> >> >>>
> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbejeck@gmail.com> wrote:
> >> >>>
> >> >>>> Eno,
> >> >>>>
> >> >>>> Thanks for the comments.
> >> >>>>
> >> >>>> 1. As for having both restore and restoreAll, I kept the restore method
> >> >>>> for backward compatibility, as that is what is used by current
> >> >>>> implementing classes. However, as I think about it, having a single
> >> >>>> restore method taking a collection makes things cleaner. I'll wait for
> >> >>>> others to weigh in, but I'm leaning towards having a single restore
> >> >>>> method.
> >> >>>>
> >> >>>> 2. The "onBatchRestored" method is for keeping track of the restore
> >> >>>> process as we load records from each poll request.
> >> >>>>
> >> >>>>    For example, if the changelog contained 5000 records and
> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get
> >> >>>> called 5 times, each time with the ending offset of the last record in
> >> >>>> the batch and the count of the batch.  I'll update the KIP to add
> >> >>>> comments above the interface methods.
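The 5000-record example can be simulated directly. This sketch assumes a hypothetical `BatchRestoredCallback` interface, contiguous offsets starting at 0, and that every poll returns a full batch while enough records remain; real polls may return fewer records.

```java
// Hypothetical callback receiving the last offset and size of each batch.
interface BatchRestoredCallback {
    void onBatchRestored(long batchEndOffset, long batchCount);
}

class BatchRestoreSimulation {
    // Restores offsets 0..totalRecords-1, at most maxPollRecords per poll,
    // and reports each batch's end offset and count.
    static void restore(long totalRecords, int maxPollRecords, BatchRestoredCallback callback) {
        long nextOffset = 0;
        while (nextOffset < totalRecords) {
            long batchCount = Math.min(maxPollRecords, totalRecords - nextOffset);
            long batchEndOffset = nextOffset + batchCount - 1;
            callback.onBatchRestored(batchEndOffset, batchCount);
            nextOffset += batchCount;
        }
    }
}
```

Calling `restore(5000, 1000, ...)` produces five callbacks with batch end offsets 999, 1999, 2999, 3999 and 4999, each with a count of 1000.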
> >> >>>>
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Bill
> >> >>>>
> >> >>>>
> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <eno.thereska@gmail.com> wrote:
> >> >>>>
> >> >>>>> Thanks Bill,
> >> >>>>>
> >> >>>>> A couple of questions:
> >> >>>>>
> >> >>>>>
> >> >>>>> 1. Why do we need both restore and restoreAll? Why can't we just have
> >> >>>>> one that takes a collection (i.e., restore all)? Are there cases when
> >> >>>>> people want to restore one at a time? In that case, they could still use
> >> >>>>> restoreAll with just 1 record in the collection, right?
> >> >>>>>
> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small comment
> >> >>>>> on top of all three methods? An example might help here.
> >> >>>>>
> >> >>>>> Thanks
> >> >>>>> Eno
> >> >>>>>
> >> >>>>>
> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bbejeck@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Guozhang, Damian thanks for the comments.
> >> >>>>>>
> >> >>>>>> Giving developers the ability to hook into StateStore recovery phases
> >> >>>>>> was part of my original intent. However, in its current state the KIP
> >> >>>>>> won't provide this functionality.
> >> >>>>>>
> >> >>>>>> As a result I'll be doing a significant revision of KIP-167. I'll be
> >> >>>>>> sure to incorporate all your comments in the new revision.
> >> >>>>>>
> >> >>>>>> Thanks,
> >> >>>>>> Bill
> >> >>>>>>
> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <damian.guy@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e.,
> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and also needs to
> >> >>>>>>> have the end offset available so that people can use it to derive
> >> >>>>>>> progress. Slightly different, maybe the StateRestoreContext interface
> >> >>>>>>> could be:
> >> >>>>>>>
> >> >>>>>>> long beginOffset()
> >> >>>>>>> long endOffset()
> >> >>>>>>> long currentOffset()
> >> >>>>>>>
> >> >>>>>>> One further thing: this currently doesn't provide developers the
> >> >>>>>>> ability to hook into this information if they are using the built-in
> >> >>>>>>> StateStores. Is this something we should be considering?
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wangguoz@gmail.com> wrote:
> >> >>>>>>>
> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
> >> >>>>>>>>
> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once per
> >> >>>>>>>> store throughout the whole restoration process, and restoreAll is
> >> >>>>>>>> called per batch. In that case I feel we can pass the
> >> >>>>>>>> StateRestoreContext as a second parameter in restoreAll and in
> >> >>>>>>>> endRestore as well, and let the library set the corresponding values
> >> >>>>>>>> while users only read them (since the collection of key-value pairs
> >> >>>>>>>> does not contain offset information anyway, users cannot really set
> >> >>>>>>>> the offset). The "lastOffsetRestored" would be the starting offset
> >> >>>>>>>> when called on `beginRestore`.
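One way to get "library sets, users only read" is to hand callbacks a read-only interface backed by a library-owned mutable object. In this sketch the method names follow the `StateRestoreContext` proposed in this message; `MutableRestoreContext` and its `recordBatch` method are hypothetical library internals.

```java
// Read-only view handed to user callbacks.
interface StateRestoreContext {
    long lastOffsetRestored();
    long endOffsetToRestore();
    int numberRestored();
}

// Hypothetical library-internal implementation; package-private so that
// user code normally only sees the interface above.
final class MutableRestoreContext implements StateRestoreContext {
    private long lastOffsetRestored;
    private final long endOffsetToRestore;
    private int numberRestored;

    MutableRestoreContext(long startingOffset, long endOffsetToRestore) {
        // Before any batch, lastOffsetRestored holds the starting offset,
        // matching the suggested value at beginRestore time.
        this.lastOffsetRestored = startingOffset;
        this.endOffsetToRestore = endOffsetToRestore;
    }

    // Called by the library after each restored batch.
    void recordBatch(long batchEndOffset, int batchSize) {
        this.lastOffsetRestored = batchEndOffset;
        this.numberRestored += batchSize;
    }

    @Override public long lastOffsetRestored() { return lastOffsetRestored; }
    @Override public long endOffsetToRestore() { return endOffsetToRestore; }
    @Override public int numberRestored() { return numberRestored; }
}
```

Interface-level read-only access does not stop a determined caller from downcasting, but keeping the mutable class out of the public API is usually enough.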
> >> >>>>>>>>
> >> >>>>>>>> 2) Users who want to implement their own batch restoration
> >> >>>>>>>> callbacks would now need to implement both `restore` and
> >> >>>>>>>> `restoreAll`, even though they would either let `restoreAll` call
> >> >>>>>>>> `restore` or implement the logic in `restoreAll` only and never call
> >> >>>>>>>> `restore`. Maybe we can provide two abstract impls of
> >> >>>>>>>> BatchingStateRestoreCallback which treat beginRestore / endRestore as
> >> >>>>>>>> no-ops, with one implementing `restoreAll` to call the abstract
> >> >>>>>>>> `restore` while the other implements `restore` to throw a "not
> >> >>>>>>>> supported" exception and keeps `restoreAll` abstract.
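A sketch of the two abstract implementations described in point 2. It uses local stand-ins mirroring the shapes in this thread (`Map.Entry` instead of Kafka's `KeyValue`, context parameters omitted for brevity); the class names `RecordAtATimeRestoreCallback` and `BatchOnlyRestoreCallback` are hypothetical.

```java
import java.util.Collection;
import java.util.Map;

// Local stand-ins; not the Kafka jars.
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);
    void beginRestore();
    void endRestore();
}

// Variant 1: restoreAll delegates record by record to the abstract restore.
abstract class RecordAtATimeRestoreCallback implements BatchingStateRestoreCallback {
    @Override
    public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
        for (Map.Entry<byte[], byte[]> record : records) {
            restore(record.getKey(), record.getValue());
        }
    }

    @Override public void beginRestore() { }  // no-op
    @Override public void endRestore() { }    // no-op
}

// Variant 2: logic lives only in restoreAll; single-record restore is rejected.
abstract class BatchOnlyRestoreCallback implements BatchingStateRestoreCallback {
    @Override
    public void restore(byte[] key, byte[] value) {
        throw new UnsupportedOperationException("use restoreAll");
    }

    @Override public void beginRestore() { }  // no-op
    @Override public void endRestore() { }    // no-op
}
```

Either way, a user subclass implements exactly one method, which addresses the "must implement both" concern.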
> >> >>>>>>>>
> >> >>>>>>>> 3) I think we can also return the "offset limit" in
> >> >>>>>>>> StateRestoreContext, which is important for users to track the
> >> >>>>>>>> restoration progress, since otherwise they could not tell what
> >> >>>>>>>> percentage of the restoration has completed.  I.e.:
> >> >>>>>>>>
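> >> >>>>>>>> import java.util.Collection;
> >> >>>>>>>> import org.apache.kafka.streams.KeyValue;
> >> >>>>>>>> import org.apache.kafka.streams.processor.StateRestoreCallback;
> >> >>>>>>>>
> >> >>>>>>>> public interface BatchingStateRestoreCallback extends StateRestoreCallback {
> >> >>>>>>>>
> >> >>>>>>>>   // called once per batch of restored records
> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
> >> >>>>>>>>                   StateRestoreContext restoreContext);
> >> >>>>>>>>
> >> >>>>>>>>   // called once per store, before the first batch
> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
> >> >>>>>>>>
> >> >>>>>>>>   // called once per store, after the last batch
> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
> >> >>>>>>>> }
> >> >>>>>>>>
> >> >>>>>>>> // read-only for users; the values are set by the library
> >> >>>>>>>> public interface StateRestoreContext {
> >> >>>>>>>>
> >> >>>>>>>>   long lastOffsetRestored();
> >> >>>>>>>>
> >> >>>>>>>>   long endOffsetToRestore();
> >> >>>>>>>>
> >> >>>>>>>>   int numberRestored();
> >> >>>>>>>> }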
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Guozhang
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbejeck@gmail.com> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> Guozhang, Matthias,
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP (and the JIRA
> >> >>>>>>>>> title and description as well).
> >> >>>>>>>>>
> >> >>>>>>>>> I had thought about introducing a separate interface altogether,
> >> >>>>>>>>> but extending the current one makes more sense.
> >> >>>>>>>>>
> >> >>>>>>>>> As for intermediate callbacks based on time or number of records, I
> >> >>>>>>>>> think the latest update to the KIP addresses this point of querying
> >> >>>>>>>>> for intermediate results, but it would be per batch restored.
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks,
> >> >>>>>>>>> Bill
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <jim@jagunet.com> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> With regard to backward compatibility, we should not change the
> >> >>>>>>>>>>> current interface, but add a new interface that extends the
> >> >>>>>>>>>>> current one.
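A sketch of why extending keeps existing implementations working: the old single-record interface is untouched, and the restore loop opts into the batch path only when the callback implements the new sub-interface. The types are local stand-ins (`Map.Entry` instead of Kafka's `KeyValue`), and the `instanceof` dispatch in the hypothetical `RestoreDispatcher` is an assumption about how the library might choose.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Existing single-record callback, left unchanged (local stand-in).
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

// New sub-interface that implementations opt into for batch restores.
interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);
}

class RestoreDispatcher {
    // Old callbacks still receive one record at a time; new ones get the batch.
    static void restoreBatch(StateRestoreCallback callback,
                             List<Map.Entry<byte[], byte[]>> records) {
        if (callback instanceof BatchingStateRestoreCallback) {
            ((BatchingStateRestoreCallback) callback).restoreAll(records);
        } else {
            for (Map.Entry<byte[], byte[]> record : records) {
                callback.restore(record.getKey(), record.getValue());
            }
        }
    }
}
```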
> >> >>>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> ++1
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> --
> >> >>>>>>>> -- Guozhang
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -- Guozhang
> >> >>>
> >> >>
> >> >>
> >> >
> >>
> >>
> >
>



-- 
-- Guozhang
