kafka-dev mailing list archives

From John Roesler <vvcep...@apache.org>
Subject Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy
Date Mon, 17 Aug 2020 14:34:24 GMT
Hi Navinder,

I see what you mean about the global consumer being similar
to the restore consumer.

I also agree that automatically performing the recovery
steps should be strictly an improvement over the current
situation.

Also, yes, it would be a good idea to make it clear that the
global topic should be compacted in order to ensure correct
semantics. It's the same way with input topics for KTables;
we rely on users to ensure the topics are compacted, and if
they aren't, then the execution semantics will be broken.
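
For reference, the topic-level setting discussed above can be applied with the standard CLI (a config fragment; assumes a reachable broker and a hypothetical topic name):

```shell
# Mark the global topic compacted so keys are retained indefinitely.
# "my-global-topic" is a hypothetical name; requires a running broker.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-global-topic \
  --alter --add-config cleanup.policy=compact
```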

Thanks,
-John

On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
> Hi John,
> 
> Thanks for your inputs. Since global topics are, in a way, their own changelog, wouldn’t
the global consumers be more akin to restore consumers than the main consumer?
> 
> I am also +1 on catching the exception and resetting to earliest for now. Currently, whenever
an instance starts, the global stream thread (if available) goes to RUNNING before the stream
threads are started, which means the global state is available when the stream threads start
processing. So, with the new change of catching the exception, cleaning the store, and resetting
to earliest, it would probably be “stop the world” as you said, John, as I think we will have
to pause the stream threads till the whole global state is recovered. I assume it is "stop
the world" right now as well: currently, if an InvalidOffsetException comes, we throw a
StreamsException and the user has to clean up and handle all this manually, and when that
instance restarts, it will restore the global state first.
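
The catch-and-rebuild flow described above can be sketched in plain Java (a hedged model with hypothetical stand-ins for the topic and store, not the actual GlobalStreamThread code):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch, not real Kafka code: models the proposed recovery --
// catch InvalidOffsetException, pause processing ("stop the world"),
// wipe the global store, and replay the topic from the beginning.
public class GlobalRecoverySketch {
    static class InvalidOffsetException extends RuntimeException {}

    // hypothetical stand-ins for the compacted global topic and its store
    static final List<String[]> TOPIC =
        List.of(new String[]{"k1", "v1"}, new String[]{"k2", "v2"});
    static final Map<String, String> STORE = new HashMap<>();

    static void poll(long offset) {
        // a checkpointed offset beyond the log raises the exception
        if (offset > TOPIC.size()) throw new InvalidOffsetException();
    }

    static void recoverFromEarliest() {
        STORE.clear();                                   // wipe local state
        for (String[] r : TOPIC) STORE.put(r[0], r[1]);  // seekToBeginning + replay
    }

    public static void main(String[] args) {
        boolean mainThreadsPaused = false;
        try {
            poll(100L);                  // stale checkpoint offset
        } catch (InvalidOffsetException e) {
            mainThreadsPaused = true;    // block stream threads meanwhile
            recoverFromEarliest();
            mainThreadsPaused = false;
        }
        System.out.println(STORE.size()); // store fully rebuilt: prints 2
    }
}
```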
> 
> I had an additional thought on this whole problem: would it be helpful to educate users
that global topics should have the cleanup policy set to "compact", so that this invalid offset
exception never arises for them? Assume, for example, that the cleanup policy on a global topic
is "delete" and it has deleted keys k1 and k2 (via retention.ms), although all the instances had
already consumed them, so they are in all global stores and all other instances are up to date
on the global data (so no InvalidOffsetException). Now, a new instance is added to the cluster;
we have already lost k1 and k2 from the global topic, so it will start consuming from the
earliest point in the global topic. Wouldn’t this global store on the new instance then have
two keys fewer than all the other global stores already available in the cluster? Please let
me know if I am missing something. Thanks.
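
The divergence scenario above can be sketched as follows (hypothetical stand-ins, not Kafka code):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch: with cleanup.policy=delete, retention removes k1/k2
// from the global topic. Instances that consumed before deletion keep
// them in their stores; a new instance replaying from earliest never
// sees them, so the global stores in the cluster silently diverge.
public class RetentionDivergenceSketch {
    static Map<String, String> replay(List<String[]> log) {
        Map<String, String> store = new HashMap<>();
        for (String[] r : log) store.put(r[0], r[1]);
        return store;
    }

    public static void main(String[] args) {
        List<String[]> fullLog = List.of(
            new String[]{"k1", "v1"},
            new String[]{"k2", "v2"},
            new String[]{"k3", "v3"});

        Map<String, String> oldStore = replay(fullLog);               // consumed in time
        Map<String, String> newStore = replay(fullLog.subList(2, 3)); // after retention.ms

        System.out.println(oldStore.size() + " vs " + newStore.size()); // prints: 3 vs 1
    }
}
```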
> 
> Regards,
> 
> Navinder
> 
> 
>     On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler <vvcephei@apache.org>
wrote:  
>  
>  Hi all,
> 
> It seems like the main motivation for this proposal is satisfied if we just implement
some recovery mechanism instead of crashing. If the mechanism is going to be pausing all the
threads until the state is recovered, then it still seems like a big enough behavior change
to warrant a KIP.
> 
> I have to confess I’m a little unclear on why a custom reset policy for a global store,
table, or even consumer might be considered wrong. It’s clearly wrong for the restore consumer,
but the global consumer seems more semantically akin to the main consumer than the restore
consumer. 
> 
> In other words, if it’s wrong to reset a GlobalKTable from latest, shouldn’t it also
be wrong for a KTable, for exactly the same reason? It certainly seems like it would be an
odd choice, but I’ve seen many choices I thought were odd turn out to have perfectly reasonable
use cases. 
> 
> As far as the PAPI global store goes, I could see adding the option to configure it,
since, as Matthias pointed out, there are really no specific semantics for the PAPI. But if
automatic recovery is really all Navinder wanted, then I could also see deferring this until
someone specifically wants it.
> 
> So the tl;dr is, if we just want to catch the exception and rebuild the store by seeking
to earliest with no config or API changes, then I’m +1.
> 
> I’m wondering if we can improve on the “stop the world” effect of rebuilding the
global store, though. It seems like we could put our heads together and come up with a more
fine-grained approach to maintaining the right semantics during recovery while still making
some progress.  
> 
> Thanks,
> John
> 
> 
> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
> > Hi Matthias,
> > 
> > IMHO, as you explained, using ‘global.consumer.auto.offset.reset’ is
> > not as straightforward as it seems, and it might change the existing
> > behavior for users without them realizing it. I also think that we
> > should change the behavior inside the global stream thread to not die on
> > InvalidOffsetException and instead clean and rebuild the state from the
> > earliest. For this, as you mentioned, we would need to pause the stream
> > threads till the global store is completely restored. Without that,
> > there will be incorrect processing results if they are utilizing a
> > global store during processing.
> > 
> > 
> > 
> > So, basically we can divide the use-cases into 4 parts.
> >     
> >     - PAPI based global stores (will have the earliest hardcoded)
> >     - PAPI based state stores (already has auto.reset.config)
> >     - DSL based GlobalKTables (will have earliest hardcoded)
> >     - DSL based KTables (will continue with auto.reset.config)
> > 
> > 
> > 
> > So, this would mean that we are not changing any existing behaviors 
> > with this if I am right.
> > 
> > 
> > 
> > > I guess we could improve the code to actually log a warning for this
> > > case, similar to what we do for some configs already (cf
> > > StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
> > 
> > I like this idea. In case we go ahead with the above approach and if we can’t
> > deprecate it, we should educate users that this config doesn’t work.
> > 
> > 
> > 
> > Looking forward to hearing thoughts from others as well.
> >   
> > 
> > - Navinder
> > 
> > On Tuesday, 4 August, 2020, 05:07:59 am IST, Matthias J.
> > Sax <mjsax@apache.org> wrote:
> > 
> > Navinder,
> > 
> > thanks for updating the KIP. I think the motivation section is not
> > totally accurate (which is not your fault though, as the history of how
> > we handle this case is intertwined...) For example, "auto.offset.reset"
> > is hard-coded for the global consumer to "none" and using
> > "global.consumer.auto.offset.reset" has no effect (cf
> > https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values)
> > 
> > Also, we could not even really deprecate the config as mentioned in
> > rejected alternatives sections, because we need `auto.offset.reset` for
> > the main consumer -- and adding a prefix is independent of it. Also,
> > because we ignore the config, it is effectively deprecated/removed already, if you wish.
> > 
> > I guess we could improve the code to actually log a warning for this
> > case, similar to what we do for some configs already (cf
> > StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
> > 
> > 
> > The other question is about compatibility with regard to default
> > behavior: if we want to reintroduce `global.consumer.auto.offset.reset`
> > this basically implies that we need to respect `auto.offset.reset`, too.
> > Remember, that any config without prefix is applied to all clients that
> > support this config. Thus, if a user does not limit the scope of the
> > config to the main consumer (via `main.consumer.auto.offset.reset`) but
> > uses the non-prefix versions and sets it to "latest" (and relies on the
> > current behavior that `auto.offset.reset` is "none", and effectively
> > "earliest" on the global consumer), the user might end up with a
> > surprise as the global consumer behavior would switch from "earliest" to
> > "latest" (most likely unintentionally). Bottom line is, that users might
> > need to change configs to preserve the old behavior...
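
The precedence rules described above can be sketched with plain java.util.Properties (the resolve helper below is a hypothetical illustration, not part of StreamsConfig):

```java
import java.util.Properties;

// Hedged sketch of config-prefix precedence: a prefixed key such as
// "main.consumer.auto.offset.reset" wins over the bare key for that
// client, while the bare key applies to every client with no override.
// resolve() is a hypothetical helper, not Kafka code.
public class PrefixPrecedence {
    static String resolve(Properties p, String prefix, String key) {
        return p.getProperty(prefix + key, p.getProperty(key));
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("auto.offset.reset", "latest");
        p.setProperty("main.consumer.auto.offset.reset", "none");

        // main consumer: the explicit prefixed override wins
        System.out.println(resolve(p, "main.consumer.", "auto.offset.reset"));   // none
        // global consumer: falls back to the bare key -> "latest",
        // which is exactly the surprise described above
        System.out.println(resolve(p, "global.consumer.", "auto.offset.reset")); // latest
    }
}
```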
> > 
> > 
> > 
> > 
> > However, before we discuss those details, I think we should discuss the
> > topic in a broader context first:
> > 
> >   - for a GlobalKTable, does it even make sense from a correctness point
> > of view, to allow users to set a custom reset policy? It seems you
> > currently don't propose this in the KIP, but as you don't mention it
> > explicitly, it's unclear whether that is on purpose or an oversight?
> > 
> >   - Should we treat global stores differently to GlobalKTables and allow
> > for more flexibility (as the PAPI does not really provide any semantic
> > contract). It seems that is what you propose in the KIP. We should
> > discuss if this flexibility does make sense or not for the PAPI, or if
> > we should apply the same reasoning about correctness we use for KTables
> > to global stores? To what extent are/should they be different?
> > 
> >   - If we support auto.offset.reset for global store, how should we
> > handle the initial bootstrapping of the store/table (that is hard-coded
> > atm)? Should we skip it if the policy is "latest" and start with an
> > empty state? Note that we did consider this behavior incorrect via
> > https://issues.apache.org/jira/browse/KAFKA-6121 and thus I am wondering
> > why should we change it back again?
> > 
> > 
> > Finally, the main motivation for the Jira ticket was to let the runtime
> > auto-recover instead of dying as it does currently. If we decide that a
> > custom reset policy does actually not make sense, we can just change the
> > global-thread to not die any longer on an `InvalidOffsetException` but
> > rebuild the state automatically. This would be "only" a behavior change
> > but does not require any public API changes. -- For this case, we should
> > also think about the synchronization with the main processing threads.
> > On startup we bootstrap the global stores before processing happens.
> > Thus, if an `InvalidOffsetException` happens and the global thread dies,
> > the main threads cannot access the global stores any longer and also die.
> > If we re-build the state though, do we need to pause the main thread
> > during this phase?
> > 
> > 
> > 
> > -Matthias
> > 
> > 
> > 
> > On 8/2/20 8:48 AM, Navinder Brar wrote:
> > > Hi John,
> > > 
> > > I have updated the KIP to make the motivation more clear. In a nutshell, we
will use the already existing config "global.consumer.auto.offset.reset" for users to set
a blanket reset policy for all global topics, and add a new interface to set a per-topic reset
policy for each global topic (for which we specifically need this KIP). There was a point raised
by Matthias above to always reset to earliest by cleaning the stores and calling seekToBeginning
in case of InvalidOffsetException. We can go with that route as well, and I don't think it
would need a KIP: if we are not providing users an option to have a blanket reset policy on
global topics, then a per-topic override would also not be required (the KIP is required basically
for that). Although, I think if users have an option to choose a reset policy for StreamThread,
then the option should be provided for GlobalStreamThread as well; and if we don't want to
use "global.consumer.auto.offset.reset", then we would need to deprecate it, because currently
it's not serving any purpose. For now, I have added it in rejected alternatives, but we can
discuss this.
> > > 
> > > On the query that I had for Guozhang, thanks to Matthias we have fixed it last
week as part of KAFKA-10306.
> > > 
> > > ~Navinder
> > >   
> > > 
> > >     On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar <navinder_brar@yahoo.com.invalid>
wrote:  
> > >   
> > >   
> > > Hi,
> > > 
> > > Sorry, it took some time to respond back.
> > > 
> > > > “but I thought we would pass the config through to the client.”
> > > 
> > > @John, sure, we can use the config in GlobalStreamThread; that could be one
of the ways to solve it.
> > > 
> > > 
> > > @Matthias, sure, cleaning the store and recreating is one way, but since we are
giving an option to reset in StreamThread, why should the implementation be different in GlobalStreamThread?
I think we should use the global.consumer.auto.offset.reset config to accept the reset strategy
opted for by the user, although I would be ok with just cleaning and resetting to the latest as
well for now. Currently, we throw a StreamsException in case of InvalidOffsetException in
GlobalStreamThread, so just resetting would still be better than what happens currently.
> > > 
> > > Matthias, I found this comment in StreamsBuilder for GlobalKTable: ‘* Note
that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
regardless of the specified value in {@link StreamsConfig} or {@link Consumed}.’ 
> > > So, I guess we are already cleaning up and recreating for GlobalKTable from
earliest offset.
> > > 
> > > @Guozhang, while looking at the code, I also noticed a TODO: pending in GlobalStateManagerImpl
when InvalidOffsetException is thrown. Earlier, we were directly clearing the store here and
recreating from scratch, but that code piece is removed now. Are you working on a follow-up
PR for this, or should just handling the reset in GlobalStreamThread be sufficient?
> > > 
> > > Regards,
> > > Navinder
> > > 
> > >     On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax <mjsax@apache.org>
wrote:  
> > >   
> > >   Atm, the config should be ignored and the global-consumer should use
> > > "none" in a hard-coded way.
> > > 
> > > However, I am still wondering if we actually want/need to allow users
> > > to specify the reset policy? It might be worth considering just
> > > changing the behavior: catch the exception, log an ERROR (for informational
> > > purposes), wipe the store, seekToBeginning(), and recreate the store.
> > > 
> > > Btw: if we want to allow users to set the reset policy, this should be
> > > possible via the config, or via overwriting the config in the method
> > > itself. Thus, we would need to add the new overloaded method to
> > > `Topology` and `StreamsBuilder`.
> > > 
> > > Another question to ask: what about GlobalKTables? Should they behave
> > > the same? An alternative design could be, to allow users to specify a
> > > flexible reset policy for global-stores, but not for GlobalKTables and
> > > use the strategy suggested above for this case.
> > > 
> > > Thoughts?
> > > 
> > > 
> > > -Matthias
> > > 
> > > 
> > > On 7/2/20 2:14 PM, John Roesler wrote:
> > > > Hi Navinder,
> > > > 
> > > > Thanks for the response. I’m sorry if I’m being dense... You said
we are not currently using the config, but I thought we would pass the config through to the
client.  Can you confirm whether or not the existing config works for your use case?
> > > > 
> > > > Thanks,
> > > > John
> > > > 
> > > > On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
> > > > > Sorry my bad. Found it.
> > > > > 
> > > > > 
> > > > > 
> > > > > * Prefix used to override {@link KafkaConsumer consumer} configs for the
> > > > > * global consumer client from the general consumer client configs.
> > > > > * The override precedence is the following (from highest to lowest precedence):
> > > > > * 1. global.consumer.[config-name]
> > > > > public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
> > > > > 
> > > > > 
> > > > > 
> > > > > So, that's great. We already have a config exposed to reset offsets for
> > > > > global topics via global.consumer.auto.offset.reset; it's just that we are
> > > > > not actually using it inside GlobalStreamThread to reset.
> > > > > 
> > > > > -Navinder
> > > > >     On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar 
> > > > > <navinder_brar@yahoo.com.invalid> wrote:  
> > > > >   
> > > > >   Hi John,
> > > > > 
> > > > > Thanks for your feedback. 
> > > > > 1. I think there is some confusion on my first point. For the enum, I am
> > > > > sure we can use the same one. But for the external config which controls
> > > > > the resetting in the global stream thread, either we can use the same one
> > > > > which users use for source topics (StreamThread), or we can provide a new
> > > > > one which specifically controls global topics. For example, currently if
> > > > > I get an InvalidOffsetException in any of my source topics, I can choose
> > > > > whether to reset from earliest or latest (with auto.offset.reset). Now,
> > > > > either we can use the same option and say that if I get the same exception
> > > > > for global topics I will follow the same resetting; or some users might
> > > > > want to have a totally different setting for source and global topics,
> > > > > e.g., for source topics I want resetting from latest but for global
> > > > > topics I want resetting from earliest, in which case adding a new config
> > > > > might be better.
> > > > > 
> > > > > 2. I couldn't find this config currently,
> > > > > "global.consumer.auto.offset.reset". In fact, in GlobalStreamThread.java
> > > > > we are throwing a StreamsException for InvalidOffsetException, and there
> > > > > is a test as well, GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(),
> > > > > so I think this is the config we are trying to introduce with this KIP.
> > > > > 
> > > > > -Navinder
> > > > > 
> > > > > On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler
> > > > > <john@vvcephei.org> wrote:
> > > > > 
> > > > > Hi Navinder,
> > > > > 
> > > > > Thanks for this proposal!
> > > > > 
> > > > > Regarding your question about whether to use the same policy
> > > > > enum or not, the underlying mechanism is the same, so I think
> > > > > we can just use the same AutoOffsetReset enum.
> > > > > 
> > > > > Can you confirm whether setting the reset policy config on the
> > > > > global consumer currently works or not? Based on my reading
> > > > > of StreamsConfig, it looks like it would be:
> > > > > "global.consumer.auto.offset.reset".
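
If it does work, the usage would presumably be just a config fragment like the following (hedged: whether the global consumer actually honors this prefix is the open question here):

```properties
# Hypothetical usage, assuming the global consumer respects the prefix:
global.consumer.auto.offset.reset=earliest
```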
> > > > > 
> > > > > If that does work, would you still propose to augment the
> > > > > Java API?
> > > > > 
> > > > > Thanks,
> > > > > -John
> > > > > 
> > > > > On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
> > > > > > Hi,
> > > > > > 
> > > > > > KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
> > > > > > 
> > > > > > I have taken over this KIP since it has been dormant for a long time,
> > > > > > and this looks important for use-cases that have large global data, so
> > > > > > rebuilding global stores from scratch might seem overkill in case of an
> > > > > > InvalidOffsetException.
> > > > > > 
> > > > > > We want to give users the control to use a reset policy (as we do in
> > > > > > StreamThread) in case they hit invalid offsets. I have still not
> > > > > > decided whether to restrict this option to the same reset policy being
> > > > > > used by StreamThread (using the auto.offset.reset config) or add another
> > > > > > reset config specifically for global stores,
> > > > > > "global.auto.offset.reset", which gives users more control to choose
> > > > > > separate policies for global and stream threads.
> > > > > > 
> > > > > > I would like to hear your opinions on the KIP.
> > > > > > 
> > > > > > 
> > > > > > -Navinder

