kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eno Thereska <eno.there...@gmail.com>
Subject Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB
Date Mon, 13 Aug 2018 14:28:55 GMT
Hi Matthias,

Good stuff. Could you comment a bit on how future-proof is this change? For
example, if we want to store both event timestamp "and" processing time in
RocksDB will we then need another interface (e.g. called
KeyValueWithTwoTimestampsStore)?

Thanks
Eno

On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Thanks for your input Guozhang and John.
>
> I see your point, that the upgrade API is not simple. If you don't
> thinks it's valuable to make generic store upgrades possible (atm), we
> can make the API internal, too. The impact is, that we only support a
> predefined set up upgrades (ie, KV to KVwithTs, Windowed to
> WindowedWithTS etc) for which we implement the internal interfaces.
>
> We can keep the design generic, so if we decide to make it public, we
> don't need to re-invent it. This will also have the advantage, that we
> can add upgrade pattern for other stores later, too.
>
> I also agree, that the `StoreUpgradeBuilder` is a little ugly, but it
> was the only way I could find to design a generic upgrade interface. If
> we decide the hide all the upgrade stuff, `StoreUpgradeBuilder` would
> become an internal interface I guess (don't think we can remove it).
>
> I will wait for more feedback about this and if nobody wants to keep it
> as public API I will update the KIP accordingly. Will add some more
> clarifications for different upgrade patterns in the mean time and fix
> the typos/minor issues.
>
> About adding a new state UPGRADING: maybe we could do that. However, I
> find it particularly difficult to make the estimation when we should
> switch to RUNNING, thus, I am a little hesitant. Using store callbacks
> or just logging the progress including some indication about the "lag"
> might actually be sufficient. Not sure what others think?
>
> About "value before timestamp": no real reason and I think it does not
> make any difference. Do you want to change it?
>
> About upgrade robustness: yes, we cannot control if an instance fails.
> That is what I meant by "we need to write test". The upgrade should be
> able to continuous even is an instance goes down (and we must make sure
> that we don't end up in an invalid state that forces us to wipe out the
> whole store). Thus, we need to write system tests that fail instances
> during upgrade.
>
> For `in_place_offline` upgrade: I don't think we need this mode, because
> people can do this via a single rolling bounce.
>
>  - prepare code and switch KV-Store to KVwithTs-Store
>  - do a single rolling bounce (don't set any upgrade config)
>
> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we
> remove the `StoreUpgradeBuilder`) will detect that there is only an old
> local KV store w/o TS, will start to restore the new KVwithTs store,
> wipe out the old store and replace with the new store after restore is
> finished, and start processing only afterwards. (I guess we need to
> document this case -- will also add it to the KIP.)
>
>
>
> -Matthias
>
>
>
> On 8/9/18 1:10 PM, John Roesler wrote:
> > Hi Matthias,
> >
> > I think this KIP is looking really good.
> >
> > I have a few thoughts to add to the others:
> >
> > 1. You mentioned at one point users needing to configure
> > `upgrade.mode="null"`. I think this was a typo and you meant to say they
> > should remove the config. If they really have to set it to a string
> "null"
> > or even set it to a null value but not remove it, it would be
> unfortunate.
> >
> > 2. In response to Bill's comment #1 , you said that "The idea is that the
> > upgrade should be robust and not fail. We need to write according tests".
> > I may have misunderstood the conversation, but I don't think it's within
> > our power to say that an instance won't fail. What if one of my computers
> > catches on fire? What if I'm deployed in the cloud and one instance
> > disappears and is replaced by a new one? Or what if one instance goes
> AWOL
> > for a long time and then suddenly returns? How will the upgrade process
> > behave in light of such failures?
> >
> > 3. your thought about making in-place an offline mode is interesting, but
> > it might be a bummer for on-prem users who wish to upgrade online, but
> > cannot just add new machines to the pool. It could be a new upgrade mode
> > "offline-in-place", though...
> >
> > 4. I was surprised to see that a user would need to modify the topology
> to
> > do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's
> > suggestions would remove this necessity.
> >
> > Thanks for taking on this very complex but necessary work.
> >
> > -John
> >
> > On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangguoz@gmail.com>
> wrote:
> >
> >> Hello Matthias,
> >>
> >> Thanks for the updated KIP. Some more comments:
> >>
> >> 1. The current set of proposed API is a bit too complicated, which makes
> >> the upgrade flow from user's perspective also a bit complex. I'd like to
> >> check different APIs and discuss about their needs separately:
> >>
> >>     1.a. StoreProxy: needed for in-place upgrade only, between the first
> >> and second rolling bounce, where the old-versioned stores can handle
> >> new-versioned store APIs. I think such upgrade paths (i.e. from one
> store
> >> type to another) would not be very common: users may want to upgrade
> from a
> >> certain store engine to another, but the interface would likely be
> staying
> >> the same. Hence personally I'd suggest we keep it internally and only
> >> consider exposing it in the future if it does become a common pattern.
> >>
> >>     1.b. ConverterStore / RecordConverter: needed for both in-place and
> >> roll-over upgrade, between the first and second rolling bounces, for the
> >> new versioned store to be able to read old-versioned changelog topics.
> >> Firstly I think we should not expose key in the public APIs but only the
> >> values, since allowing key format changes would break log compaction,
> and
> >> hence would not be compatible anyways. As for value format changes,
> >> personally I think we can also keep its upgrade logic internally as it
> may
> >> not worth generalizing to user customizable logic.
> >>
> >>     1.c. If you agrees with 2.a/b above, then we can also remove "
> >> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public
> APIs.
> >>
> >>     1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is not
> >> needed either given that we are exposing "ValueAndTimestamp" anyways.
> I.e.
> >> it is just a syntax sugar and for IQ, users can always just set a "
> >> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" as the
> new
> >> interface does not provide any additional functions.
> >>
> >>
> >> 2. Could we further categorize the upgrade flow for different use cases,
> >> e.g. 1) DSL users where KeyValueWithTimestampStore will be used
> >> automatically for non-windowed aggregate; 2) PAPI users who do not need
> to
> >> use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to
> >> KeyValueWithTimestampStore. Just to give my understanding for 3), the
> >> upgrade flow for users may be simplified as the following (for both
> >> in-place and roll-over):
> >>
> >>     * Update the jar to new version, make code changes from
> KeyValueStore
> >> to KeyValueWithTimestampStore, set upgrade config.
> >>
> >>     * First rolling bounce, and library code can internally use proxy /
> >> converter based on the specified config to handle new APIs with old
> stores,
> >> while let new stores read from old changelog data.
> >>
> >>     * Reset upgrade config.
> >>
> >>     * Second rolling bounce, and the library code automatically turn off
> >> logic for proxy / converter.
> >>
> >>
> >> 3. Some more detailed proposals are needed for when to recommend users
> to
> >> trigger the second rolling bounce. I have one idea to share here: we
> add a
> >> new state to KafkaStreams, say UPGRADING, which is set when 1) upgrade
> >> config is set, and 2) the new stores are still ramping up (for the
> second
> >> part, we can start with some internal hard-coded heuristics to decide
> when
> >> it is close to be ramped up). If either one of it is not true any more,
> it
> >> should transit to RUNNING. Users can then watch on this state, and
> decide
> >> to only trigger the second rebalance when the state has transited from
> >> UPGRADING. They can also choose to cut over while the instance is still
> >> UPGRADING, the downside is that after that the application may have long
> >> restoration phase which is, to user's pov, unavailability periods.
> >>
> >>
> >> Below are just some minor things on the wiki:
> >>
> >> 4. "proxy story" => "proxy store".
> >>
> >> 5. "use the a builder " => "use a builder"
> >>
> >> 6: "we add the record timestamp as a 8-byte (long) prefix to the value":
> >> what's the rationale of putting the timestamp before the value, than
> after
> >> the value?
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <matthias@confluent.io>
> >> wrote:
> >>
> >>> Thanks for the feedback Bill. I just update the KIP with some of your
> >>> points.
> >>>
> >>>>> Regarding step 3C of the in-place upgrade (users needing to watch the
> >>>>> restore process), I'm wondering if we want to provide a type of
> >>>>> StateRestoreListener that could signal when the new stores have
> >> reached
> >>>>> parity with the existing old stores and that could be the signal to
> >>> start
> >>>>> second rolling rebalance?
> >>>
> >>> I think we can reuse the existing listeners, thus, I did not include
> >>> anything in the KIP. About a signal to rebalance: this might be tricky.
> >>> If we prepare the store "online", the active task will update the state
> >>> continuously, and thus, state prepare is never finished. It will be the
> >>> users responsibility to do the second rebalance (note, that the second
> >>> rebalance will first finish the last delta of the upgrade to finish the
> >>> upgrade before actual processing resumes). I clarified the KIP with
> this
> >>> regard a little bit.
> >>>
> >>>>> 1. Out of N instances, one fails midway through the process, would we
> >>> allow
> >>>>> the other instances to complete or just fail the entire upgrade?
> >>>
> >>> The idea is that the upgrade should be robust and not fail. We need to
> >>> write according tests.
> >>>
> >>>>> 2. During the second rolling bounce, maybe we could rename the
> current
> >>>>> active directories vs. deleting them right away,  and when all the
> >>> prepare
> >>>>> task directories are successfully migrated then delete the previous
> >>> active
> >>>>> ones.
> >>>
> >>> Ack. Updated the KIP.
> >>>
> >>>>> 3. For the first rolling bounce we pause any processing any new
> >> records
> >>> and
> >>>>> just allow the prepare tasks to restore, then once all prepare tasks
> >>> have
> >>>>> restored, it's a signal for the second round of rolling bounces and
> >>> then as
> >>>>> each task successfully renames its prepare directories and deletes
> the
> >>> old
> >>>>> active task directories, normal processing of records resumes.
> >>>
> >>> The basic idea is to do an online upgrade to avoid downtime. We can
> >>> discuss to offer both options... For the offline upgrade option, we
> >>> could simplify user interaction and trigger the second rebalance
> >>> automatically with the requirement that a user needs to update any
> >> config.
> >>>
> >>> If might actually be worth to include this option: we know from
> >>> experience with state restore, that regular processing slows down the
> >>> restore. For roll_over upgrade, it would be a different story and
> >>> upgrade should not be slowed down by regular processing. Thus, we
> should
> >>> even make in_place an offline upgrade and force people to use roll_over
> >>> if they need onlint upgrade. Might be a fair tradeoff that may simplify
> >>> the upgrade for the user and for the code complexity.
> >>>
> >>> Let's see what other think.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for the update and the working prototype, it helps with
> >>>> understanding the KIP.
> >>>>
> >>>> I took an initial pass over this PR, and overall I find the interfaces
> >>> and
> >>>> approach to be reasonable.
> >>>>
> >>>> Regarding step 3C of the in-place upgrade (users needing to watch the
> >>>> restore process), I'm wondering if we want to provide a type of
> >>>> StateRestoreListener that could signal when the new stores have
> reached
> >>>> parity with the existing old stores and that could be the signal to
> >> start
> >>>> second rolling rebalance?
> >>>>
> >>>> Although you solicited feedback on the interfaces involved, I wanted
> to
> >>> put
> >>>> down some thoughts that have come to mind reviewing this KIP again
> >>>>
> >>>> 1. Out of N instances, one fails midway through the process, would we
> >>> allow
> >>>> the other instances to complete or just fail the entire upgrade?
> >>>> 2. During the second rolling bounce, maybe we could rename the current
> >>>> active directories vs. deleting them right away,  and when all the
> >>> prepare
> >>>> task directories are successfully migrated then delete the previous
> >>> active
> >>>> ones.
> >>>> 3. For the first rolling bounce we pause any processing any new
> records
> >>> and
> >>>> just allow the prepare tasks to restore, then once all prepare tasks
> >> have
> >>>> restored, it's a signal for the second round of rolling bounces and
> >> then
> >>> as
> >>>> each task successfully renames its prepare directories and deletes the
> >>> old
> >>>> active task directories, normal processing of records resumes.
> >>>>
> >>>> Thanks,
> >>>> Bill
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <
> matthias@confluent.io
> >>>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> KIP-268 (rebalance meatadata) is finished and included in AK 2.0
> >>>>> release. Thus, I want to pick up this KIP again to get the RocksDB
> >>>>> upgrade done for 2.1.
> >>>>>
> >>>>> I updated the KIP accordingly and also have a "prove of concept" PR
> >>>>> ready (for "in place" upgrade only):
> >>>>> https://github.com/apache/kafka/pull/5422/
> >>>>>
> >>>>> There a still open questions, but I want to collect early feedback on
> >>>>> the proposed interfaces we need for the store upgrade. Also note,
> that
> >>>>> the KIP now also aim to define a generic upgrade path from any store
> >>>>> format A to any other store format B. Adding timestamps is just a
> >>>>> special case.
> >>>>>
> >>>>> I will continue to work on the PR and refine the KIP in the meantime,
> >>> too.
> >>>>>
> >>>>> Looking forward to your feedback.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
> >>>>>> After some more thoughts, I want to follow John's suggestion and
> >> split
> >>>>>> upgrading the rebalance metadata from the store upgrade.
> >>>>>>
> >>>>>> I extracted the metadata upgrade into it's own KIP:
> >>>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%
> >>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> >>>>>>
> >>>>>> I'll update this KIP accordingly shortly. I also want to consider to
> >>>>>> make the store format upgrade more flexible/generic. Atm, the KIP is
> >>> too
> >>>>>> much tailored to the DSL IMHO and does not encounter PAPI users that
> >> we
> >>>>>> should not force to upgrade the stores. I need to figure out the
> >>> details
> >>>>>> and follow up later.
> >>>>>>
> >>>>>> Please give feedback for the new KIP-268 on the corresponding
> >>> discussion
> >>>>>> thread.
> >>>>>>
> >>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure out a
> >> way
> >>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a fix for
> >>>>>> future upgrades. Please share your thoughts.
> >>>>>>
> >>>>>> Thanks for all your feedback!
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
> >>>>>>> @John: yes, we would throw if configs are missing (it's an
> >>>>>>> implementation details IMHO and thus I did not include it in the
> >> KIP)
> >>>>>>>
> >>>>>>> @Guozhang:
> >>>>>>>
> >>>>>>> 1) I understand know what you mean. We can certainly, allow all
> >> values
> >>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for `upgrade.from`
> >>>>>>> parameter. I had a similar though once but decided to collapse them
> >>> into
> >>>>>>> one -- will update the KIP accordingly.
> >>>>>>>
> >>>>>>> 2) The idea to avoid any config would be, to always send both
> >> request.
> >>>>>>> If we add a config to eventually disable the old request, we don't
> >>> gain
> >>>>>>> anything with this approach. The question is really, if we are
> >> willing
> >>>>>>> to pay this overhead from 1.2 on -- note, it would be limited to 2
> >>>>>>> versions and not grow further in future releases. More details in
> >> (3)
> >>>>>>>
> >>>>>>> 3) Yes, this approach subsumes (2) for later releases and allows us
> >> to
> >>>>>>> stay with 2 "assignment strategies" we need to register, as the new
> >>>>>>> assignment strategy will allow to "upgrade itself" via "version
> >>>>>>> probing". Thus, (2) would only be a workaround to avoid a config if
> >>>>>>> people upgrade from pre-1.2 releases.
> >>>>>>>
> >>>>>>> Thus, I don't think we need to register new "assignment strategies"
> >>> and
> >>>>>>> send empty subscriptions for older version.
> >>>>>>>
> >>>>>>> 4) I agree that this is a tricky thing to get right with a single
> >>>>>>> rebalance. I share the concern that an application might never
> catch
> >>> up
> >>>>>>> and thus the hot standby will never be ready.
> >>>>>>>
> >>>>>>> Maybe it's better to go with 2 rebalances for store upgrades. If we
> >> do
> >>>>>>> this, we also don't need to go with (2) and can get (3) in place
> for
> >>>>>>> future upgrades. I also think that changes to the metadata are more
> >>>>>>> likely and thus allowing for single rolling bounce for this case is
> >>> more
> >>>>>>> important anyway. If we assume that store upgrade a rare, it might
> >> be
> >>> ok
> >>>>>>> to sacrifice two rolling bounced for this case. It was just an idea
> >> I
> >>>>>>> wanted to share (even if I see the issues).
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
> >>>>>>>> Hello Matthias, thanks for your replies.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 1) About the config names: actually I was trying to not expose
> >>>>>>>> implementation details :) My main concern was that in your
> proposal
> >>> the
> >>>>>>>> values need to cover the span of all the versions that are
> actually
> >>>>> using
> >>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am
> >>>>> upgrading
> >>>>>>>> from any versions within this range I need to remember to use the
> >>> value
> >>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In my
> >>> suggestion
> >>>>> I
> >>>>>>>> was trying to argue the benefit of just letting users to specify
> >> the
> >>>>> actual
> >>>>>>>> Kafka version she's trying to upgrade from, than specifying a
> range
> >>> of
> >>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as the
> >> values,
> >>>>> but
> >>>>>>>> still using Kafka versions like broker's `internal.version`
> config.
> >>>>> But if
> >>>>>>>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you
> >>> meant
> >>>>> to
> >>>>>>>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or
> >> "1.1"
> >>>>> which
> >>>>>>>> are all recognizable config values then I think we are actually on
> >>> the
> >>>>> same
> >>>>>>>> page.
> >>>>>>>>
> >>>>>>>> 2) About the "multi-assignment" idea: yes it would increase the
> >>> network
> >>>>>>>> footprint, but not the message size, IF I'm not mis-understanding
> >>> your
> >>>>> idea
> >>>>>>>> of registering multiple assignment. More details:
> >>>>>>>>
> >>>>>>>> In the JoinGroupRequest, in the protocols field we can encode
> >>> multiple
> >>>>>>>> protocols each with their different metadata. The coordinator will
> >>>>> pick the
> >>>>>>>> common one that everyone supports (if there are no common one, it
> >>> will
> >>>>> send
> >>>>>>>> an error back; if there are multiple ones, it will pick the one
> >> with
> >>>>> most
> >>>>>>>> votes, i.e. the one which was earlier in the encoded list). Since
> >> our
> >>>>>>>> current Streams rebalance protocol is still based on the consumer
> >>>>>>>> coordinator, it means our protocol_type would be "consumer", but
> >>>>> instead
> >>>>>>>> the protocol type we can have multiple protocols like "streams",
> >>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we need to
> >>>>> implement a
> >>>>>>>> different assignor class for each version and register all of them
> >> in
> >>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future if
> >> we
> >>>>>>>> re-factor our implementation to have our own client coordinator
> >> layer
> >>>>> like
> >>>>>>>> Connect did, we can simplify this part of the implementation. But
> >>> even
> >>>>> for
> >>>>>>>> now with the above approach this is still doable.
> >>>>>>>>
> >>>>>>>> On the broker side, the group coordinator will only persist a
> group
> >>>>> with
> >>>>>>>> the selected protocol and its subscription metadata, e.g. if
> >>>>> coordinator
> >>>>>>>> decides to pick "streams_v2" it will only sends that protocol's
> >>>>> metadata
> >>>>>>>> from everyone to the leader to assign, AND when completing the
> >>>>> rebalance it
> >>>>>>>> will also only write the group metadata with that protocol and the
> >>>>>>>> assignment only. In a word, although the network traffic maybe
> >>>>> increased a
> >>>>>>>> bit, it would not be a bummer in our trade-off. One corner
> >> situation
> >>> we
> >>>>>>>> need to consider is how to stop registering very old assignors to
> >>>>> avoid the
> >>>>>>>> network traffic from increasing indefinitely, e.g. if you are
> >> rolling
> >>>>>>>> bounce from v2 to v3, then you'd not need to register v1 assignor
> >>>>> anymore,
> >>>>>>>> but that would unfortunately still require some configs.
> >>>>>>>>
> >>>>>>>> 3) About the  "version probing" idea, I think that's a promising
> >>>>> approach
> >>>>>>>> as well, but if we are going to do the multi-assignment its value
> >>> seems
> >>>>>>>> subsumed? But I'm thinking maybe it can be added on top of
> >>>>> multi-assignment
> >>>>>>>> to save us from still requiring the config to avoid registering
> all
> >>> the
> >>>>>>>> metadata for all version. More details:
> >>>>>>>>
> >>>>>>>> In the JoinGroupRequest, we still register all the assignor but
> for
> >>>>> all old
> >>>>>>>> assignors we do not encode any metadata, i.e. the encoded data
> >> would
> >>>>> be:
> >>>>>>>>
> >>>>>>>> "streams_vN" : "encoded metadata"
> >>>>>>>> "streams_vN-1":empty
> >>>>>>>> "streams_vN-2":empty
> >>>>>>>> ..
> >>>>>>>> "streams_0":empty
> >>>>>>>>
> >>>>>>>> So the coordinator can still safely choose the latest common
> >> version;
> >>>>> and
> >>>>>>>> then when leaders receive the subscription (note it should always
> >>>>> recognize
> >>>>>>>> that version), let's say it is streams_vN-2, if one of the
> >>>>> subscriptions
> >>>>>>>> are empty bytes, it will send the empty assignment with that
> >> version
> >>>>> number
> >>>>>>>> encoded in the metadata. So in the second auto-triggered all
> >> members
> >>>>> would
> >>>>>>>> send the metadata with that version:
> >>>>>>>>
> >>>>>>>> "streams_vN" : empty
> >>>>>>>> "streams_vN-1" : empty
> >>>>>>>> "streams_vN-2" : "encoded metadata"
> >>>>>>>> ..
> >>>>>>>> "streams_0":empty
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> By doing this we would not require any configs for users.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about
> the
> >>>>> details
> >>>>>>>> so probably we'd need to fill that out before making a call. For
> >>>>> example,
> >>>>>>>> you mentioned "If we detect this situation, the Streams
> application
> >>>>> closes
> >>>>>>>> corresponding active tasks as well as "hot standby" tasks, and
> >>>>> re-creates
> >>>>>>>> the new active tasks using the new store." How could we guarantee
> >>> that
> >>>>> the
> >>>>>>>> gap between these two stores will keep decreasing than increasing
> >> so
> >>>>> we'll
> >>>>>>>> eventually achieve the flip point? And also the longer we are
> >> before
> >>>>> the
> >>>>>>>> flip point, the larger we are doubling the storage space, etc.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
> >>>>> matthias@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> @John, Guozhang,
> >>>>>>>>>
> >>>>>>>>> thanks a lot for your comments. Very long reply...
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> About upgrading the rebalance metadata:
> >>>>>>>>>
> >>>>>>>>> Another possibility to do this, would be to register multiple
> >>>>> assignment
> >>>>>>>>> strategies for the 1.2 applications. For this case, new instances
> >>>>> would
> >>>>>>>>> be configured to support both and the broker would pick the
> >> version
> >>>>> that
> >>>>>>>>> all instances understand. The disadvantage would be, that we send
> >>> much
> >>>>>>>>> more data (ie, two subscriptions) in each rebalance as long as no
> >>>>> second
> >>>>>>>>> rebalance is done disabling the old protocol. Thus, using this
> >>>>> approach
> >>>>>>>>> would allow to avoid a second rebalance trading-off an increased
> >>>>>>>>> rebalance network footprint (I also assume that this would
> >> increase
> >>>>> the
> >>>>>>>>> message size that is written into __consumer_offsets topic?).
> >>>>> Overall, I
> >>>>>>>>> am not sure if this would be a good tradeoff, but it could avoid
> a
> >>>>>>>>> second rebalance (I have some more thoughts about stores below
> >> that
> >>>>> are
> >>>>>>>>> relevant for single rebalance upgrade).
> >>>>>>>>>
> >>>>>>>>> For future upgrades we might be able to fix this though. I was
> >>>>> thinking
> >>>>>>>>> about the following:
> >>>>>>>>>
> >>>>>>>>> In the current implementation, the leader fails if it gets a
> >>>>>>>>> subscription it does not understand (ie, newer version). We could
> >>>>> change
> >>>>>>>>> this behavior and let the leader send an empty assignment plus
> >> error
> >>>>>>>>> code (including supported version) back to the instance sending
> >> the
> >>>>>>>>> "bad" subscription. This would allow the following logic for an
> >>>>>>>>> application instance:
> >>>>>>>>>
> >>>>>>>>>  - on startup, always send the latest subscription format
> >>>>>>>>>  - if leader understands it, we get an assignment back an start
> >>>>> processing
> >>>>>>>>>  - if leader does not understand it, we get an empty assignment
> >> and
> >>>>>>>>> supported version back
> >>>>>>>>>  - the application unsubscribe()/subscribe()/poll() again and
> >>> sends a
> >>>>>>>>> subscription using the leader's supported version
> >>>>>>>>>
> >>>>>>>>> This protocol would allow to do a single rolling bounce, and
> >>>>> implements
> >>>>>>>>> a "version probing" step, that might result in two executed
> >>>>> rebalances.
> >>>>>>>>> The advantage would be, that the user does not need to set any
> >>> configs
> >>>>>>>>> or do multiple rolling bounces, as Streams takes care of this
> >>>>>>>>> automatically.
> >>>>>>>>>
> >>>>>>>>> One disadvantage would be, that two rebalances happen and that
> for
> >>> an
> >>>>>>>>> error case during rebalance, we loose the information about the
> >>>>>>>>> supported leader version and the "probing step" would happen a
> >>> second
> >>>>> time.
> >>>>>>>>>
> >>>>>>>>> If the leader is eventually updated, it will include it's own
> >>>>> supported
> >>>>>>>>> version in all assignments, to allow a "down graded" application
> >> to
> >>>>>>>>> upgrade its version later. Also, if a application fails, the
> first
> >>>>>>>>> probing would always be successful and only a single rebalance
> >>>>> happens.
> >>>>>>>>> If we use this protocol, I think we don't need any configuration
> >>>>>>>>> parameter for future upgrades.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
> >>>>>>>>>
> >>>>>>>>> Users would set "upgrade.from" to the release version the
> >>> current/old
> >>>>>>>>> application is using. I think this is simpler, as users know this
> >>>>>>>>> version. If we use "internal.protocol.version" instead, we expose
> >>>>>>>>> implementation details and users need to know the protocol
> version
> >>>>> (ie,
> >>>>>>>>> they need to map from the release version to the protocol
> version;
> >>> ie,
> >>>>>>>>> "I am run 0.11.0 that runs with metadata protocol version 2").
> >>>>>>>>>
> >>>>>>>>> Also the KIP states that for the second rolling bounce, the
> >>>>>>>>> "upgrade.mode" config should be set back to `null` -- and thus,
> >>>>>>>>> "upgrade.from" would not have any effect and is ignored (I will
> >>> update
> >>>>>>>>> the KIP to point out this dependency).
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> About your second point: I'll update the KIP accordingly to
> >> describe
> >>>>>>>>> future updates as well. Both will be different.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> One more point about upgrading the store format. I was thinking
> >>> about
> >>>>>>>>> avoiding the second rolling bounce all together in the future:
> (1)
> >>> the
> >>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this
> required
> >>> to
> >>>>>>>>> prepare the stores as "hot standbys" before we do the switch and
> >>>>> delete
> >>>>>>>>> the old stores. (3) the current proposal does the switch
> >> "globally"
> >>> --
> >>>>>>>>> this is simpler and due to the required second rebalance no
> >>>>> disadvantage.
> >>>>>>>>> However, a global consistent switch over might actually not be
> >>>>> required.
> >>>>>>>>> For "in_place" upgrade, following the protocol from above, we
> >> could
> >>>>>>>>> decouple the store switch and each instance could switch its
> store
> >>>>>>>>> independently from all other instances. After the rolling bounce,
> >> it
> >>>>>>>>> seems to be ok to switch from the old store to the new store
> >> "under
> >>>>> the
> >>>>>>>>> hood" whenever the new store is ready (this could even be done,
> >>> before
> >>>>>>>>> we switch to the new metadata version). Each time we update the
> >> "hot
> >>>>>>>>> standby" we check if it reached the "endOffset"  (or maybe X%
> that
> >>>>> could
> >>>>>>>>> either be hardcoded or configurable). If we detect this
> situation,
> >>> the
> >>>>>>>>> Streams application closes corresponding active tasks as well as
> >>> "hot
> >>>>>>>>> standby" tasks, and re-creates the new active tasks using the new
> >>>>> store.
> >>>>>>>>> (I need to go through the details once again, but it seems to be
> >>>>>>>>> feasible.).
> >>>>>>>>>
> >>>>>>>>> Combining this strategy with the "multiple assignment" idea,
> might
> >>>>> even
> >>>>>>>>> enable us to do an single rolling bounce upgrade from 1.1 -> 1.2.
> >>>>>>>>> Applications would just use the old store, as long as the new
> >> store
> >>> is
> >>>>>>>>> not ready, even if the new metadata version is used already.
> >>>>>>>>>
> >>>>>>>>> For future upgrades, a single rebalance would be sufficient, too,
> >>> even
> >>>>>>>>> if the stores are upgraded. We would not need any config
> >> parameters
> >>> as
> >>>>>>>>> the "probe" step allows us to detect the supported rebalance
> >>> metadata
> >>>>>>>>> version (and we would also not need multiple "assigmnent
> >> strategies"
> >>>>> as
> >>>>>>>>> out own protocol encoded everything we need).
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Let me know what you think.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
> >>>>>>>>>> @John:
> >>>>>>>>>>
> >>>>>>>>>> For the protocol version upgrade, it is only for the encoded
> >>> metadata
> >>>>>>>>> bytes
> >>>>>>>>>> protocol, which are just bytes-in bytes-out from Consumer's pov,
> >>> so I
> >>>>>>>>> think
> >>>>>>>>>> this change should be in the Streams layer as well.
> >>>>>>>>>>
> >>>>>>>>>> @Matthias:
> >>>>>>>>>>
> >>>>>>>>>> for 2), I agree that adding a "newest supported version" besides
> >>> the
> >>>>>>>>>> "currently used version for encoding" is a good idea to allow
> >>> either
> >>>>>>>>> case;
> >>>>>>>>>> the key is that in Streams we would likely end up with a mapping
> >>>>> from the
> >>>>>>>>>> protocol version to the other persistent data format versions
> >> such
> >>> as
> >>>>>>>>>> rocksDB, changelog. So with such a map we can actually achieve
> >> both
> >>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol
> >>>>> version's
> >>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 -> 0.10.1
> >>>>> leaders
> >>>>>>>>>> can choose to use the newer version in the first rolling bounce
> >>>>> directly
> >>>>>>>>>> and we can document to users that they would not need to set
> >>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded
> >> protocol
> >>>>>>>>> version
> >>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and then
> >> we
> >>>>> can
> >>>>>>>>>> document that "upgrade.mode" needs to be set in the first
> rolling
> >>>>> bounce
> >>>>>>>>>> and reset in the second.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Besides that, some additional comments:
> >>>>>>>>>>
> >>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for users to
> >> set
> >>>>> than
> >>>>>>>>>> "internal.protocol.version" where for the latter users only need
> >> to
> >>>>> set a
> >>>>>>>>>> single version, while the Streams will map that version to the
> >>>>> Streams
> >>>>>>>>>> assignor's behavior as well as the data format. But maybe I did
> >> not
> >>>>> get
> >>>>>>>>>> your idea about how the  "upgrade.from" config will be set,
> >> because
> >>>>> in
> >>>>>>>>>> your Compatibility section how the upgrade.from config will be
> >> set
> >>>>> for
> >>>>>>>>>> these two rolling bounces are not very clear: for example,
> should
> >>>>> user
> >>>>>>>>>> reset it to null in the second rolling bounce?
> >>>>>>>>>>
> >>>>>>>>>> 2) In the upgrade path description, rather than talking about
> >>>>> specific
> >>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize all
> >>> the
> >>>>>>>>>> possible scenarios, even for future upgrade versions, what
> should
> >>> be
> >>>>> the
> >>>>>>>>>> standard operations? The categorized we can summarize to would
> be
> >>>>>>>>> (assuming
> >>>>>>>>>> user upgrade from version X to version Y, where X and Y are
> Kafka
> >>>>>>>>> versions,
> >>>>>>>>>> with the corresponding supported protocol version x and y):
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and hence no
> >>>>>>>>> persistent
> >>>>>>>>>> data formats have changed.
> >>>>>>>>>>
> >>>>>>>>>> b. x != y, but all persistent data format remains the same.
> >>>>>>>>>>
> >>>>>>>>>> b. x !=y, AND some persistene data format like RocksDB format,
> >>>>> changelog
> >>>>>>>>>> format, has been changed.
> >>>>>>>>>>
> >>>>>>>>>> c. special case: we may need some special handling logic when
> >>>>> "current
> >>>>>>>>>> version" or "newest supported version" are not available in the
> >>>>> protocol,
> >>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> under the above scenarios, how many rolling bounces users need
> to
> >>>>>>>>> execute?
> >>>>>>>>>> how they should set the configs in each rolling bounce? and how
> >>>>> Streams
> >>>>>>>>>> library will execute in these cases?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
> >>>>> matthias@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Ted,
> >>>>>>>>>>>
> >>>>>>>>>>> I still consider changing the KIP to include it right away --
> if
> >>>>> not,
> >>>>>>>>>>> I'll create a JIRA. Need to think it through in more detail
> >> first.
> >>>>>>>>>>>
> >>>>>>>>>>> (Same for other open questions like interface names -- I
> collect
> >>>>>>>>>>> feedback and update the KIP after we reach consensus :))
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
> >>>>>>>>>>>> Thanks for the details, Matthias.
> >>>>>>>>>>>>
> >>>>>>>>>>>> bq. change the metadata protocol only if a future release,
> >>> encoding
> >>>>>>>>> both
> >>>>>>>>>>> used
> >>>>>>>>>>>> and supported version might be an advantage
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looks like encoding both versions wouldn't be implemented in
> >> this
> >>>>> KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please consider logging a JIRA with the encoding proposal.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
> >>>>> matthias@confluent.io
> >>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> @Bill: I think a filter predicate should be part of user
> code.
> >>> And
> >>>>>>>>> even
> >>>>>>>>>>>>> if we want to add something like this, I would prefer to do
> it
> >>> in
> >>>>> a
> >>>>>>>>>>>>> separate KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @James: I would love to avoid a second rolling bounce. But
> >> from
> >>> my
> >>>>>>>>>>>>> understanding it would not be possible.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to switch
> >>> from
> >>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to switch from
> >> the
> >>>>> old
> >>>>>>>>>>>>> store to the new store (this happens after the last instance
> >>>>> bounces a
> >>>>>>>>>>>>> second time).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The problem with one round of rolling bounces is, that it's
> >>>>> unclear
> >>>>>>>>> when
> >>>>>>>>>>>>> to which from version 2 to version 3. The
> >>>>> StreamsPartitionsAssignor is
> >>>>>>>>>>>>> stateless by design, and thus, the information which version
> >> it
> >>>>> should
> >>>>>>>>>>>>> use must be passed in from externally -- and we want to use
> >> the
> >>>>>>>>>>>>> StreamsConfig to pass in this information.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> During upgrade, all new instanced have no information about
> >> the
> >>>>>>>>> progress
> >>>>>>>>>>>>> of the upgrade (ie, how many other instanced got upgrades
> >>>>> already).
> >>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3
> >>>>> subscription.
> >>>>>>>>> The
> >>>>>>>>>>>>> leader also has this limited view on the world and can only
> >> send
> >>>>>>>>> version
> >>>>>>>>>>>>> 2 assignments back.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify the
> >>>>> upgrade.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We did consider to change the metadata to make later upgrades
> >>> (ie,
> >>>>>>>>> from
> >>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the
> >> metadata
> >>> or
> >>>>>>>>>>>>> storage format again -- as long as we don't change it, a
> >> single
> >>>>>>>>> rolling
> >>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and
> >> "supported
> >>>>>>>>>>>>> version". This would allow the leader to switch to the new
> >>> version
> >>>>>>>>>>>>> earlier and without a second rebalance: leader would receive
> >>> "used
> >>>>>>>>>>>>> version == old" and "supported version = old/new" -- as long
> >> as
> >>> at
> >>>>>>>>> least
> >>>>>>>>>>>>> one instance sends a "supported version = old" leader sends
> >> old
> >>>>>>>>> version
> >>>>>>>>>>>>> assignment back. However, encoding both version would allow
> >> that
> >>>>> the
> >>>>>>>>>>>>> leader can send a new version assignment back, right after
> the
> >>>>> first
> >>>>>>>>>>>>> round or rebalance finished (all instances send "supported
> >>>>> version =
> >>>>>>>>>>>>> new"). However, there are still two issues with this:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) if we switch to the new format right after the last
> >> instance
> >>>>>>>>> bounced,
> >>>>>>>>>>>>> the new stores might not be ready to be used -- this could
> >> lead
> >>> to
> >>>>>>>>>>>>> "downtime" as store must be restored before processing can
> >>> resume.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At this
> >>>>> point, the
> >>>>>>>>>>>>> instance will still have "upgrade mode" enabled and thus
> sends
> >>>>> the old
> >>>>>>>>>>>>> protocol data. However, it would be desirable to never fall
> >> back
> >>>>> to
> >>>>>>>>> the
> >>>>>>>>>>>>> old protocol after the switch to the new protocol.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The second issue is minor and I guess if users set-up the
> >>> instance
> >>>>>>>>>>>>> properly it could be avoided. However, the first issue would
> >>>>> prevent
> >>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we consider
> >> that
> >>> we
> >>>>>>>>> might
> >>>>>>>>>>>>> change the metadata protocol only if a future release,
> >> encoding
> >>>>> both
> >>>>>>>>>>>>> used and supported version might be an advantage in the
> future
> >>>>> and we
> >>>>>>>>>>>>> could consider to add this information in 1.2 release to
> >> prepare
> >>>>> for
> >>>>>>>>>>> this.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Btw: monitoring the log, is also only required to give the
> >>>>> instances
> >>>>>>>>>>>>> enough time to prepare the stores in new format. If you would
> >> do
> >>>>> the
> >>>>>>>>>>>>> second rolling bounce before this, it would still work --
> >>>>> however, you
> >>>>>>>>>>>>> might see app "downtime" as the new store must be fully
> >> restored
> >>>>>>>>> before
> >>>>>>>>>>>>> processing can resume.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Does this make sense?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
> >>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid of the
> >> 2nd
> >>>>>>>>> rolling
> >>>>>>>>>>>>> bounce?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For the in-place upgrade, it seems like primary difference
> >>>>> between
> >>>>>>>>> the
> >>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide
> >>>>> whether to
> >>>>>>>>>>> send
> >>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.  (Actually,
> >>>>> there is
> >>>>>>>>>>>>> another difference mentioned in that the KIP says that the
> 2nd
> >>>>> rolling
> >>>>>>>>>>>>> bounce should happen after all new state stores are created
> by
> >>> the
> >>>>>>>>>>>>> background thread. However, within the 2nd rolling bounce, we
> >>> say
> >>>>> that
> >>>>>>>>>>>>> there is still a background thread, so it seems like is no
> >>> actual
> >>>>>>>>>>>>> requirement to wait for the new state stores to be created.)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with
> >>> mixed-mode
> >>>>>>>>>>> (having
> >>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer group). It
> >>> seems
> >>>>>>>>> like
> >>>>>>>>>>> we
> >>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic
> >>>>> (somehow/somewhere)
> >>>>>>>>>>> such
> >>>>>>>>>>>>> that:
> >>>>>>>>>>>>>> * Instances send Subscription Version 2 until all instances
> >> are
> >>>>>>>>> running
> >>>>>>>>>>>>> the new code.
> >>>>>>>>>>>>>> * Once all the instances are running the new code, then one
> >> at
> >>> a
> >>>>>>>>> time,
> >>>>>>>>>>>>> the instances start sending Subscription V3. Leader still
> >> hands
> >>>>> out
> >>>>>>>>>>>>> Assignment Version 2, until all new state stores are ready.
> >>>>>>>>>>>>>> * Once all instances report that new stores are ready,
> Leader
> >>>>> sends
> >>>>>>>>> out
> >>>>>>>>>>>>> Assignment Version 3.
> >>>>>>>>>>>>>> * Once an instance receives an Assignment Version 3, it can
> >>>>> delete
> >>>>>>>>> the
> >>>>>>>>>>>>> old state store.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot of
> >>>>>>>>>>>>> operator/deployment overhead. No need to do 2 rolling
> >> restarts.
> >>> No
> >>>>>>>>> need
> >>>>>>>>>>> to
> >>>>>>>>>>>>> monitor logs for state store rebuild. You just deploy it, and
> >>> the
> >>>>>>>>>>> instances
> >>>>>>>>>>>>> update themselves.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The thing that made me think of this is that the "2 rolling
> >>>>> bounces"
> >>>>>>>>> is
> >>>>>>>>>>>>> similar to what Kafka brokers have to do changes in
> >>>>>>>>>>>>> inter.broker.protocol.version and log.message.format.version.
> >>> And
> >>>>> in
> >>>>>>>>> the
> >>>>>>>>>>>>> broker case, it seems like it would be possible (with some
> >> work
> >>> of
> >>>>>>>>>>> course)
> >>>>>>>>>>>>> to modify kafka to allow us to do similar auto-detection of
> >>> broker
> >>>>>>>>>>>>> capabilities and automatically do a switchover from old/new
> >>>>> versions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -James
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
> bbejeck@gmail.com
> >>>
> >>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I do have one question regarding the retrieval methods on
> >> the
> >>>>> new
> >>>>>>>>>>>>>>> interfaces.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Would want to consider adding one method with a Predicate
> >> that
> >>>>> would
> >>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>> for filtering records by the timestamp stored with the
> >> record?
> >>>>> Or
> >>>>>>>>> is
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>>> better left for users to implement themselves once the data
> >>> has
> >>>>> been
> >>>>>>>>>>>>>>> retrieved?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
> yuzhihong@gmail.com
> >>>
> >>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>> For my point #1, I don't have preference as to which
> >>> separator
> >>>>> is
> >>>>>>>>>>>>> chosen.
> >>>>>>>>>>>>>>>> Given the background you mentioned, current choice is
> good.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For #2, I think my proposal is better since it is closer
> to
> >>>>> English
> >>>>>>>>>>>>>>>> grammar.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Would be good to listen to what other people think.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <
> >>>>>>>>>>> matthias@confluent.io
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the comments!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> @Guozhang:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance metadata
> upgrade
> >>> fix
> >>>>>>>>>>>>>>>>> (addressing the mentioned
> >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It
> >> give a
> >>>>> first
> >>>>>>>>>>>>>>>>> impression how the metadata upgrade works including a
> >> system
> >>>>> test:
> >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I can share other PRs as soon as they are ready. I agree
> >>> that
> >>>>> the
> >>>>>>>>>>> KIP
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> complex am I ok with putting out more code to give better
> >>>>>>>>> discussion
> >>>>>>>>>>>>>>>>> context.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> @Ted:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
> >>>>>>>>> `processing.guarantee`
> >>>>>>>>>>>>>>>>> parameter that accepts `at_least_one` and `exactly_once`
> >> as
> >>>>>>>>> values.
> >>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs dash but I
> >>> prefer
> >>>>>>>>>>>>>>>>> consistency. If you feel strong about it, we can also
> >> change
> >>>>> it to
> >>>>>>>>>>>>> `-`.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> About the interface name: I am fine either way -- I
> >> stripped
> >>>>> the
> >>>>>>>>>>>>> `With`
> >>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be good to get
> >>>>> feedback
> >>>>>>>>>>> from
> >>>>>>>>>>>>>>>>> others and pick the name the majority prefers.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We can certainly change it. I agree that it would not
> >> make a
> >>>>>>>>>>>>> difference.
> >>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the two version
> >>> might
> >>>>>>>>>>>>> introduce
> >>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I don't hit an
> >>>>> issue
> >>>>>>>>> with
> >>>>>>>>>>>>>>>>> putting the `-v2` to the store directory instead of
> >>>>> `rocksdb-v2`
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
> >>>>>>>>>>>>>>>>>> Hey Matthias,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several questions queued
> >>> up,
> >>>>> but
> >>>>>>>>>>> they
> >>>>>>>>>>>>>>>>> were
> >>>>>>>>>>>>>>>>>> all in the "rejected alternatives" section... oh, well.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> One very minor thought re changing the state directory
> >> from
> >>>>>>>>>>>>>>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to
> >>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if
> you
> >>>>> put the
> >>>>>>>>>>>>> "v2"
> >>>>>>>>>>>>>>>>>> marker on the storeName part of the path (i.e.,
> >>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"), then
> >>> you
> >>>>> get
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>> benefits without altering the high-level directory
> >>> structure.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people running
> >>>>> scripts to
> >>>>>>>>>>>>>>>> monitor
> >>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other such use
> >> cases.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <
> >>> yuzhihong@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>> Nicely written KIP.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may
> >>>>> sometimes
> >>>>>>>>> be
> >>>>>>>>>>>>> miss
> >>>>>>>>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly to
> >>> user.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V>
> {
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for
> >> the
> >>>>>>>>> class ?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <
> >>>>>>>>> wangguoz@gmail.com
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I've read through the upgrade patch section and it
> >> looks
> >>>>> good
> >>>>>>>>> to
> >>>>>>>>>>>>> me,
> >>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you also share it
> >> here
> >>>>> so
> >>>>>>>>> that
> >>>>>>>>>>>>>>>>> people
> >>>>>>>>>>>>>>>>>>>> can take a look?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs like this
> >> there
> >>>>> are
> >>>>>>>>>>> always
> >>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it is better
> to
> >>>>> have
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> implementation in parallel along with the design
> >>>>> discussion :)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <
> >>>>>>>>>>>>>>>> matthias@confluent.io
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to
> allow
> >>>>> storing
> >>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to
> >>>>> resolve
> >>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message