kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax" <mj...@apache.org>
Subject Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy
Date Thu, 03 Sep 2020 01:28:25 GMT
Thanks for the input Sophie. Those are all good points and I fully agree
with them.

When saying "pausing the processing threads" I only considered them in
`RUNNING` and thought we figure out the detail on the PR... Excellent catch!

Changing state transitions is to some extend backward incompatible, but
I think (IIRC) we did it in the past and I personally tend to find it
ok. That's why we cover those changes in a KIP.

-Matthias

On 9/2/20 6:18 PM, Sophie Blee-Goldman wrote:
> If we're going to add a new GLOBAL_RESTORING state to the KafkaStreams FSM,
> maybe it would make sense to add a new plain RESTORING state that we
> transition
> to when restoring non-global state stores following a rebalance. Right now
> all restoration
> occurs within the REBALANCING state, which is pretty misleading.
> Applications that
> have large amounts of state to restore will appear to be stuck rebalancing
> according to
> the state listener, when in fact the rebalance has completed long ago.
> Given that there
> are very much real scenarios where you actually *are *stuck rebalancing, it
> seems useful to
> distinguish plain restoration from more insidious cases that may require
> investigation and/or
> intervention.
> 
> I don't mean to hijack this KIP, I just think it would be odd to introduce
> GLOBAL_RESTORING
> when there is no other kind of RESTORING state. One question this brings
> up, and I
> apologize if this has already been addressed, is what to do when we are
> restoring
> both normal and global state stores? It sounds like we plan to pause the
> StreamThreads
> entirely, but there doesn't seem to be any reason not to allow regular
> state restoration -- or
> even standby processing -- while the global state is restoring.Given the
> current effort to move
> restoration & standbys to a separate thread, allowing them to continue
> while pausing
> only the StreamThread seems quite natural.
> 
> Assuming that we actually do allow both types of restoration to occur at
> the same time,
> and if we did add a plain RESTORING state as well, which state should we
> end up in?
> AFAICT the main reason for having a distinct {GLOBAL_}RESTORING state is to
> alert
> users of the non-progress of their active tasks. In both cases, the active
> task is unable
> to continue until restoration has complete, so why distinguish between the
> two at all?
> Would it make sense to avoid a special GLOBAL_RESTORING state and just
> introduce
> a single unified RESTORING state to cover both the regular and global case?
> Just a thought
> 
> My only concern is that this might be considered a breaking change: users
> might be
> looking for the REBALANCING -> RUNNING transition specifically in order to
> alert when
> the application has started up, and we would no long go directly from
> REBALANCING to
>  RUNNING. I think we actually did/do this ourselves in a number of
> integration tests and
> possibly in some examples. That said, it seems more appropriate to just
> listen for
> the RUNNING state rather than for a specific transition, and we should
> encourage users
> to do so rather than go out of our way to support transition-type state
> listeners.
> 
> Cheers,
> Sophie
> 
> On Wed, Sep 2, 2020 at 5:53 PM Matthias J. Sax <mjsax@apache.org> wrote:
> 
>> I think this makes sense.
>>
>> When we introduce this new state, we might also tackle the jira a
>> mentioned. If there is a global thread, on startup of a `KafakStreams`
>> client we should not transit to `REBALANCING` but to the new state, and
>> maybe also make the "bootstrapping" non-blocking.
>>
>> I guess it's worth to mention this in the KIP.
>>
>> Btw: The new state for KafkaStreams should also be part of the KIP as it
>> is a public API change, too.
>>
>>
>> -Matthias
>>
>> On 8/29/20 9:37 AM, John Roesler wrote:
>>> Hi Navinder,
>>>
>>> Thanks for the ping. Yes, that all sounds right to me. The name
>> “RESTORING_GLOBAL” sounds fine, too.
>>>
>>> I think as far as warnings go, we’d just propose to mention it in the
>> javadoc of the relevant methods that the given topics should be compacted.
>>>
>>> Thanks!
>>> -John
>>>
>>> On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
>>>> Gentle ping.
>>>>
>>>> ~ Navinder
>>>>     On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar
>>>> <navinder_brar@yahoo.com.invalid> wrote:
>>>>
>>>>
>>>> Thanks Matthias & John,
>>>>
>>>>
>>>>
>>>> I am glad we are converging towards an understanding. So, to summarize,
>>>>
>>>> we will still keep treating this change in KIP and instead of providing
>> a reset
>>>>
>>>> strategy, we will cleanup, and reset to earliest and build the state.
>>>>
>>>> When we hit the exception and we are building the state, we will stop
>> all
>>>>
>>>> processing and change the state of KafkaStreams to something like
>>>>
>>>> “RESTORING_GLOBAL” or the like.
>>>>
>>>>
>>>>
>>>> How do we plan to educate users on the non desired effects of using
>>>>
>>>> non-compacted global topics? (via the KIP itself?)
>>>>
>>>>
>>>> +1 on changing the KTable behavior, reset policy for global, connecting
>>>> processors to global for a later stage when demanded.
>>>>
>>>> Regards,
>>>> Navinder
>>>>     On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax
>>>> <mjsax@apache.org> wrote:
>>>>
>>>>  Your observation is correct. Connecting (regular) stores to processors
>>>> is necessary to "merge" sub-topologies into single ones if a store is
>>>> shared. -- For global stores, the structure of the program does not
>>>> change and thus connecting srocessors to global stores is not required.
>>>>
>>>> Also given our experience with restoring regular state stores (ie,
>>>> partial processing of task that don't need restore), it seems better to
>>>> pause processing and move all CPU and network resources to the global
>>>> thread to rebuild the global store as soon as possible instead of
>>>> potentially slowing down the restore in order to make progress on some
>>>> tasks.
>>>>
>>>> Of course, if we collect real world experience and it becomes an issue,
>>>> we could still try to change it?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 8/18/20 3:31 PM, John Roesler wrote:
>>>>> Thanks Matthias,
>>>>>
>>>>> Sounds good. I'm on board with no public API change and just
>>>>> recovering instead of crashing.
>>>>>
>>>>> Also, to be clear, I wouldn't drag KTables into it; I was
>>>>> just trying to wrap my head around the congruity of our
>>>>> choice for GlobalKTable with respect to KTable.
>>>>>
>>>>> I agree that whatever we decide to do would probably also
>>>>> resolve KAFKA-7380.
>>>>>
>>>>> Moving on to discuss the behavior change, I'm wondering if
>>>>> we really need to block all the StreamThreads. It seems like
>>>>> we only need to prevent processing on any task that's
>>>>> connected to the GlobalStore.
>>>>>
>>>>> I just took a look at the topology building code, and it
>>>>> actually seems that connections to global stores don't need
>>>>> to be declared. That's a bummer, since it means that we
>>>>> really do have to stop all processing while the global
>>>>> thread catches up.
>>>>>
>>>>> Changing this seems like it'd be out of scope right now, but
>>>>> I bring it up in case I'm wrong and it actually is possible
>>>>> to know which specific tasks need to be synchronized with
>>>>> which global state stores. If we could know that, then we'd
>>>>> only have to block some of the tasks, not all of the
>>>>> threads.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>>
>>>>> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
>>>>>> Thanks for the discussion.
>>>>>>
>>>>>> I agree that this KIP is justified in any case -- even if we don't
>>>>>> change public API, as the change in behavior is significant.
>>>>>>
>>>>>> A better documentation for cleanup policy is always good (even if
I am
>>>>>> not aware of any concrete complaints atm that users were not aware
of
>>>>>> the implications). Of course, for a regular KTable, one can
>>>>>> enable/disable the source-topic-changelog optimization and thus can
>> use
>>>>>> a non-compacted topic for this case, what is quite a difference to
>>>>>> global stores/tables; so maybe it's worth to point out this difference
>>>>>> explicitly.
>>>>>>
>>>>>> As mentioned before, the main purpose of the original Jira was to
>> avoid
>>>>>> the crash situation but to allow for auto-recovering while it was
an
>>>>>> open question if it makes sense / would be useful to allow users
to
>>>>>> specify a custom reset policy instead of using a hard-coded "earliest"
>>>>>> strategy. -- It seem it's still unclear if it would be useful and
thus
>>>>>> it might be best to not add it for now -- we can still add it later
if
>>>>>> there are concrete use-cases that need this feature.
>>>>>>
>>>>>> @John: I actually agree that it's also questionable to allow a custom
>>>>>> reset policy for KTables... Not sure if we want to drag this question
>>>>>> into this KIP though?
>>>>>>
>>>>>> So it seem, we all agree that we actually don't need any public API
>>>>>> changes, but we only want to avoid crashing?
>>>>>>
>>>>>> For this case, to preserve the current behavior that guarantees that
>> the
>>>>>> global store/table is always loaded first, it seems we need to have
a
>>>>>> stop-the-world mechanism for the main `StreamThreads` for this case
--
>>>>>> do we need to add a new state to KafkaStreams client for this case?
>>>>>>
>>>>>> Having a new state might also be helpful for
>>>>>> https://issues.apache.org/jira/browse/KAFKA-7380 ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 8/17/20 7:34 AM, John Roesler wrote:
>>>>>>> Hi Navinder,
>>>>>>>
>>>>>>> I see what you mean about the global consumer being similar
>>>>>>> to the restore consumer.
>>>>>>>
>>>>>>> I also agree that automatically performing the recovery
>>>>>>> steps should be strictly an improvement over the current
>>>>>>> situation.
>>>>>>>
>>>>>>> Also, yes, it would be a good idea to make it clear that the
>>>>>>> global topic should be compacted in order to ensure correct
>>>>>>> semantics. It's the same way with input topics for KTables;
>>>>>>> we rely on users to ensure the topics are compacted, and if
>>>>>>> they aren't, then the execution semantics will be broken.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
>>>>>>>> Hi John,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks for your inputs. Since, global topics are in a way
their own
>> changelog, wouldn’t the global consumers be more akin to restore consumers
>> than the main consumer?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I am also +1 on catching the exception and setting it to
the
>> earliest for now. Whenever an instance starts, currently global stream
>> thread(if available) goes to RUNNING before stream threads are started so
>> that means the global state is available when the processing by stream
>> threads start. So, with the new change of catching the exception, cleaning
>> store and resetting to earlier would probably be “stop the world” as you
>> said John, as I think we will have to pause the stream threads till the
>> whole global state is recovered. I assume it is "stop the world" right now
>> as well, since now also if an InvalidOffsetException comes, we throw
>> streams exception and the user has to clean up and handle all this manually
>> and when that instance will start, it will restore global state first.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I had an additional thought to this whole problem, would
it be
>> helpful to educate the users that global topics should have cleanup policy
>> as compact, so that this invalid offset exception never arises for them.
>> Assume for example, that the cleanup policy in global topic is "delete" and
>> it has deleted k1, k2 keys(via retention.ms) although all the instances
>> had already consumed them so they are in all global stores and all other
>> instances are up to date on the global data(so no InvalidOffsetException).
>> Now, a new instance is added to the cluster, and we have already lost k1,
>> k2 from the global topic so it will start consuming from the earliest point
>> in the global topic. So, wouldn’t this global store on the new instance has
>> 2 keys less than all the other global stores already available in the
>> cluster? Please let me know if I am missing something. Thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Navinder
>>>>>>>>
>>>>>>>>
>>>>>>>>     On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler
<
>> vvcephei@apache.org> wrote:
>>>>>>>>
>>>>>>>>   Hi all,
>>>>>>>>
>>>>>>>> It seems like the main motivation for this proposal is satisfied
if
>> we just implement some recovery mechanism instead of crashing. If the
>> mechanism is going to be pausing all the threads until the state is
>> recovered, then it still seems like a big enough behavior change to warrant
>> a KIP still.
>>>>>>>>
>>>>>>>> I have to confess I’m a little unclear on why a custom
reset policy
>> for a global store, table, or even consumer might be considered wrong. It’s
>> clearly wrong for the restore consumer, but the global consumer seems more
>> semantically akin to the main consumer than the restore consumer.
>>>>>>>>
>>>>>>>> In other words, if it’s wrong to reset a GlobalKTable from
latest,
>> shouldn’t it also be wrong for a KTable, for exactly the same reason? It
>> certainly seems like it would be an odd choice, but I’ve seen many choices
>> I thought were odd turn out to have perfectly reasonable use cases.
>>>>>>>>
>>>>>>>> As far as the PAPI global store goes, I could see adding
the option
>> to configure it, since as Matthias pointed out, there’s really no specific
>> semantics for the PAPI. But if automatic recovery is really all Navinder
>> wanted, the I could also see deferring this until someone specifically
>> wants it.
>>>>>>>>
>>>>>>>> So the tl;dr is, if we just want to catch the exception and
rebuild
>> the store by seeking to earliest with no config or API changes, then I’m +1.
>>>>>>>>
>>>>>>>> I’m wondering if we can improve on the “stop the world”
effect of
>> rebuilding the global store, though. It seems like we could put our heads
>> together and come up with a more fine-grained approach to maintaining the
>> right semantics during recovery while still making some progress.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> John
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> IMHO, now as you explained using
>> ‘global.consumer.auto.offset.reset’ is
>>>>>>>>> not as straightforward
>>>>>>>>> as it seems and it might change the existing behavior
for users
>> without
>>>>>>>>> they releasing it, I also
>>>>>>>>>
>>>>>>>>> think that we should change the behavior inside global
stream
>> thread to
>>>>>>>>> not die on
>>>>>>>>>
>>>>>>>>> InvalidOffsetException and instead clean and rebuild
the state
>> from the
>>>>>>>>> earliest. On this, as you
>>>>>>>>>
>>>>>>>>> mentioned that we would need to pause the stream threads
till the
>>>>>>>>> global store is completely restored.
>>>>>>>>>
>>>>>>>>> Without it, there will be incorrect processing results
if they are
>>>>>>>>> utilizing a global store during processing.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> So, basically we can divide the use-cases into 4 parts.
>>>>>>>>>
>>>>>>>>>     - PAPI based global stores (will have the earliest
hardcoded)
>>>>>>>>>     - PAPI based state stores (already has auto.reset.config)
>>>>>>>>>     - DSL based GlobalKTables (will have earliest hardcoded)
>>>>>>>>>     - DSL based KTables (will continue with auto.reset.config)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> So, this would mean that we are not changing any existing
>> behaviors
>>>>>>>>> with this if I am right.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I guess we could improve the code to actually log a warning
for
>> this
>>>>>>>>>
>>>>>>>>> case, similar to what we do for some configs already
(cf
>>>>>>>>>
>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>
>>>>>>>>>>> I like this idea. In case we go ahead with the
above approach
>> and if we can’t
>>>>>>>>>
>>>>>>>>> deprecate it, we should educate users that this config
doesn’t
>> work.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Looking forward to hearing thoughts from others as well.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> - Navinder    On Tuesday, 4 August, 2020, 05:07:59 am
IST,
>> Matthias J.
>>>>>>>>> Sax <mjsax@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>   Navinder,
>>>>>>>>>
>>>>>>>>> thanks for updating the KIP. I think the motivation section
is not
>>>>>>>>> totally accurate (what is not your fault though, as the
history of
>> how
>>>>>>>>> we handle this case is intertwined...) For example,
>> "auto.offset.reset"
>>>>>>>>> is hard-coded for the global consumer to "none" and using
>>>>>>>>> "global.consumer.auto.offset.reset" has no effect (cf
>>>>>>>>>
>> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values
>> )
>>>>>>>>>
>>>>>>>>> Also, we could not even really deprecate the config as
mentioned in
>>>>>>>>> rejected alternatives sections, because we need
>> `auto.offset.reset` for
>>>>>>>>> the main consumer -- and adding a prefix is independent
of it.
>> Also,
>>>>>>>>> because we ignore the config, it's is also deprecated/removed
if
>> you wish.
>>>>>>>>>
>>>>>>>>> I guess we could improve the code to actually log a warning
for
>> this
>>>>>>>>> case, similar to what we do for some configs already
(cf
>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The other question is about compatibility with regard
to default
>>>>>>>>> behavior: if we want to reintroduce
>> `global.consumer.auto.offset.reset`
>>>>>>>>> this basically implies that we need to respect
>> `auto.offset.reset`, too.
>>>>>>>>> Remember, that any config without prefix is applied to
all clients
>> that
>>>>>>>>> support this config. Thus, if a user does not limit the
scope of
>> the
>>>>>>>>> config to the main consumer (via
>> `main.consumer.auto.offset.reset`) but
>>>>>>>>> uses the non-prefix versions and sets it to "latest"
(and relies
>> on the
>>>>>>>>> current behavior that `auto.offset.reset` is "none",
and
>> effectively
>>>>>>>>> "earliest" on the global consumer), the user might end
up with a
>>>>>>>>> surprise as the global consumer behavior would switch
from
>> "earliest" to
>>>>>>>>> "latest" (most likely unintentionally). Bottom line is,
that users
>> might
>>>>>>>>> need to change configs to preserve the old behavior...
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> However, before we discuss those details, I think we
should
>> discuss the
>>>>>>>>> topic in a broader context first:
>>>>>>>>>
>>>>>>>>>   - for a GlobalKTable, does it even make sense from
a correctness
>> point
>>>>>>>>> of view, to allow users to set a custom reset policy?
It seems you
>>>>>>>>> currently don't propose this in the KIP, but as you don't
mention
>> it
>>>>>>>>> explicitly it's unclear if that on purpose of an oversight?
>>>>>>>>>
>>>>>>>>>   - Should we treat global stores differently to GlobalKTables
and
>> allow
>>>>>>>>> for more flexibility (as the PAPI does not really provide
any
>> semantic
>>>>>>>>> contract). It seems that is what you propose in the KIP.
We should
>>>>>>>>> discuss if this flexibility does make sense or not for
the PAPI,
>> or if
>>>>>>>>> we should apply the same reasoning about correctness
we use for
>> KTables
>>>>>>>>> to global stores? To what extend are/should they be different?
>>>>>>>>>
>>>>>>>>>   - If we support auto.offset.reset for global store,
how should we
>>>>>>>>> handle the initial bootstrapping of the store/table (that
is
>> hard-coded
>>>>>>>>> atm)? Should we skip it if the policy is "latest" and
start with an
>>>>>>>>> empty state? Note that we did consider this behavior
incorrect via
>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6121 and
thus I am
>> wondering
>>>>>>>>> why should we change it back again?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Finally, the main motivation for the Jira ticket was
to let the
>> runtime
>>>>>>>>> auto-recover instead of dying as it does currently. If
we decide
>> that a
>>>>>>>>> custom reset policy does actually not make sense, we
can just
>> change the
>>>>>>>>> global-thread to not die any longer on an `InvalidOffsetException`
>> but
>>>>>>>>> rebuild the state automatically. This would be "only"
a behavior
>> change
>>>>>>>>> but does not require any public API changes. -- For this
case, we
>> should
>>>>>>>>> also think about the synchronization with the main processing
>> threads?
>>>>>>>>> On startup we bootstrap the global stores before processing
>> happens.
>>>>>>>>> Thus, if an `InvalidOffsetException` happen and the global
thread
>> dies,
>>>>>>>>> the main threads cannot access the global stores any
longer an
>> also die.
>>>>>>>>> If we re-build the state though, do we need to pause
the main
>> thread
>>>>>>>>> during this phase?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 8/2/20 8:48 AM, Navinder Brar wrote:
>>>>>>>>>> Hi John,
>>>>>>>>>>
>>>>>>>>>> I have updated the KIP to make the motivation more
clear. In a
>> nutshell, we will use the already existing config
>> "global.consumer.auto.offset.reset" for users to set a blanket reset policy
>> for all global topics and add a new interface to set per-topic reset policy
>> for each global topic(for which we specifically need this KIP). There was a
>> point raised from Matthias above to always reset to earliest by cleaning
>> the stores and seekToBeginning in case of InvalidOffsetException. We can go
>> with that route as well and I don't think it would need a KIP as if we are
>> not providing users an option to have blanket reset policy on global
>> topics, then a per-topic override would also not be required(the KIP is
>> required basically for that). Although, I think if users have an option to
>> choose reset policy for StreamThread then the option should be provided for
>> GlobalStreamThread as well and if we don't want to use the
>> "global.consumer.auto.offset.reset" then we would need to deprecate it
>> because currently it's not serving any purpose. For now, I have added it in
>> rejected alternatives but we can discuss this.
>>>>>>>>>>
>>>>>>>>>> On the query that I had for Guozhang, thanks to Matthias
we have
>> fixed it last week as part of KAFKA-10306.
>>>>>>>>>>
>>>>>>>>>> ~Navinder
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder
Brar <
>> navinder_brar@yahoo.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Sorry, it took some time to respond back.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> “but I thought we would pass the config through
to the client.”
>>>>>>>>>>
>>>>>>>>>>>> @John, sure we can use the config in GloablStreamThread,
that
>> could be one of the way to solve it.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> @Matthias, sure cleaning the store and recreating
is one way but
>> since we are giving an option to reset in StreamThread why the
>> implementation should be different in GlobalStreamThread. I think we should
>> use the global.consumer.auto.offset.reset config to accept the reset
>> strategy opted by the user although I would be ok with just cleaning and
>> resetting to the latest as well for now. Currently, we throw a
>> StreamsException in case of InvalidOffsetException in GlobalStreamThread so
>> just resetting would still be better than what happens currently.
>>>>>>>>>>
>>>>>>>>>> Matthias, I found this comment in StreamBuilder for
GlobalKTable
>> ‘* Note that {@link GlobalKTable} always applies {@code
>> "auto.offset.reset"} strategy {@code "earliest"} regardless of the
>> specified value in {@link StreamsConfig} or {@link Consumed}.’
>>>>>>>>>> So, I guess we are already cleaning up and recreating
for
>> GlobalKTable from earliest offset.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> @Guozhan while looking at the code, I also noticed
a TODO:
>> pending in GlobalStateManagerImpl, when InvalidOffsetException is thrown.
>> Earlier, we were directly clearing the store here and recreating from
>> scratch but that code piece is removed now. Are you working on a follow-up
>> PR for this or just handling the reset in GlobalStreamThread should be
>> sufficient?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Navinder
>>>>>>>>>>
>>>>>>>>>>     On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias
J. Sax <
>> mjsax@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>   Atm, the config should be ignored and the global-consumer
>> should use
>>>>>>>>>> "none" in a hard-coded way.
>>>>>>>>>>
>>>>>>>>>> However, if am still wondering if we actually want/need
to allow
>> users
>>>>>>>>>> to specify the reset policy? It might be worth to
consider, to
>> just
>>>>>>>>>> change the behavior: catch the exception, log an
ERROR (for
>> information
>>>>>>>>>> purpose), wipe the store, seekToBeginning(), and
recreate the
>> store?
>>>>>>>>>>
>>>>>>>>>> Btw: if we want to allow users to set the reset policy,
this
>> should be
>>>>>>>>>> possible via the config, or via overwriting the config
in the
>> method
>>>>>>>>>> itself. Thus, we would need to add the new overloaded
method to
>>>>>>>>>> `Topology` and `StreamsBuilder`.
>>>>>>>>>>
>>>>>>>>>> Another question to ask: what about GlobalKTables?
Should they
>> behave
>>>>>>>>>> the same? An alternative design could be, to allow
users to
>> specify a
>>>>>>>>>> flexible reset policy for global-stores, but not
for
>> GlobalKTables and
>>>>>>>>>> use the strategy suggested above for this case.
>>>>>>>>>>
>>>>>>>>>> Thoughts?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 7/2/20 2:14 PM, John Roesler wrote:
>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the response. I’m sorry if I’m
being dense... You
>> said we are not currently using the config, but I thought we would pass the
>> config through to the client.  Can you confirm whether or not the existing
>> config works for your use case?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> John
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar
wrote:
>>>>>>>>>>>> Sorry my bad. Found it.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Prefix used to override {@link KafkaConsumer
consumer} configs
>> for the
>>>>>>>>>>>> global consumer client from
>>>>>>>>>>>>
>>>>>>>>>>>> * the general consumer client configs. The
override precedence
>> is the
>>>>>>>>>>>> following (from highest to lowest precedence):
>>>>>>>>>>>> * 1. global.consumer.[config-name]..
>>>>>>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX
=
>> "global.consumer.";
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> So, that's great. We already have a config
exposed to reset
>> offsets for
>>>>>>>>>>>> global topics via global.consumer.auto.offset.reset
just that
>> we are
>>>>>>>>>>>> not actually using it inside GlobalStreamThread
to reset.
>>>>>>>>>>>>
>>>>>>>>>>>> -Navinder
>>>>>>>>>>>>     On Monday, 29 June, 2020, 12:24:21 am
IST, Navinder Brar
>>>>>>>>>>>> <navinder_brar@yahoo.com.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>   Hi John,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your feedback.
>>>>>>>>>>>> 1. I think there is some confusion on my
first point, the enum
>> I am
>>>>>>>>>>>> sure we can use the same one but the external
config which
>> controls the
>>>>>>>>>>>> resetting in global stream thread either
we can the same one
>> which
>>>>>>>>>>>> users use for source topics(StreamThread)
or we can provide a
>> new one
>>>>>>>>>>>> which specifically controls global topics.
For e.g. currently
>> if I get
>>>>>>>>>>>> an InvalidOffsetException in any of my source
topics, I can
>> choose
>>>>>>>>>>>> whether to reset from Earliest or Latest(with
>> auto.offset.reset). Now
>>>>>>>>>>>> either we can use the same option and say
if I get the same
>> exception
>>>>>>>>>>>> for global topics I will follow same resetting.
Or some users
>> might
>>>>>>>>>>>> want to have totally different setting for
both source and
>> global
>>>>>>>>>>>> topics, like for source topic I want resetting
from Latest but
>> for
>>>>>>>>>>>> global topics I want resetting from Earliest
so in that case
>> adding a
>>>>>>>>>>>> new config might be better.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. I couldn't find this config currently
>>>>>>>>>>>> "global.consumer.auto.offset.reset". Infact
in
>> GlobalStreamThread.java
>>>>>>>>>>>> we are throwing a StreamsException for InvalidOffsetException
>> and there
>>>>>>>>>>>> is a test as
>>>>>>>>>>>> well
>> GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I
>>>>>>>>>>>> think this is the config we are trying to
introduce with this
>> KIP.
>>>>>>>>>>>>
>>>>>>>>>>>> -Navinder  On Saturday, 27 June, 2020, 07:03:04
pm IST, John
>> Roesler
>>>>>>>>>>>> <john@vvcephei.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>   Hi Navinder,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for this proposal!
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding your question about whether to
use the same policy
>>>>>>>>>>>> enum or not, the underlying mechanism is
the same, so I think
>>>>>>>>>>>> we can just use the same AutoOffsetReset
enum.
>>>>>>>>>>>>
>>>>>>>>>>>> Can you confirm whether setting the reset
policy config on the
>>>>>>>>>>>> global consumer currently works or not? Based
on my reading
>>>>>>>>>>>> of StreamsConfig, it looks like it would
be:
>>>>>>>>>>>> "global.consumer.auto.offset.reset".
>>>>>>>>>>>>
>>>>>>>>>>>> If that does work, would you still propose
to augment the
>>>>>>>>>>>> Java API?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> -John
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder
Brar wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> KIP:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have taken over this KIP since it has
been dormant for a
>> long time
>>>>>>>>>>>>> and this looks important for use-cases
that have large global
>> data, so
>>>>>>>>>>>>> rebuilding global stores from scratch
might seem overkill in
>> case of
>>>>>>>>>>>>> InvalidOffsetExecption.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We want to give users the control to
use reset policy(as we do
>> in
>>>>>>>>>>>>> StreamThread) in case they hit invalid
offsets. I have still
>> not
>>>>>>>>>>>>> decided whether to restrict this option
to the same reset
>> policy being
>>>>>>>>>>>>> used by StreamThread(using auto.offset.reset
config) or add
>> another
>>>>>>>>>>>>> reset config specifically for global
stores
>>>>>>>>>>>>> "global.auto.offset.reset" which gives
users more control to
>> choose
>>>>>>>>>>>>> separate policies for global and stream
threads.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would like to hear your opinions on
the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Navinder
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>
>>
>>
> 


Mime
View raw message