From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB
Date Fri, 09 Mar 2018 22:27:17 GMT
@Bill: I think a filter predicate should be part of user code. And even
if we want to add something like this, I would prefer to do it in a
separate KIP.
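A minimal sketch of what such user-side filtering could look like. It assumes a hypothetical `TimestampedValue` holder (not an existing Kafka Streams class) and that the application has already fetched the values together with their stored timestamps; the filter is a plain `java.util.function.Predicate` applied after retrieval:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class TimestampFilterSketch {

    // Hypothetical holder for a value plus the record timestamp stored with it.
    // This is NOT an existing Kafka Streams class; it only stands in for
    // whatever the new store interfaces return.
    static final class TimestampedValue<V> {
        final V value;
        final long timestamp;

        TimestampedValue(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Keeps only the values whose stored timestamp satisfies the predicate.
    // In a real application the input would come from a store lookup or range scan.
    static <K, V> List<V> filterByTimestamp(Map<K, TimestampedValue<V>> fetched,
                                            Predicate<Long> timestampFilter) {
        List<V> result = new ArrayList<>();
        for (TimestampedValue<V> tv : fetched.values()) {
            if (timestampFilter.test(tv.timestamp)) {
                result.add(tv.value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, standing in for store iteration order.
        Map<String, TimestampedValue<String>> fetched = new LinkedHashMap<>();
        fetched.put("a", new TimestampedValue<>("old", 100L));
        fetched.put("b", new TimestampedValue<>("new", 200L));

        List<String> recent = filterByTimestamp(fetched, ts -> ts >= 150L);
        System.out.println(recent); // prints [new]
    }
}
```

Keeping the predicate in user code means the store API does not need to fix which comparisons (ranges, windows, arbitrary conditions) are supported.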

@James: I would love to avoid a second rolling bounce. But from my
understanding it would not be possible.

The purpose of the second rolling bounce is indeed to switch from
version 2 to 3. It also has a second purpose, to switch from the old
store to the new store (this happens after the last instance bounces a
second time).

The problem with one round of rolling bounces is that it's unclear when
to switch from version 2 to version 3. The StreamsPartitionAssignor is
stateless by design, and thus, the information about which version it
should use must be passed in externally -- and we want to use the
StreamsConfig to pass in this information.

During the upgrade, the new instances have no information about the progress
of the upgrade (ie, how many other instances have been upgraded already).
Therefore, it's not safe for them to send a version 3 subscription. The
leader also has this limited view on the world and can only send version
2 assignments back.
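To make the constraint concrete, a minimal sketch under stated assumptions: the boolean `upgradeModeEnabled` is a hypothetical stand-in for the actual StreamsConfig setting, versions 2 and 3 are the old and new metadata versions, and the leader is assumed to answer with the lowest subscription version it received. Because neither side keeps state across rebalances, only the config flag can move an instance from version 2 to version 3:

```java
import java.util.Arrays;
import java.util.List;

public class StatelessUpgradeSketch {

    // Illustrative protocol versions: 2 = current metadata, 3 = new metadata.
    static final int OLD_VERSION = 2;
    static final int NEW_VERSION = 3;

    // Each instance decides purely from its own (hypothetical) upgrade flag.
    // It has no way to learn how many other instances already run the new code.
    static int subscriptionVersion(boolean upgradeModeEnabled) {
        return upgradeModeEnabled ? OLD_VERSION : NEW_VERSION;
    }

    // Assumed leader behavior: it only sees the subscriptions of the current
    // rebalance and answers with a version every member can decode.
    static int assignmentVersion(List<Integer> subscriptionVersions) {
        int version = NEW_VERSION;
        for (int v : subscriptionVersions) {
            version = Math.min(version, v);
        }
        return version;
    }

    public static void main(String[] args) {
        // First rolling bounce: new instances run with the flag set, and
        // instances still on the old code send version 2 anyway.
        List<Integer> firstBounce = Arrays.asList(
                OLD_VERSION, subscriptionVersion(true), subscriptionVersion(true));
        // Second rolling bounce: the flag is removed one instance at a time.
        List<Integer> secondBounce = Arrays.asList(
                subscriptionVersion(false), subscriptionVersion(true), subscriptionVersion(true));
        // After the last instance has bounced a second time.
        List<Integer> done = Arrays.asList(
                subscriptionVersion(false), subscriptionVersion(false), subscriptionVersion(false));

        System.out.println(assignmentVersion(firstBounce));  // 2
        System.out.println(assignmentVersion(secondBounce)); // 2
        System.out.println(assignmentVersion(done));         // 3
    }
}
```

In this model, nothing inside a single bounce round can tell an instance that it is now safe to send version 3; that information only arrives with the changed config of the second bounce.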

Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.

We did consider changing the metadata to make later upgrades (ie, from
1.2 to 1.x) simpler though (for the case we change the metadata or
storage format again -- as long as we don't change it, a single rolling
bounce is sufficient), by encoding "used version" and "supported
version". This would allow the leader to switch to the new version
earlier and without a second rebalance: the leader would receive "used
version == old" and "supported version = old/new" -- as long as at least
one instance sends "supported version = old", the leader sends an old
version assignment back. But because both versions are encoded, the
leader can send a new version assignment back right after the first
round of rebalances has finished (all instances send "supported version =
new"). However, there are still two issues with this:

1) If we switch to the new format right after the last instance bounced,
the new stores might not be ready to be used -- this could lead to
"downtime" as the stores must be restored before processing can resume.

2) Assume an instance fails and is restarted again. At this point, the
instance will still have "upgrade mode" enabled and thus sends the old
protocol data. However, it would be desirable to never fall back to the
old protocol after the switch to the new protocol.

The second issue is minor and I guess if users set up the instance
properly it could be avoided. However, the first issue would prevent
"zero downtime" upgrades. Having said this, if we consider that we might
change the metadata protocol only in a future release, encoding both
used and supported version might be an advantage in the future and we
could consider adding this information in the 1.2 release to prepare for this.
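A minimal sketch of the "used version" / "supported version" idea, for illustration only. The `Subscription` class, its field names, and the version numbers 3 (old) and 4 (new) for a later 1.2 to 1.x upgrade are all hypothetical; the leader is assumed to pick the highest version that every member supports:

```java
import java.util.Arrays;
import java.util.List;

public class UsedAndSupportedVersionSketch {

    // Hypothetical subscription metadata that carries both versions.
    static final class Subscription {
        // Version this subscription is encoded with; the leader needs it to decode the bytes.
        final int usedVersion;
        // Highest version the sending instance is able to handle.
        final int supportedVersion;

        Subscription(int usedVersion, int supportedVersion) {
            this.usedVersion = usedVersion;
            this.supportedVersion = supportedVersion;
        }
    }

    // The leader picks the highest version that every member supports: as long as
    // one member reports only the old version, the old assignment format is used.
    static int chooseAssignmentVersion(List<Subscription> subscriptions) {
        if (subscriptions.isEmpty()) {
            throw new IllegalArgumentException("no subscriptions received");
        }
        int version = Integer.MAX_VALUE;
        for (Subscription s : subscriptions) {
            version = Math.min(version, s.supportedVersion);
        }
        return version;
    }

    public static void main(String[] args) {
        // Mid-upgrade: one instance already bounced (uses old, supports new), one not yet.
        List<Subscription> midUpgrade = Arrays.asList(
                new Subscription(3, 4),
                new Subscription(3, 3));
        // Every instance bounced once: all still use the old version, all support the new one.
        List<Subscription> afterFirstRound = Arrays.asList(
                new Subscription(3, 4),
                new Subscription(3, 4));

        System.out.println(chooseAssignmentVersion(midUpgrade));      // 3
        System.out.println(chooseAssignmentVersion(afterFirstRound)); // 4
    }
}
```

The decision is based only on protocol support: nothing in it checks whether the new stores are already restored, which is why issue 1) above remains even with both versions encoded.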

Btw: monitoring the log is also only required to give the instances
enough time to prepare the stores in the new format. If you did the
second rolling bounce before this, it would still work -- however, you
might see app "downtime" as the new store must be fully restored before
processing can resume.

Does this make sense?


On 3/9/18 11:36 AM, James Cheng wrote:
> Matthias,
> For all the upgrade paths, is it possible to get rid of the 2nd rolling bounce?
> For the in-place upgrade, it seems like the primary difference between the 1st rolling bounce
> and the 2nd rolling bounce is to decide whether to send Subscription Version 2 or Subscription
> Version 3.  (Actually, there is another difference mentioned in that the KIP says that the
> 2nd rolling bounce should happen after all new state stores are created by the background
> thread. However, within the 2nd rolling bounce, we say that there is still a background thread,
> so it seems like there is no actual requirement to wait for the new state stores to be created.)
> The 2nd rolling bounce already knows how to deal with mixed-mode (having both Version
> 2 and Version 3 in the same consumer group). It seems like we could get rid of the 2nd bounce
> if we added logic (somehow/somewhere) such that:
> * Instances send Subscription Version 2 until all instances are running the new code.
> * Once all the instances are running the new code, then one at a time, the instances
>   start sending Subscription V3. Leader still hands out Assignment Version 2, until all new
>   state stores are ready.
> * Once all instances report that new stores are ready, Leader sends out Assignment Version 3.
> * Once an instance receives an Assignment Version 3, it can delete the old state store.
> Doing it that way seems like it would reduce a lot of operator/deployment overhead. No
> need to do 2 rolling restarts. No need to monitor logs for state store rebuild. You just deploy
> it, and the instances update themselves.
> What do you think?
> The thing that made me think of this is that the "2 rolling bounces" is similar to what
> Kafka brokers have to do for changes in inter.broker.protocol.version and log.message.format.version.
> And in the broker case, it seems like it would be possible (with some work of course) to modify
> Kafka to allow us to do similar auto-detection of broker capabilities and automatically do
> a switchover from old/new versions.
> -James
>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbejeck@gmail.com> wrote:
>> Matthias,
>> Thanks for the KIP, it's a +1 from me.
>> I do have one question regarding the retrieval methods on the new
>> interfaces.
>> Would you want to consider adding one method with a Predicate that would allow
>> filtering records by the timestamp stored with the record?  Or is this
>> better left for users to implement themselves once the data has been
>> retrieved?
>> Thanks,
>> Bill
>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>> Matthias:
>>> For my point #1, I don't have a preference as to which separator is chosen.
>>> Given the background you mentioned, the current choice is good.
>>> For #2, I think my proposal is better since it is closer to English
>>> grammar.
>>> Would be good to hear what other people think.
>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matthias@confluent.io>
>>> wrote:
>>>> Thanks for the comments!
>>>> @Guozhang:
>>>> So far, there is one PR for the rebalance metadata upgrade fix
>>>> (addressing the mentioned
>>>> https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first
>>>> impression of how the metadata upgrade works, including a system test:
>>>> https://github.com/apache/kafka/pull/4636
>>>> I can share other PRs as soon as they are ready. I agree that the KIP is
>>>> complex and I am ok with putting out more code to give better discussion
>>>> context.
>>>> @Ted:
>>>> I picked `_` instead of `-` to align with the `processing.guarantee`
>>>> parameter that accepts `at_least_once` and `exactly_once` as values.
>>>> Personally, I don't care about underscore vs dash but I prefer
>>>> consistency. If you feel strongly about it, we can also change it to `-`.
>>>> About the interface name: I am fine either way -- I stripped the `With`
>>>> to keep the name a little shorter. Would be good to get feedback from
>>>> others and pick the name the majority prefers.
>>>> @John:
>>>> We can certainly change it. I agree that it would not make a difference.
>>>> I'll dig into the code to see if either of the two versions might introduce
>>>> undesired complexity and update the KIP if I don't hit an issue with
>>>> putting the `-v2` on the store directory instead of `rocksdb-v2`.
>>>> -Matthias
>>>> On 3/8/18 2:44 PM, John Roesler wrote:
>>>>> Hey Matthias,
>>>>> The KIP looks good to me. I had several questions queued up, but they were
>>>>> all in the "rejected alternatives" section... oh, well.
>>>>> One very minor thought re changing the state directory from
>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to
>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2"
>>>>> marker on the storeName part of the path (i.e.,
>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same
>>>>> benefits without altering the high-level directory structure.
>>>>> It may not matter, but I could imagine people running scripts to monitor
>>>>> rocksdb disk usage for each task, or other such use cases.
>>>>> Thanks,
>>>>> -John
>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>>>>> Matthias:
>>>>>> Nicely written KIP.
>>>>>> "in_place" : can this be "in-place" ? Underscore may sometimes be
>>>>>> mistyped (as '-'). I think using '-' is more user-friendly.
>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> {
>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for the class ?
>>>>>> Thanks
>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>>>>>>> Hello Matthias, thanks for the KIP.
>>>>>>> I've read through the upgrade patch section and it looks good to me. If
>>>>>>> you already have a WIP PR for it, could you also share it here so that
>>>>>>> people can take a look?
>>>>>>> I'm +1 on the KIP itself. But with large KIPs like this, the devil is often
>>>>>>> hidden in the details, so I think it is better to have the
>>>>>>> implementation in parallel along with the design discussion :)
>>>>>>> Guozhang
>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <matthias@confluent.io>
>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> I want to propose KIP-258 for the Streams API to allow storing
>>>>>>>> timestamps in RocksDB. This feature is the basis to resolve
>>>>>>>> tickets (issues and feature requests).
>>>>>>>> Looking forward to your comments about this!
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>> -Matthias
>>>>>>> --
>>>>>>> -- Guozhang

