kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Date Wed, 05 Dec 2018 22:40:20 GMT
If they don't find the time:
They usually take the opposite path from me :D
so the answer would be clear.

hence my suggestion to vote.


On 04.12.2018 21:06, Adam Bellemare wrote:
> Hi Guozhang and Matthias
>
> I know both of you are quite busy, but we've gotten this KIP to a point
> where we need more guidance on the API (perhaps a bit of a tie-breaker, if
> you will). If you have anyone else you may think should look at this,
> please tag them accordingly.
>
> The scenario is as such:
>
> Current Option:
> API:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> 1) Rekey the data to CombinedKey, and shuffles it to the partition with the
> foreignKey (repartition 1)
> 2) Join the data
> 3) Shuffle the data back to the original node (repartition 2)
> 4) Resolve out-of-order arrival / race condition due to foreign-key changes.
>
> Alternate Option:
> Perform #1 and #2 above, and return a KScatteredTable.
> - It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR> (KO
> = Other Table Key, K = This Table Key, VR = Joined Result)
> - KScatteredTable.resolve() would perform #3 and #4 but otherwise a user
> would be able to perform additional functions directly from the
> KScatteredTable (TBD - currently out of scope).
> - John's analysis 2-emails up is accurate as to the tradeoffs.
>
> Current Option is coded as-is. Alternate option is possible, but will
> require for implementation details to be made in the API and some exposure
> of new data structures into the API (ie: CombinedKey).
>
> I appreciate any insight into this.
>
> Thanks.
>
> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellemare@gmail.com>
> wrote:
>
>> Hi John
>>
>> Thanks for your feedback and assistance. I think your summary is accurate
>> from my perspective. Additionally, I would like to add that there is a risk
>> of inconsistent final states without performing the resolution. This is a
>> major concern for me as most of the data I have dealt with is produced by
>> relational databases. We have seen a number of cases where a user in the
>> Rails UI has modified the field (foreign key), realized they made a
>> mistake, and then updated the field again with a new key. The events are
>> propagated out as they are produced, and as such we have had real-world
>> cases where these inconsistencies were propagated downstream as the final
>> values due to the race conditions in the fanout of the data.
>>
>> This solution that I propose values correctness of the final result over
>> other factors.
>>
>> We could always move this function over to using a KScatteredTable
>> implementation in the future, and simply deprecate it this join API in
>> time. I think I would like to hear more from some of the other major
>> committers on which course of action they would think is best before any
>> more coding is done.
>>
>> Thanks again
>>
>> Adam
>>
>>
>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <john@confluent.io> wrote:
>>
>>> Hi Jan and Adam,
>>>
>>> Wow, thanks for doing that test, Adam. Those results are encouraging.
>>>
>>> Thanks for your performance experience as well, Jan. I agree that avoiding
>>> unnecessary join outputs is especially important when the fan-out is so
>>> high. I suppose this could also be built into the implementation we're
>>> discussing, but it wouldn't have to be specified in the KIP (since it's an
>>> API-transparent optimization).
>>>
>>> As far as whether or not to re-repartition the data, I didn't bring it up
>>> because it sounded like the two of you agreed to leave the KIP as-is,
>>> despite the disagreement.
>>>
>>> If you want my opinion, I feel like both approaches are reasonable.
>>> It sounds like Jan values more the potential for developers to optimize
>>> their topologies to re-use the intermediate nodes, whereas Adam places
>>> more
>>> value on having a single operator that people can use without extra steps
>>> at the end.
>>>
>>> Personally, although I do find it exceptionally annoying when a framework
>>> gets in my way when I'm trying to optimize something, it seems better to
>>> go
>>> for a single operation.
>>> * Encapsulating the internal transitions gives us significant latitude in
>>> the implementation (for example, joining only at the end, not in the
>>> middle
>>> to avoid extra data copying and out-of-order resolution; how we represent
>>> the first repartition keys (combined keys vs. value vectors), etc.). If we
>>> publish something like a KScatteredTable with the right-partitioned joined
>>> data, then the API pretty much locks in the implementation as well.
>>> * The API seems simpler to understand and use. I do mean "seems"; if
>>> anyone
>>> wants to make the case that KScatteredTable is actually simpler, I think
>>> hypothetical usage code would help. From a relational algebra perspective,
>>> it seems like KTable.join(KTable) should produce a new KTable in all
>>> cases.
>>> * That said, there might still be room in the API for a different
>>> operation
>>> like what Jan has proposed to scatter a KTable, and then do things like
>>> join, re-group, etc from there... I'm not sure; I haven't thought through
>>> all the consequences yet.
>>>
>>> This is all just my opinion after thinking over the discussion so far...
>>> -John
>>>
>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellemare@gmail.com>
>>> wrote:
>>>
>>>> Updated the PR to take into account John's feedback.
>>>>
>>>> I did some preliminary testing for the performance of the prefixScan. I
>>>> have attached the file, but I will also include the text in the body
>>> here
>>>> for archival purposes (I am not sure what happens to attached files). I
>>>> also updated the PR and the KIP accordingly.
>>>>
>>>> Summary: It scales exceptionally well for scanning large values of
>>>> records. As Jan mentioned previously, the real issue would be more
>>> around
>>>> processing the resulting records after obtaining them. For instance, it
>>>> takes approximately ~80-120 mS to flush the buffer and a further
>>> ~35-85mS
>>>> to scan 27.5M records, obtaining matches for 2.5M of them. Iterating
>>>> through the records just to generate a simple count takes ~ 40 times
>>> longer
>>>> than the flush + scan combined.
>>>>
>>>>
>>> ============================================================================================
>>>> Setup:
>>>>
>>>>
>>> ============================================================================================
>>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
>>>> CPU: i7 2.2 Ghz.
>>>>
>>>> Note: I am using a slightly-modified, directly-accessible Kafka Streams
>>>> RocksDB
>>>> implementation (RocksDB.java, basically just avoiding the
>>>> ProcessorContext).
>>>> There are no modifications to the default RocksDB values provided in the
>>>> 2.1/trunk release.
>>>>
>>>>
>>>> keysize = 128 bytes
>>>> valsize = 512 bytes
>>>>
>>>> Step 1:
>>>> Write X positive matching events: (key = prefix + left-padded
>>>> auto-incrementing integer)
>>>> Step 2:
>>>> Write 10X negative matching events (key = left-padded auto-incrementing
>>>> integer)
>>>> Step 3:
>>>> Perform flush
>>>> Step 4:
>>>> Perform prefixScan
>>>> Step 5:
>>>> Iterate through return Iterator and validate the count of expected
>>> events.
>>>>
>>>>
>>>>
>>> ============================================================================================
>>>> Results:
>>>>
>>>>
>>> ============================================================================================
>>>> X = 1k (11k events total)
>>>> Flush Time = 39 mS
>>>> Scan Time = 7 mS
>>>> 6.9 MB disk
>>>>
>>>>
>>> --------------------------------------------------------------------------------------------
>>>> X = 10k (110k events total)
>>>> Flush Time = 45 mS
>>>> Scan Time = 8 mS
>>>> 127 MB
>>>>
>>>>
>>> --------------------------------------------------------------------------------------------
>>>> X = 100k (1.1M events total)
>>>> Test1:
>>>> Flush Time = 60 mS
>>>> Scan Time = 12 mS
>>>> 678 MB
>>>>
>>>> Test2:
>>>> Flush Time = 45 mS
>>>> Scan Time = 7 mS
>>>> 576 MB
>>>>
>>>>
>>> --------------------------------------------------------------------------------------------
>>>> X = 1MB (11M events total)
>>>> Test1:
>>>> Flush Time = 52 mS
>>>> Scan Time = 19 mS
>>>> 7.2 GB
>>>>
>>>> Test2:
>>>> Flush Time = 84 mS
>>>> Scan Time = 34 mS
>>>> 9.1 GB
>>>>
>>>>
>>> --------------------------------------------------------------------------------------------
>>>> X = 2.5M (27.5M events total)
>>>> Test1:
>>>> Flush Time = 82 mS
>>>> Scan Time = 63 mS
>>>> 17GB - 276 sst files
>>>>
>>>> Test2:
>>>> Flush Time = 116 mS
>>>> Scan Time = 35 mS
>>>> 23GB - 361 sst files
>>>>
>>>> Test3:
>>>> Flush Time = 103 mS
>>>> Scan Time = 82 mS
>>>> 19 GB - 300 sst files
>>>>
>>>>
>>> --------------------------------------------------------------------------------------------
>>>>
>>>> I had to limit my testing on my laptop to X = 2.5M events. I tried to go
>>>> to X = 10M (110M events) but RocksDB was going into the 100GB+ range
>>> and my
>>>> laptop ran out of disk. More extensive testing could be done but I
>>> suspect
>>>> that it would be in line with what we're seeing in the results above.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> At this point in time, I think the only major discussion point is really
>>>> around what Jan and I have disagreed on: repartitioning back + resolving
>>>> potential out of order issues or leaving that up to the client to
>>> handle.
>>>>
>>>>
>>>> Thanks folks,
>>>>
>>>> Adam
>>>>
>>>>
>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <Jan.Filipiak@trivago.com>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On 29.11.2018 15:14, John Roesler wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> Sorry that this discussion petered out... I think the 2.1 release
>>>>> caused an
>>>>>> extended distraction that pushed it off everyone's radar (which was
>>>>>> precisely Adam's concern). Personally, I've also had some extend
>>>>>> distractions of my own that kept (and continue to keep) me
>>> preoccupied.
>>>>>>
>>>>>> However, calling for a vote did wake me up, so I guess Jan was on the
>>>>> right
>>>>>> track!
>>>>>>
>>>>>> I've gone back and reviewed the whole KIP document and the prior
>>>>>> discussion, and I'd like to offer a few thoughts:
>>>>>>
>>>>>> API Thoughts:
>>>>>>
>>>>>> 1. If I read the KIP right, you are proposing a many-to-one join.
>>> Could
>>>>> we
>>>>>> consider naming it manyToOneJoin? Or, if you prefer, flip the design
>>>>> around
>>>>>> and make it a oneToManyJoin?
>>>>>>
>>>>>> The proposed name "joinOnForeignKey" disguises the join type, and it
>>>>> seems
>>>>>> like it might trick some people into using it for a one-to-one join.
>>>>> This
>>>>>> would work, of course, but it would be super inefficient compared to
>>> a
>>>>>> simple rekey-and-join.
>>>>>>
>>>>>> 2. I might have missed it, but I don't think it's specified whether
>>>>> it's an
>>>>>> inner, outer, or left join. I'm guessing an outer join, as
>>> (neglecting
>>>>> IQ),
>>>>>> the rest can be achieved by filtering or by handling it in the
>>>>> ValueJoiner.
>>>>>>
>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right.
>>>>>> 3a. Regarding Serialized: There are a few different paradigms in
>>> play in
>>>>>> the Streams API, so it's confusing, but instead of three Serialized
>>>>> args, I
>>>>>> think it would be better to have one that allows (optionally) setting
>>>>> the 4
>>>>>> incoming serdes. The result serde is defined by the Materialized. The
>>>>>> incoming serdes can be optional because they might already be
>>> available
>>>>> on
>>>>>> the source KTables, or the default serdes from the config might be
>>>>>> applicable.
>>>>>>
>>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't allow
>>>>> setting
>>>>>> one, and it seems like it might actually be harmful, since the rekey
>>>>>> operation needs to produce results that are co-partitioned with the
>>>>> "other"
>>>>>> KTable.
>>>>>>
>>>>>> 4. I'm fine with the "reserved word" header, but I didn't actually
>>>>> follow
>>>>>> what Matthias meant about namespacing requiring "deserializing" the
>>>>> record
>>>>>> header. The headers are already Strings, so I don't think that
>>>>>> deserialization is required. If we applied the namespace at source
>>> nodes
>>>>>> and stripped it at sink nodes, this would be practically no overhead.
>>>>> The
>>>>>> advantage of the namespace idea is that no public API change wrt
>>> headers
>>>>>> needs to happen, and no restrictions need to be placed on users'
>>>>> headers.
>>>>>>
>>>>>> (Although I'm wondering if we can get away without the header at
>>> all...
>>>>>> stay tuned)
>>>>>>
>>>>>> 5. I also didn't follow the discussion about the HWM table growing
>>>>> without
>>>>>> bound. As I read it, the HWM table is effectively implementing OCC to
>>>>>> resolve the problem you noted with disordering when the rekey is
>>>>>> reversed... particularly notable when the FK changes. As such, it
>>> only
>>>>>> needs to track the most recent "version" (the offset in the source
>>>>>> partition) of each key. Therefore, it should have the same number of
>>>>> keys
>>>>>> as the source table at all times.
>>>>>>
>>>>>> I see that you are aware of KIP-258, which I think might be relevant
>>> in
>>>>> a
>>>>>> couple of ways. One: it's just about storing the timestamp in the
>>> state
>>>>>> store, but the ultimate idea is to effectively use the timestamp as
>>> an
>>>>> OCC
>>>>>> "version" to drop disordered updates. You wouldn't want to use the
>>>>>> timestamp for this operation, but if you were to use a similar
>>>>> mechanism to
>>>>>> store the source offset in the store alongside the re-keyed values,
>>> then
>>>>>> you could avoid a separate table.
>>>>>>
>>>>>> 6. You and Jan have been thinking about this for a long time, so I've
>>>>>> probably missed something here, but I'm wondering if we can avoid the
>>>>> HWM
>>>>>> tracking at all and resolve out-of-order during a final join
>>> instead...
>>>>>>
>>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other
>>>>> data))
>>>>>> to a right table (Letter K: (some data)).
>>>>>>
>>>>>> Left table:
>>>>>> 1: (A, xyz)
>>>>>> 2: (B, asd)
>>>>>>
>>>>>> Right table:
>>>>>> A: EntityA
>>>>>> B: EntityB
>>>>>>
>>>>>> We could do a rekey as you proposed with a combined key, but not
>>>>>> propagating the value at all..
>>>>>> Rekey table:
>>>>>> A-1: (dummy value)
>>>>>> B-2: (dummy value)
>>>>>>
>>>>>> Which we then join with the right table to produce:
>>>>>> A-1: EntityA
>>>>>> B-2: EntityB
>>>>>>
>>>>>> Which gets rekeyed back:
>>>>>> 1: A, EntityA
>>>>>> 2: B, EntityB
>>>>>>
>>>>>> And finally we do the actual join:
>>>>>> Result table:
>>>>>> 1: ((A, xyz), EntityA)
>>>>>> 2: ((B, asd), EntityB)
>>>>>>
>>>>>> The thing is that in that last join, we have the opportunity to
>>> compare
>>>>> the
>>>>>> current FK in the left table with the incoming PK of the right
>>> table. If
>>>>>> they don't match, we just drop the event, since it must be outdated.
>>>>>>
>>>>>
>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to
>>>>> (1:
>>>>>> B, xyz), ultimately yielding a conundrum about whether the final
>>> state
>>>>>> should be (1: null) or (1: joined-on-B). With the algorithm above,
>>> you
>>>>>> would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B,
>>>>>> EntityB)). It seems like this does give you enough information to
>>> make
>>>>> the
>>>>>> right choice, regardless of disordering.
>>>>>
>>>>> Will check Adams patch, but this should work. As mentioned often I am
>>>>> not convinced on partitioning back for the user automatically. I think
>>>>> this is the real performance eater ;)
>>>>>
>>>>>>
>>>>>>
>>>>>> 7. Last thought... I'm a little concerned about the performance of
>>> the
>>>>>> range scans when records change in the right table. You've said that
>>>>> you've
>>>>>> been using the algorithm you presented in production for a while. Can
>>>>> you
>>>>>> give us a sense of the performance characteristics you've observed?
>>>>>>
>>>>>
>>>>> Make it work, make it fast, make it beautiful. The topmost thing here
>>> is
>>>>> / was correctness. In practice I do not measure the performance of the
>>>>> range scan. Usual cases I run this with is emitting 500k - 1kk rows
>>>>> on a left hand side change. The range scan is just the work you gotta
>>>>> do, also when you pack your data into different formats, usually the
>>>>> rocks performance is very tight to the size of the data and we can't
>>>>> really change that. It is more important for users to prevent useless
>>>>> updates to begin with. My left hand side is guarded to drop changes
>>> that
>>>>> are not going to change my join output.
>>>>>
>>>>> usually it's:
>>>>>
>>>>> drop unused fields and then don't forward if old.equals(new)
>>>>>
>>>>> regarding to the performance of creating an iterator for smaller
>>>>> fanouts, users can still just do a group by first then anyways.
>>>>>
>>>>>
>>>>>
>>>>>> I could only think of one alternative, but I'm not sure if it's
>>> better
>>>>> or
>>>>>> worse... If the first re-key only needs to preserve the original key,
>>>>> as I
>>>>>> proposed in #6, then we could store a vector of keys in the value:
>>>>>>
>>>>>> Left table:
>>>>>> 1: A,...
>>>>>> 2: B,...
>>>>>> 3: A,...
>>>>>>
>>>>>> Gets re-keyed:
>>>>>> A: [1, 3]
>>>>>> B: [2]
>>>>>>
>>>>>> Then, the rhs part of the join would only need a regular single-key
>>>>> lookup.
>>>>>> Of course we have to deal with the problem of large values, as
>>> there's
>>>>> no
>>>>>> bound on the number of lhs records that can reference rhs records.
>>>>> Offhand,
>>>>>> I'd say we could page the values, so when one row is past the
>>>>> threshold, we
>>>>>> append the key for the next page. Then in most cases, it would be a
>>>>> single
>>>>>> key lookup, but for large fan-out updates, it would be one per (max
>>>>> value
>>>>>> size)/(avg lhs key size).
>>>>>>
>>>>>> This seems more complex, though... Plus, I think there's some extra
>>>>>> tracking we'd need to do to know when to emit a retraction. For
>>> example,
>>>>>> when record 1 is deleted, the re-key table would just have (A: [3]).
>>>>> Some
>>>>>> kind of tombstone is needed so that the join result for 1 can also be
>>>>>> retracted.
>>>>>>
>>>>>> That's all!
>>>>>>
>>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the
>>>>>> discussion has been slow.
>>>>>> -John
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <
>>> Jan.Filipiak@trivago.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Id say you can just call the vote.
>>>>>>>
>>>>>>> that happens all the time, and if something comes up, it just goes
>>> back
>>>>>>> to discuss.
>>>>>>>
>>>>>>> would not expect to much attention with another another email in
>>> this
>>>>>>> thread.
>>>>>>>
>>>>>>> best Jan
>>>>>>>
>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
>>>>>>>> Hello Contributors
>>>>>>>>
>>>>>>>> I know that 2.1 is about to be released, but I do need to bump
>>> this to
>>>>>>> keep
>>>>>>>> visibility up. I am still intending to push this through once
>>>>> contributor
>>>>>>>> feedback is given.
>>>>>>>>
>>>>>>>> Main points that need addressing:
>>>>>>>> 1) Any way (or benefit) in structuring the current singular graph
>>> node
>>>>>>> into
>>>>>>>> multiple nodes? It has a whopping 25 parameters right now. I am a
>>> bit
>>>>>>> fuzzy
>>>>>>>> on how the optimizations are supposed to work, so I would
>>> appreciate
>>>>> any
>>>>>>>> help on this aspect.
>>>>>>>>
>>>>>>>> 2) Overall strategy for joining + resolving. This thread has much
>>>>>>> discourse
>>>>>>>> between Jan and I between the current highwater mark proposal and a
>>>>>>> groupBy
>>>>>>>> + reduce proposal. I am of the opinion that we need to strictly
>>> handle
>>>>>>> any
>>>>>>>> chance of out-of-order data and leave none of it up to the
>>> consumer.
>>>>> Any
>>>>>>>> comments or suggestions here would also help.
>>>>>>>>
>>>>>>>> 3) Anything else that you see that would prevent this from moving
>>> to a
>>>>>>> vote?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> Adam
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
>>>>>>> adam.bellemare@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Jan
>>>>>>>>>
>>>>>>>>> With the Stores.windowStoreBuilder and
>>> Stores.persistentWindowStore,
>>>>> you
>>>>>>>>> actually only need to specify the amount of segments you want and
>>> how
>>>>>>> large
>>>>>>>>> they are. To the best of my understanding, what happens is that
>>> the
>>>>>>>>> segments are automatically rolled over as new data with new
>>>>> timestamps
>>>>>>> are
>>>>>>>>> created. We use this exact functionality in some of the work done
>>>>>>>>> internally at my company. For reference, this is the hopping
>>> windowed
>>>>>>> store.
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
>>>>>>>>>
>>>>>>>>> In the code that I have provided, there are going to be two 24h
>>>>>>> segments.
>>>>>>>>> When a record is put into the windowStore, it will be inserted at
>>>>> time
>>>>>>> T in
>>>>>>>>> both segments. The two segments will always overlap by 12h. As
>>> time
>>>>>>> goes on
>>>>>>>>> and new records are added (say at time T+12h+), the oldest segment
>>>>> will
>>>>>>> be
>>>>>>>>> automatically deleted and a new segment created. The records are
>>> by
>>>>>>> default
>>>>>>>>> inserted with the context.timestamp(), such that it is the record
>>>>> time,
>>>>>>> not
>>>>>>>>> the clock time, which is used.
>>>>>>>>>
>>>>>>>>> To the best of my understanding, the timestamps are retained when
>>>>>>>>> restoring from the changelog.
>>>>>>>>>
>>>>>>>>> Basically, this is heavy-handed way to deal with TTL at a
>>>>> segment-level,
>>>>>>>>> instead of at an individual record level.
>>>>>>>>>
>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
>>>>> Jan.Filipiak@trivago.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Will that work? I expected it to blow up with ClassCastException
>>> or
>>>>>>>>>> similar.
>>>>>>>>>>
>>>>>>>>>> You either would have to specify the window you fetch/put or
>>> iterate
>>>>>>>>>> across all windows the key was found in right?
>>>>>>>>>>
>>>>>>>>>> I just hope the window-store doesn't check stream-time under the
>>>>> hoods
>>>>>>>>>> that would be a questionable interface.
>>>>>>>>>>
>>>>>>>>>> If it does: did you see my comment on checking all the windows
>>>>> earlier?
>>>>>>>>>> that would be needed to actually give reasonable time gurantees.
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
>>>>>>>>>>> Hi Jan
>>>>>>>>>>>
>>>>>>>>>>> Check for  " highwaterMat " in the PR. I only changed the state
>>>>> store,
>>>>>>>>>> not
>>>>>>>>>>> the ProcessorSupplier.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Adam
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> @Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the information. This is indeed something that
>>> will be
>>>>>>>>>>>>> extremely
>>>>>>>>>>>>> useful for this KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>> Thanks for your explanations. That being said, I will not be
>>>>> moving
>>>>>>>>>> ahead
>>>>>>>>>>>>> with an implementation using reshuffle/groupBy solution as you
>>>>>>>>>> propose.
>>>>>>>>>>>>> That being said, if you wish to implement it yourself off of
>>> my
>>>>>>>>>> current PR
>>>>>>>>>>>>> and submit it as a competitive alternative, I would be more
>>> than
>>>>>>>>>> happy to
>>>>>>>>>>>>> help vet that as an alternate solution. As it stands right
>>> now,
>>>>> I do
>>>>>>>>>> not
>>>>>>>>>>>>> really have more time to invest into alternatives without
>>> there
>>>>>>> being
>>>>>>>>>> a
>>>>>>>>>>>>> strong indication from the binding voters which they would
>>>>> prefer.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>> Hey, total no worries. I think I personally gave up on the
>>> streams
>>>>>>> DSL
>>>>>>>>>> for
>>>>>>>>>>>> some time already, otherwise I would have pulled this KIP
>>> through
>>>>>>>>>> already.
>>>>>>>>>>>> I am currently reimplementing my own DSL based on PAPI.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> I will look at finishing up my PR with the windowed state
>>> store
>>>>> in
>>>>>>> the
>>>>>>>>>>>>> next
>>>>>>>>>>>>> week or so, exercising it via tests, and then I will come back
>>>>> for
>>>>>>>>>> final
>>>>>>>>>>>>> discussions. In the meantime, I hope that any of the binding
>>>>> voters
>>>>>>>>>> could
>>>>>>>>>>>>> take a look at the KIP in the wiki. I have updated it
>>> according
>>>>> to
>>>>>>> the
>>>>>>>>>>>>> latest plan:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
>>>>>>>>>>>>> Support+non-key+joining+in+KTable
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This
>>>>> could
>>>>>>> be
>>>>>>>>>>>>> replaced by the results of KIP-258 whenever they are
>>> completed.
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated
>>> in
>>>>> the
>>>>>>>>>> PR?
>>>>>>>>>>>> expected it to change to Windowed<K>,Long Missing something?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <
>>>>> wangguoz@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link,
>>>>> as it
>>>>>>>>>> is
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>> corresponding changelog mechanisms. But as part of KIP-258
>>> we do
>>>>>>>>>> want to
>>>>>>>>>>>>>> have "handling out-of-order data for source KTable" such that
>>>>>>>>>> instead of
>>>>>>>>>>>>>> blindly apply the updates to the materialized store, i.e.
>>>>> following
>>>>>>>>>>>>>> offset
>>>>>>>>>>>>>> ordering, we will reject updates that are older than the
>>> current
>>>>>>>>>> key's
>>>>>>>>>>>>>> timestamps, i.e. following timestamp ordering.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <
>>>>>>> wangguoz@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e.
>>> the
>>>>>>> high
>>>>>>>>>>>>>>> watermark store, now altered to be replaced with a window
>>>>> store),
>>>>>>> I
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> another current on-going KIP may actually help:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store
>>> (i.e.
>>>>> only
>>>>>>>>>> for
>>>>>>>>>>>>>>> non-windowed KTable), and then one of its usage, as
>>> described
>>>>> in
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533, is that
>>> we
>>>>> can
>>>>>>>>>> then
>>>>>>>>>>>>>>> "reject" updates from the source topics if its timestamp is
>>>>>>> smaller
>>>>>>>>>> than
>>>>>>>>>>>>>>> the current key's latest update timestamp. I think it is
>>> very
>>>>>>>>>> similar to
>>>>>>>>>>>>>>> what you have in mind for high watermark based filtering,
>>> while
>>>>>>> you
>>>>>>>>>> only
>>>>>>>>>>>>>>> need to make sure that the timestamps of the joining records
>>>>> are
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> correctly
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> inherited though the whole topology to the final stage.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note that this KIP is for key-value store and hence
>>>>> non-windowed
>>>>>>>>>> KTables
>>>>>>>>>>>>>>> only, but for windowed KTables we do not really have a good
>>>>>>> support
>>>>>>>>>> for
>>>>>>>>>>>>>>> their joins anyways (
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7107)
>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>> think we can just consider non-windowed KTable-KTable
>>> non-key
>>>>>>> joins
>>>>>>>>>> for
>>>>>>>>>>>>>>> now. In which case, KIP-258 should help.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <
>>>>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Current highwater mark implementation would grow endlessly
>>>>> based
>>>>>>>>>> on
>>>>>>>>>>>>>>>>> primary key of original event. It is a pair of (<this
>>> table
>>>>>>>>>> primary
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> key>,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> <highest offset seen for that key>). This is used to
>>>>> differentiate
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> late arrivals and new updates. My newest proposal would be
>>> to
>>>>>>>>>> replace
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> with a Windowed state store of Duration N. This would allow
>>> the
>>>>>>> same
>>>>>>>>>>>>>>>>> behaviour, but cap the size based on time. This should
>>> allow
>>>>> for
>>>>>>>>>> all
>>>>>>>>>>>>>>>>> late-arriving events to be processed, and should be
>>>>> customizable
>>>>>>>>>> by
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> user to tailor to their own needs (ie: perhaps just 10
>>>>> minutes
>>>>>>> of
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> window,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> or perhaps 7 days...).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Adam, using time based retention can do the trick here.
>>>>> Even
>>>>>>>>>> if I
>>>>>>>>>>>>>>>> would still like to see the automatic repartitioning
>>> optional
>>>>>>>>>> since I
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> just reshuffle again. With windowed store I am a little bit
>>>>>>>>>> sceptical
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> how to determine the window. So esentially one could run
>>> into
>>>>>>>>>> problems
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the rapid change happens near a window border. I will check
>>> you
>>>>>>>>>>>>>>>> implementation in detail, if its problematic, we could
>>> still
>>>>>>> check
>>>>>>>>>>>>>>>> _all_
>>>>>>>>>>>>>>>> windows on read with not to bad performance impact I guess.
>>>>> Will
>>>>>>>>>> let
>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> know if the implementation would be correct as is. I
>>> wouldn't
>>>>> not
>>>>>>>>>> like
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> assume that: offset(A) < offset(B) => timestamp(A)  <
>>>>>>> timestamp(B).
>>>>>>>>>> I
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> we can't expect that.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks for the
>>>>>>>>>> diagram, it
>>>>>>>>>>>>>>>>> did really help. You are correct that I do not have the
>>>>> original
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> primary
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> key available, and I can see that if it was available then
>>> you
>>>>>>>>>> would be
>>>>>>>>>>>>>>>>> able to add and remove events from the Map. That being
>>> said,
>>>>> I
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> encourage
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> you to finish your diagrams / charts just for clarity for
>>>>> everyone
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> else.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But
>>> I
>>>>>>>>>> understand
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> the benefits for the rest. Sorry about the original primary
>>>>> key,
>>>>>>> We
>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>> join and Group by implemented our own in PAPI and basically
>>>>> not
>>>>>>>>>> using
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> any
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> DSL (Just the abstraction). Completely missed that in
>>> original
>>>>> DSL
>>>>>>>>>> its
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> there and just assumed it. total brain mess up on my end.
>>> Will
>>>>>>>>>> finish
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> chart as soon as i get a quite evening this week.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My follow up question for you is, won't the Map stay inside
>>>>> the
>>>>>>>>>> State
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Store indefinitely after all of the changes have
>>> propagated?
>>>>>>> Isn't
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> effectively the same as a highwater mark state store?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thing is that if the map is empty, substractor is gonna
>>>>> return
>>>>>>>>>> `null`
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the key is removed from the keyspace. But there is going to
>>> be
>>>>> a
>>>>>>>>>> store
>>>>>>>>>>>>>>>> 100%, the good thing is that I can use this store directly
>>> for
>>>>>>>>>>>>>>>> materialize() / enableSendingOldValues() is a regular
>>> store,
>>>>>>>>>> satisfying
>>>>>>>>>>>>>>>> all gurantees needed for further groupby / join. The
>>> Windowed
>>>>>>>>>> store is
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> keeping the values, so for the next statefull operation we
>>>>> would
>>>>>>>>>>>>>>>> need to instantiate an extra store. or we have the window
>>>>> store
>>>>>>>>>> also
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the values then.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Long story short. if we can flip in a custom group by
>>> before
>>>>>>>>>>>>>>>> repartitioning to the original primary key i think it would
>>>>> help
>>>>>>>>>> the
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> big time in building efficient apps. Given the original
>>> primary
>>>>>>> key
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> issue I
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> understand that we do not have a solid foundation to build
>>> on.
>>>>>>>>>>>>>>>> Leaving primary key carry along to the user. very
>>>>> unfortunate. I
>>>>>>>>>> could
>>>>>>>>>>>>>>>> understand the decision goes like that. I do not think its
>>> a
>>>>> good
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> decision.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <
>>>>>>>>>>>>>>>>> dumbreprajakta311@gmail.com <mailto:
>>>>> dumbreprajakta311@gmail.com
>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          please remove me from this group
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
>>>>>>>>>>>>>>>>>          <Jan.Filipiak@trivago.com <mailto:
>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          > Hi Adam,
>>>>>>>>>>>>>>>>>          >
>>>>>>>>>>>>>>>>>          > give me some time, will make such a chart. last
>>>>> time i
>>>>>>>>>> didn't
>>>>>>>>>>>>>>>>>          get along
>>>>>>>>>>>>>>>>>          > well with giphy and ruined all your charts.
>>>>>>>>>>>>>>>>>          > Hopefully i can get it done today
>>>>>>>>>>>>>>>>>          >
>>>>>>>>>>>>>>>>>          > On 08.09.2018 16:00, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>          > > Hi Jan
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > > I have included a diagram of what I attempted
>>> on
>>>>> the
>>>>>>>>>> KIP.
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          >
>>>>>>>>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su
>>>>>>>>>>>>>>>>>
>>> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining
>>>>>>>>>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate
>>>>>>>>>>>>>>>>>          <
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S
>>>>>>>>>>>>>>>>>
>>> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin
>>>>>>>>>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate>
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > > I attempted this back at the start of my own
>>>>>>>>>> implementation
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>          this
>>>>>>>>>>>>>>>>>          > > solution, and since I could not get it to
>>> work I
>>>>> have
>>>>>>>>>> since
>>>>>>>>>>>>>>>>>          discarded the
>>>>>>>>>>>>>>>>>          > > code. At this point in time, if you wish to
>>>>> continue
>>>>>>>>>> pursuing
>>>>>>>>>>>>>>>>>          for your
>>>>>>>>>>>>>>>>>          > > groupBy solution, I ask that you please
>>> create a
>>>>>>>>>> diagram on
>>>>>>>>>>>>>>>>>          the KIP
>>>>>>>>>>>>>>>>>          > > carefully explaining your solution. Please
>>> feel
>>>>> free
>>>>>>> to
>>>>>>>>>> use
>>>>>>>>>>>>>>>>>          the image I
>>>>>>>>>>>>>>>>>          > > just posted as a starting point. I am having
>>>>> trouble
>>>>>>>>>>>>>>>>>          understanding your
>>>>>>>>>>>>>>>>>          > > explanations but I think that a carefully
>>>>> constructed
>>>>>>>>>> diagram
>>>>>>>>>>>>>>>>>          will clear
>>>>>>>>>>>>>>>>>          > up
>>>>>>>>>>>>>>>>>          > > any misunderstandings. Alternately, please
>>> post a
>>>>>>>>>>>>>>>>>          comprehensive PR with
>>>>>>>>>>>>>>>>>          > > your solution. I can only guess at what you
>>>>> mean, and
>>>>>>>>>> since I
>>>>>>>>>>>>>>>>>          value my
>>>>>>>>>>>>>>>>>          > own
>>>>>>>>>>>>>>>>>          > > time as much as you value yours, I believe it
>>> is
>>>>> your
>>>>>>>>>>>>>>>>>          responsibility to
>>>>>>>>>>>>>>>>>          > > provide an implementation instead of me
>>> trying to
>>>>>>> guess.
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > > Adam
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
>>>>>>>>>>>>>>>>>          <Jan.Filipiak@trivago.com <mailto:
>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          > > wrote:
>>>>>>>>>>>>>>>>>          > >
>>>>>>>>>>>>>>>>>          > >> Hi James,
>>>>>>>>>>>>>>>>>          > >>
>>>>>>>>>>>>>>>>>          > >> nice to see you beeing interested. kafka
>>>>> streams at
>>>>>>>>>> this
>>>>>>>>>>>>>>>>>          point supports
>>>>>>>>>>>>>>>>>          > >> all sorts of joins as long as both streams
>>> have
>>>>> the
>>>>>>>>>> same
>>>>>>>>>>>>>>>>> key.
>>>>>>>>>>>>>>>>>          > >> Adam is currently implementing a join where a
>>>>> KTable
>>>>>>>>>> and a
>>>>>>>>>>>>>>>>>          KTable can
>>>>>>>>>>>>>>>>>          > have
>>>>>>>>>>>>>>>>>          > >> a one to many relation ship (1:n). We exploit
>>>>> that
>>>>>>>>>> rocksdb
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>          > >> datastore that keeps data sorted (At least
>>>>> exposes an
>>>>>>>>>> API to
>>>>>>>>>>>>>>>>>          access the
>>>>>>>>>>>>>>>>>          > >> stored data in a sorted fashion).
>>>>>>>>>>>>>>>>>          > >>
>>>>>>>>>>>>>>>>>          > >> I think the technical caveats are well
>>>>> understood
>>>>>>> now
>>>>>>>>>> and we
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>          > basically
>>>>>>>>>>>>>>>>>          > >> down to philosophy and API Design ( when Adam
>>>>> sees
>>>>>>> my
>>>>>>>>>> newest
>>>>>>>>>>>>>>>>>          message).
>>>>>>>>>>>>>>>>>          > >> I have a lengthy track record of loosing
>>> those
>>>>> kinda
>>>>>>>>>>>>>>>>>          arguments within
>>>>>>>>>>>>>>>>>          > the
>>>>>>>>>>>>>>>>>          > >> streams community and I have no clue why. So
>>> I
>>>>>>>>>> literally
>>>>>>>>>>>>>>>>>          can't wait for
>>>>>>>>>>>>>>>>>          > you
>>>>>>>>>>>>>>>>>          > >> to churn through this thread and give you
>>>>> opinion on
>>>>>>>>>> how we
>>>>>>>>>>>>>>>>>          should
>>>>>>>>>>>>>>>>>          > design
>>>>>>>>>>>>>>>>>          > >> the return type of the oneToManyJoin and how
>>>>> many
>>>>>>>>>> power we
>>>>>>>>>>>>>>>>>          want to give
>>>>>>>>>>>>>>>>>          > to
>>>>>>>>>>>>>>>>>          > >> the user vs "simplicity" (where simplicity
>>> isn't
>>>>>>>>>> really that
>>>>>>>>>>>>>>>>>          as users
>>>>>>>>>>>>>>>>>          > still
>>>>>>>>>>>>>>>>>          > >> need to understand it I argue)
>>>>>>>>>>>>>>>>>          > >>
>>>>>>>>>>>>>>>>>          > >> waiting for you to join in on the discussion
>>>>>>>>>>>>>>>>>          > >>
>>>>>>>>>>>>>>>>>          > >> Best Jan
>>>>>>>>>>>>>>>>>          > >>
>>>>>>>>>>>>>>>>>          > >>
>>>>>>>>>>>>>>>>>          > >>
>>>>>>>>>>>>>>>>>          > >> On 07.09.2018 15:49, James Kwan wrote:
>>>>>>>>>>>>>>>>>          > >>
>>>>>>>>>>>>>>>>>          > >>> I am new to this group and I found this
>>> subject
>>>>>>>>>>>>>>>>>          interesting.  Sounds
>>>>>>>>>>>>>>>>>          > like
>>>>>>>>>>>>>>>>>          > >>> you guys want to implement a join table of
>>> two
>>>>>>>>>> streams? Is
>>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>          > somewhere
>>>>>>>>>>>>>>>>>          > >>> I can see the original requirement or
>>> proposal?
>>>>>>>>>>>>>>>>>          > >>>
>>>>>>>>>>>>>>>>>          > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
>>>>>>>>>>>>>>>>>          <Jan.Filipiak@trivago.com <mailto:
>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>> wrote:
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>>> I'm currently testing using a Windowed
>>> Store
>>>>> to
>>>>>>>>>> store the
>>>>>>>>>>>>>>>>>          highwater
>>>>>>>>>>>>>>>>>          > >>>>> mark.
>>>>>>>>>>>>>>>>>          > >>>>> By all indications this should work fine,
>>>>> with
>>>>>>> the
>>>>>>>>>> caveat
>>>>>>>>>>>>>>>>>          being that
>>>>>>>>>>>>>>>>>          > it
>>>>>>>>>>>>>>>>>          > >>>>> can
>>>>>>>>>>>>>>>>>          > >>>>> only resolve out-of-order arrival for up
>>> to
>>>>> the
>>>>>>>>>> size of
>>>>>>>>>>>>>>>>>          the window
>>>>>>>>>>>>>>>>>          > (ie:
>>>>>>>>>>>>>>>>>          > >>>>> 24h, 72h, etc). This would remove the
>>>>> possibility
>>>>>>>>>> of it
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> being
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>          > unbounded
>>>>>>>>>>>>>>>>>          > >>>>> in
>>>>>>>>>>>>>>>>>          > >>>>> size.
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>>> With regards to Jan's suggestion, I
>>> believe
>>>>> this
>>>>>>> is
>>>>>>>>>> where
>>>>>>>>>>>>>>>>>          we will
>>>>>>>>>>>>>>>>>          > have
>>>>>>>>>>>>>>>>>          > >>>>> to
>>>>>>>>>>>>>>>>>          > >>>>> remain in disagreement. While I do not
>>>>> disagree
>>>>>>>>>> with your
>>>>>>>>>>>>>>>>>          statement
>>>>>>>>>>>>>>>>>          > >>>>> about
>>>>>>>>>>>>>>>>>          > >>>>> there likely to be additional joins done
>>> in a
>>>>>>>>>> real-world
>>>>>>>>>>>>>>>>>          workflow, I
>>>>>>>>>>>>>>>>>          > do
>>>>>>>>>>>>>>>>>          > >>>>> not
>>>>>>>>>>>>>>>>>          > >>>>> see how you can conclusively deal with
>>>>>>> out-of-order
>>>>>>>>>>>>>>>>> arrival
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>          > >>>>> foreign-key
>>>>>>>>>>>>>>>>>          > >>>>> changes and subsequent joins. I have
>>>>> attempted
>>>>>>> what
>>>>>>>>>> I
>>>>>>>>>>>>>>>>>          think you have
>>>>>>>>>>>>>>>>>          > >>>>> proposed (without a high-water, using
>>>>> groupBy and
>>>>>>>>>> reduce)
>>>>>>>>>>>>>>>>>          and found
>>>>>>>>>>>>>>>>>          > >>>>> that if
>>>>>>>>>>>>>>>>>          > >>>>> the foreign key changes too quickly, or
>>> the
>>>>> load
>>>>>>> on
>>>>>>>>>> a
>>>>>>>>>>>>>>>>>          stream thread
>>>>>>>>>>>>>>>>>          > is
>>>>>>>>>>>>>>>>>          > >>>>> too
>>>>>>>>>>>>>>>>>          > >>>>> high, the joined messages will arrive
>>>>>>> out-of-order
>>>>>>>>>> and be
>>>>>>>>>>>>>>>>>          incorrectly
>>>>>>>>>>>>>>>>>          > >>>>> propagated, such that an intermediate
>>> event
>>>>> is
>>>>>>>>>>>>>>>>> represented
>>>>>>>>>>>>>>>>>          as the
>>>>>>>>>>>>>>>>>          > final
>>>>>>>>>>>>>>>>>          > >>>>> event.
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>> Can you shed some light on your groupBy
>>>>>>>>>> implementation.
>>>>>>>>>>>>>>>>>          There must be
>>>>>>>>>>>>>>>>>          > >>>> some sort of flaw in it.
>>>>>>>>>>>>>>>>>          > >>>> I have a suspicion where it is, I would
>>> just
>>>>> like
>>>>>>> to
>>>>>>>>>>>>>>>>>          confirm. The idea
>>>>>>>>>>>>>>>>>          > >>>> is bullet proof and it must be
>>>>>>>>>>>>>>>>>          > >>>> an implementation mess up. I would like to
>>>>> clarify
>>>>>>>>>> before
>>>>>>>>>>>>>>>>>          we draw a
>>>>>>>>>>>>>>>>>          > >>>> conclusion.
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>>    Repartitioning the scattered events
>>> back to
>>>>>>> their
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> original
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>          > >>>>> partitions is the only way I know how to
>>>>>>> conclusively
>>>>>>>>>> deal
>>>>>>>>>>>>>>>>>          with
>>>>>>>>>>>>>>>>>          > >>>>> out-of-order events in a given time frame,
>>>>> and to
>>>>>>>>>> ensure
>>>>>>>>>>>>>>>>>          that the
>>>>>>>>>>>>>>>>>          > data
>>>>>>>>>>>>>>>>>          > >>>>> is
>>>>>>>>>>>>>>>>>          > >>>>> eventually consistent with the input
>>> events.
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>>> If you have some code to share that
>>>>> illustrates
>>>>>>> your
>>>>>>>>>>>>>>>>>          approach, I
>>>>>>>>>>>>>>>>>          > would
>>>>>>>>>>>>>>>>>          > >>>>> be
>>>>>>>>>>>>>>>>>          > >>>>> very grateful as it would remove any
>>>>>>>>>> misunderstandings
>>>>>>>>>>>>>>>>>          that I may
>>>>>>>>>>>>>>>>>          > have.
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>> ah okay you were looking for my code. I
>>> don't
>>>>> have
>>>>>>>>>>>>>>>>>          something easily
>>>>>>>>>>>>>>>>>          > >>>> readable here as its bloated with
>>> OO-patterns.
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>> its anyhow trivial:
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>> @Override
>>>>>>>>>>>>>>>>>          > >>>>      public T apply(K aggKey, V value, T
>>>>>>> aggregate)
>>>>>>>>>>>>>>>>>          > >>>>      {
>>>>>>>>>>>>>>>>>          > >>>>          Map<U, V> currentStateAsMap =
>>>>>>>>>> asMap(aggregate);
>>>>>>>>>>>>>>>>> <<
>>>>>>>>>>>>>>>>>          imaginary
>>>>>>>>>>>>>>>>>          > >>>>          U toModifyKey =
>>> mapper.apply(value);
>>>>>>>>>>>>>>>>>          > >>>>              << this is the place where
>>> people
>>>>>>>>>> actually
>>>>>>>>>>>>>>>>>          gonna have
>>>>>>>>>>>>>>>>>          > issues
>>>>>>>>>>>>>>>>>          > >>>> and why you probably couldn't do it. we
>>> would
>>>>> need
>>>>>>>>>> to find
>>>>>>>>>>>>>>>>>          a solution
>>>>>>>>>>>>>>>>>          > here.
>>>>>>>>>>>>>>>>>          > >>>> I didn't realize that yet.
>>>>>>>>>>>>>>>>>          > >>>>              << we propagate the field in
>>> the
>>>>>>>>>> joiner, so
>>>>>>>>>>>>>>>>>          that we can
>>>>>>>>>>>>>>>>>          > pick
>>>>>>>>>>>>>>>>>          > >>>> it up in an aggregate. Probably you have
>>> not
>>>>>>> thought
>>>>>>>>>> of
>>>>>>>>>>>>>>>>>          this in your
>>>>>>>>>>>>>>>>>          > >>>> approach right?
>>>>>>>>>>>>>>>>>          > >>>>              << I am very open to find a
>>>>> generic
>>>>>>>>>> solution
>>>>>>>>>>>>>>>>>          here. In my
>>>>>>>>>>>>>>>>>          > >>>> honest opinion this is broken in
>>>>>>> KTableImpl.GroupBy
>>>>>>>>>> that
>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>          looses
>>>>>>>>>>>>>>>>>          > the keys
>>>>>>>>>>>>>>>>>          > >>>> and only maintains the aggregate key.
>>>>>>>>>>>>>>>>>          > >>>>              << I abstracted it away back
>>>>> then way
>>>>>>>>>> before
>>>>>>>>>>>>>>>>> i
>>>>>>>>>>>>>>>>> was
>>>>>>>>>>>>>>>>>          > thinking
>>>>>>>>>>>>>>>>>          > >>>> of oneToMany join. That is why I didn't
>>>>> realize
>>>>>>> its
>>>>>>>>>>>>>>>>>          significance here.
>>>>>>>>>>>>>>>>>          > >>>>              << Opinions?
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>>          for (V m : current)
>>>>>>>>>>>>>>>>>          > >>>>          {
>>>>>>>>>>>>>>>>>          > >>>> currentStateAsMap.put(mapper.apply(m), m);
>>>>>>>>>>>>>>>>>          > >>>>          }
>>>>>>>>>>>>>>>>>          > >>>>          if (isAdder)
>>>>>>>>>>>>>>>>>          > >>>>          {
>>>>>>>>>>>>>>>>>          > >>>> currentStateAsMap.put(toModifyKey, value);
>>>>>>>>>>>>>>>>>          > >>>>          }
>>>>>>>>>>>>>>>>>          > >>>>          else
>>>>>>>>>>>>>>>>>          > >>>>          {
>>>>>>>>>>>>>>>>>          > >>>> currentStateAsMap.remove(toModifyKey);
>>>>>>>>>>>>>>>>>          > >>>> if(currentStateAsMap.isEmpty()){
>>>>>>>>>>>>>>>>>          > >>>>                  return null;
>>>>>>>>>>>>>>>>>          > >>>>              }
>>>>>>>>>>>>>>>>>          > >>>>          }
>>>>>>>>>>>>>>>>>          > >>>>          retrun
>>>>> asAggregateType(currentStateAsMap)
>>>>>>>>>>>>>>>>>          > >>>>      }
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>>
>>>>>>>>>>>>>>>>>          > >>>> Thanks,
>>>>>>>>>>>>>>>>>          > >>>>> Adam
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan
>>> Filipiak
>>>>> <
>>>>>>>>>>>>>>>>>          > Jan.Filipiak@trivago.com <mailto:
>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>> wrote:
>>>>>>>>>>>>>>>>>          > >>>>>
>>>>>>>>>>>>>>>>>          > >>>>> Thanks Adam for bringing Matthias to
>>> speed!
>>>>>>>>>>>>>>>>>          > >>>>>> about the differences. I think re-keying
>>>>> back
>>>>>>>>>> should be
>>>>>>>>>>>>>>>>>          optional at
>>>>>>>>>>>>>>>>>          > >>>>>> best.
>>>>>>>>>>>>>>>>>          > >>>>>> I would say we return a KScatteredTable
>>> with
>>>>>>>>>> reshuffle()
>>>>>>>>>>>>>>>>>          returning
>>>>>>>>>>>>>>>>>          > >>>>>> KTable<originalKey,Joined> to make the
>>>>> backwards
>>>>>>>>>>>>>>>>>          repartitioning
>>>>>>>>>>>>>>>>>          > >>>>>> optional.
>>>>>>>>>>>>>>>>>          > >>>>>> I am also in a big favour of doing the
>>> out
>>>>> of
>>>>>>> order
>>>>>>>>>>>>>>>>>          processing using
>>>>>>>>>>>>>>>>>          > >>>>>> group
>>>>>>>>>>>>>>>>>          > >>>>>> by instead high water mark tracking.
>>>>>>>>>>>>>>>>>          > >>>>>> Just because unbounded growth is just
>>> scary
>>>>> + It
>>>>>>>>>> saves
>>>>>>>>>>>>>>>>> us
>>>>>>>>>>>>>>>>>          the header
>>>>>>>>>>>>>>>>>          > >>>>>> stuff.
>>>>>>>>>>>>>>>>>          > >>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>> I think the abstraction of always
>>>>> repartitioning
>>>>>>>>>> back is
>>>>>>>>>>>>>>>>>          just not so
>>>>>>>>>>>>>>>>>          > >>>>>> strong. Like the work has been done
>>> before
>>>>> we
>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>          back and
>>>>>>>>>>>>>>>>>          > >>>>>> grouping
>>>>>>>>>>>>>>>>>          > >>>>>> by something else afterwards is really
>>>>> common.
>>>>>>>>>>>>>>>>>          > >>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>> On 05.09.2018 13:49, Adam Bellemare
>>> wrote:
>>>>>>>>>>>>>>>>>          > >>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>> Hi Matthias
>>>>>>>>>>>>>>>>>          > >>>>>>> Thank you for your feedback, I do
>>>>> appreciate
>>>>>>> it!
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> While name spacing would be possible, it
>>>>> would
>>>>>>>>>> require
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>          > deserialize
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> user headers what implies a runtime
>>>>> overhead.
>>>>>>> I
>>>>>>>>>> would
>>>>>>>>>>>>>>>>>          suggest to
>>>>>>>>>>>>>>>>>          > no
>>>>>>>>>>>>>>>>>          > >>>>>>>> namespace for now to avoid the
>>> overhead.
>>>>> If
>>>>>>> this
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> becomes a
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>          > problem in
>>>>>>>>>>>>>>>>>          > >>>>>>>> the future, we can still add name
>>> spacing
>>>>>>> later
>>>>>>>>>> on.
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> Agreed. I will go with using a reserved
>>>>> string
>>>>>>>>>> and
>>>>>>>>>>>>>>>>>          document it.
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> My main concern about the design it the
>>>>> type of
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>          result KTable:
>>>>>>>>>>>>>>>>>          > If
>>>>>>>>>>>>>>>>>          > >>>>>>> I
>>>>>>>>>>>>>>>>>          > >>>>>>> understood the proposal correctly,
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> In your example, you have table1 and
>>> table2
>>>>>>>>>> swapped.
>>>>>>>>>>>>>>>>>          Here is how it
>>>>>>>>>>>>>>>>>          > >>>>>>> works
>>>>>>>>>>>>>>>>>          > >>>>>>> currently:
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> 1) table1 has the records that contain
>>> the
>>>>>>>>>> foreign key
>>>>>>>>>>>>>>>>>          within their
>>>>>>>>>>>>>>>>>          > >>>>>>> value.
>>>>>>>>>>>>>>>>>          > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>,
>>>>>>>>>>>>>>>>> <b,(fk=A,bar=2)>,
>>>>>>>>>>>>>>>>>          > >>>>>>> <c,(fk=B,bar=3)>
>>>>>>>>>>>>>>>>>          > >>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> 2) A Value mapper is required to extract
>>>>> the
>>>>>>>>>> foreign
>>>>>>>>>>>>>>>>> key.
>>>>>>>>>>>>>>>>>          > >>>>>>> table1 foreign key mapper: ( value =>
>>>>> value.fk
>>>>>>>>>>>>>>>>>          <http://value.fk> )
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> The mapper is applied to each element in
>>>>>>> table1,
>>>>>>>>>> and a
>>>>>>>>>>>>>>>>>          new combined
>>>>>>>>>>>>>>>>>          > >>>>>>> key is
>>>>>>>>>>>>>>>>>          > >>>>>>> made:
>>>>>>>>>>>>>>>>>          > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>,
>>> <A-b,
>>>>>>>>>>>>>>>>> (fk=A,bar=2)>,
>>>>>>>>>>>>>>>>>          <B-c,
>>>>>>>>>>>>>>>>>          > >>>>>>> (fk=B,bar=3)>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> 3) The rekeyed events are copartitioned
>>>>> with
>>>>>>>>>> table2:
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> a) Stream Thread with Partition 0:
>>>>>>>>>>>>>>>>>          > >>>>>>> RepartitionedTable1: <A-a,
>>> (fk=A,bar=1)>,
>>>>> <A-b,
>>>>>>>>>>>>>>>>>          (fk=A,bar=2)>
>>>>>>>>>>>>>>>>>          > >>>>>>> Table2: <A,X>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> b) Stream Thread with Partition 1:
>>>>>>>>>>>>>>>>>          > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>>>>>>>>>>>>>          > >>>>>>> Table2: <B,Y>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> 4) From here, they can be joined
>>> together
>>>>>>> locally
>>>>>>>>>> by
>>>>>>>>>>>>>>>>>          applying the
>>>>>>>>>>>>>>>>>          > >>>>>>> joiner
>>>>>>>>>>>>>>>>>          > >>>>>>> function.
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> At this point, Jan's design and my
>>> design
>>>>>>>>>> deviate. My
>>>>>>>>>>>>>>>>>          design goes
>>>>>>>>>>>>>>>>>          > on
>>>>>>>>>>>>>>>>>          > >>>>>>> to
>>>>>>>>>>>>>>>>>          > >>>>>>> repartition the data post-join and
>>> resolve
>>>>>>>>>> out-of-order
>>>>>>>>>>>>>>>>>          arrival of
>>>>>>>>>>>>>>>>>          > >>>>>>> records,
>>>>>>>>>>>>>>>>>          > >>>>>>> finally returning the data keyed just
>>> the
>>>>>>>>>> original key.
>>>>>>>>>>>>>>>>>          I do not
>>>>>>>>>>>>>>>>>          > >>>>>>> expose
>>>>>>>>>>>>>>>>>          > >>>>>>> the
>>>>>>>>>>>>>>>>>          > >>>>>>> CombinedKey or any of the internals
>>>>> outside of
>>>>>>> the
>>>>>>>>>>>>>>>>>          joinOnForeignKey
>>>>>>>>>>>>>>>>>          > >>>>>>> function. This does make for larger
>>>>> footprint,
>>>>>>>>>> but it
>>>>>>>>>>>>>>>>>          removes all
>>>>>>>>>>>>>>>>>          > >>>>>>> agency
>>>>>>>>>>>>>>>>>          > >>>>>>> for resolving out-of-order arrivals and
>>>>>>> handling
>>>>>>>>>>>>>>>>>          CombinedKeys from
>>>>>>>>>>>>>>>>>          > the
>>>>>>>>>>>>>>>>>          > >>>>>>> user. I believe that this makes the
>>>>> function
>>>>>>> much
>>>>>>>>>>>>>>>>> easier
>>>>>>>>>>>>>>>>>          to use.
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> Let me know if this helps resolve your
>>>>>>> questions,
>>>>>>>>>> and
>>>>>>>>>>>>>>>>>          please feel
>>>>>>>>>>>>>>>>>          > >>>>>>> free to
>>>>>>>>>>>>>>>>>          > >>>>>>> add anything else on your mind.
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> Thanks again,
>>>>>>>>>>>>>>>>>          > >>>>>>> Adam
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM,
>>> Matthias J.
>>>>>>> Sax <
>>>>>>>>>>>>>>>>>          > >>>>>>> matthias@confluent.io <mailto:
>>>>>>>>>> matthias@confluent.io>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> wrote:
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>> Hi,
>>>>>>>>>>>>>>>>>          > >>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> I am just catching up on this thread. I
>>>>> did
>>>>>>> not
>>>>>>>>>> read
>>>>>>>>>>>>>>>>>          everything so
>>>>>>>>>>>>>>>>>          > >>>>>>>> far,
>>>>>>>>>>>>>>>>>          > >>>>>>>> but want to share couple of initial
>>>>> thoughts:
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> Headers: I think there is a fundamental
>>>>>>>>>> difference
>>>>>>>>>>>>>>>>>          between header
>>>>>>>>>>>>>>>>>          > >>>>>>>> usage
>>>>>>>>>>>>>>>>>          > >>>>>>>> in this KIP and KP-258. For 258, we add
>>>>>>> headers
>>>>>>>>>> to
>>>>>>>>>>>>>>>>>          changelog topic
>>>>>>>>>>>>>>>>>          > >>>>>>>> that
>>>>>>>>>>>>>>>>>          > >>>>>>>> are owned by Kafka Streams and nobody
>>>>> else is
>>>>>>>>>> supposed
>>>>>>>>>>>>>>>>>          to write
>>>>>>>>>>>>>>>>>          > into
>>>>>>>>>>>>>>>>>          > >>>>>>>> them. In fact, no user header are
>>> written
>>>>> into
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>          changelog topic
>>>>>>>>>>>>>>>>>          > >>>>>>>> and
>>>>>>>>>>>>>>>>>          > >>>>>>>> thus, there are not conflicts.
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> Nevertheless, I don't see a big issue
>>> with
>>>>>>> using
>>>>>>>>>>>>>>>>>          headers within
>>>>>>>>>>>>>>>>>          > >>>>>>>> Streams.
>>>>>>>>>>>>>>>>>          > >>>>>>>> As long as we document it, we can have
>>>>> some
>>>>>>>>>> "reserved"
>>>>>>>>>>>>>>>>>          header keys
>>>>>>>>>>>>>>>>>          > >>>>>>>> and
>>>>>>>>>>>>>>>>>          > >>>>>>>> users are not allowed to use when
>>>>> processing
>>>>>>>>>> data with
>>>>>>>>>>>>>>>>>          Kafka
>>>>>>>>>>>>>>>>>          > Streams.
>>>>>>>>>>>>>>>>>          > >>>>>>>> IMHO, this should be ok.
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> I think there is a safe way to avoid
>>>>>>> conflicts,
>>>>>>>>>> since
>>>>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>          > headers
>>>>>>>>>>>>>>>>>          > >>>>>>>> are
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>> only needed in internal topics (I
>>> think):
>>>>>>>>>>>>>>>>>          > >>>>>>>>> For internal and changelog topics, we
>>> can
>>>>>>>>>> namespace
>>>>>>>>>>>>>>>>>          all headers:
>>>>>>>>>>>>>>>>>          > >>>>>>>>> * user-defined headers are namespaced
>>> as
>>>>>>>>>> "external."
>>>>>>>>>>>>>>>>> +
>>>>>>>>>>>>>>>>>          headerKey
>>>>>>>>>>>>>>>>>          > >>>>>>>>> * internal headers are namespaced as
>>>>>>>>>> "internal." +
>>>>>>>>>>>>>>>>>          headerKey
>>>>>>>>>>>>>>>>>          > >>>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>> While name spacing would be possible,
>>> it
>>>>>>> would
>>>>>>>>>>>>>>>>> require
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>          > >>>>>>>> deserialize
>>>>>>>>>>>>>>>>>          > >>>>>>>> user headers what implies a runtime
>>>>> overhead.
>>>>>>> I
>>>>>>>>>> would
>>>>>>>>>>>>>>>>>          suggest to
>>>>>>>>>>>>>>>>>          > no
>>>>>>>>>>>>>>>>>          > >>>>>>>> namespace for now to avoid the
>>> overhead.
>>>>> If
>>>>>>> this
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> becomes a
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>          > problem in
>>>>>>>>>>>>>>>>>          > >>>>>>>> the future, we can still add name
>>> spacing
>>>>>>> later
>>>>>>>>>> on.
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> My main concern about the design it the
>>>>> type
>>>>>>> of
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>          result KTable:
>>>>>>>>>>>>>>>>>          > >>>>>>>> If I
>>>>>>>>>>>>>>>>>          > >>>>>>>> understood the proposal correctly,
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> KTable<K1,V1> table1 = ...
>>>>>>>>>>>>>>>>>          > >>>>>>>> KTable<K2,V2> table2 = ...
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> KTable<K1,V3> joinedTable =
>>>>>>>>>> table1.join(table2,...);
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> implies that the `joinedTable` has the
>>>>> same
>>>>>>> key
>>>>>>>>>> as the
>>>>>>>>>>>>>>>>>          left input
>>>>>>>>>>>>>>>>>          > >>>>>>>> table.
>>>>>>>>>>>>>>>>>          > >>>>>>>> IMHO, this does not work because if
>>> table2
>>>>>>>>>> contains
>>>>>>>>>>>>>>>>>          multiple rows
>>>>>>>>>>>>>>>>>          > >>>>>>>> that
>>>>>>>>>>>>>>>>>          > >>>>>>>> join with a record in table1 (what is
>>> the
>>>>> main
>>>>>>>>>> purpose
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>          > foreign
>>>>>>>>>>>>>>>>>          > >>>>>>>> key
>>>>>>>>>>>>>>>>>          > >>>>>>>> join), the result table would only
>>>>> contain a
>>>>>>>>>> single
>>>>>>>>>>>>>>>>>          join result,
>>>>>>>>>>>>>>>>>          > but
>>>>>>>>>>>>>>>>>          > >>>>>>>> not
>>>>>>>>>>>>>>>>>          > >>>>>>>> multiple.
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> Example:
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> table1 input stream: <A,X>
>>>>>>>>>>>>>>>>>          > >>>>>>>> table2 input stream: <a,(A,1)>,
>>> <b,(A,2)>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> We use table2 value a foreign key to
>>>>> table1
>>>>>>> key
>>>>>>>>>> (ie,
>>>>>>>>>>>>>>>>>          "A" joins).
>>>>>>>>>>>>>>>>>          > If
>>>>>>>>>>>>>>>>>          > >>>>>>>> the
>>>>>>>>>>>>>>>>>          > >>>>>>>> result key is the same key as key of
>>>>> table1,
>>>>>>> this
>>>>>>>>>>>>>>>>>          implies that the
>>>>>>>>>>>>>>>>>          > >>>>>>>> result can either be <A, join(X,1)> or
>>> <A,
>>>>>>>>>> join(X,2)>
>>>>>>>>>>>>>>>>>          but not
>>>>>>>>>>>>>>>>>          > both.
>>>>>>>>>>>>>>>>>          > >>>>>>>> Because the share the same key,
>>> whatever
>>>>>>> result
>>>>>>>>>> record
>>>>>>>>>>>>>>>>>          we emit
>>>>>>>>>>>>>>>>>          > later,
>>>>>>>>>>>>>>>>>          > >>>>>>>> overwrite the previous result.
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> This is the reason why Jan originally
>>>>> proposed
>>>>>>>>>> to use
>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>          > combination
>>>>>>>>>>>>>>>>>          > >>>>>>>> of
>>>>>>>>>>>>>>>>>          > >>>>>>>> both primary keys of the input tables
>>> as
>>>>> key
>>>>>>> of
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>          output table.
>>>>>>>>>>>>>>>>>          > >>>>>>>> This
>>>>>>>>>>>>>>>>>          > >>>>>>>> makes the keys of the output table
>>> unique
>>>>> and
>>>>>>> we
>>>>>>>>>> can
>>>>>>>>>>>>>>>>>          store both in
>>>>>>>>>>>>>>>>>          > >>>>>>>> the
>>>>>>>>>>>>>>>>>          > >>>>>>>> output table:
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b,
>>>>>>>>>> join(X,2)>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>>>          > >>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>> Just on remark here.
>>>>>>>>>>>>>>>>>          > >>>>>>>>> The high-watermark could be
>>> disregarded.
>>>>> The
>>>>>>>>>> decision
>>>>>>>>>>>>>>>>>          about the
>>>>>>>>>>>>>>>>>          > >>>>>>>>> forward
>>>>>>>>>>>>>>>>>          > >>>>>>>>> depends on the size of the aggregated
>>>>> map.
>>>>>>>>>>>>>>>>>          > >>>>>>>>> Only 1 element long maps would be
>>>>> unpacked
>>>>>>> and
>>>>>>>>>>>>>>>>>          forwarded. 0
>>>>>>>>>>>>>>>>>          > element
>>>>>>>>>>>>>>>>>          > >>>>>>>>> maps
>>>>>>>>>>>>>>>>>          > >>>>>>>>> would be published as delete. Any
>>> other
>>>>> count
>>>>>>>>>>>>>>>>>          > >>>>>>>>> of map entries is in "waiting for
>>> correct
>>>>>>>>>> deletes to
>>>>>>>>>>>>>>>>>          > arrive"-state.
>>>>>>>>>>>>>>>>>          > >>>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare
>>>>> wrote:
>>>>>>>>>>>>>>>>>          > >>>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>> It does look like I could replace the
>>>>> second
>>>>>>>>>>>>>>>>>          repartition store
>>>>>>>>>>>>>>>>>          > and
>>>>>>>>>>>>>>>>>          > >>>>>>>>>> highwater store with a groupBy and
>>>>> reduce.
>>>>>>>>>> However,
>>>>>>>>>>>>>>>>>          it looks
>>>>>>>>>>>>>>>>>          > like
>>>>>>>>>>>>>>>>>          > >>>>>>>>>> I
>>>>>>>>>>>>>>>>>          > >>>>>>>>>> would
>>>>>>>>>>>>>>>>>          > >>>>>>>>>> still need to store the highwater
>>> value
>>>>>>> within
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>          materialized
>>>>>>>>>>>>>>>>>          > >>>>>>>>>> store,
>>>>>>>>>>>>>>>>>          > >>>>>>>>>>
>>>>>>>>>>>>>>>>>          > >>>>>>>>>> to
>>>>>>>>>>>>>>>>>          > >>>>>>>>> compare the arrival of out-of-order
>>>>> records
>>>>>>>>>> (assuming
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>          > >>>>>>>>> understanding
>>>>>>>>>>>>>>>>>          > >>>>>>>>> of
>>>>>>>>>>>>>>>>>          > >>>>>>>>> THIS is correct...). This in effect is
>>>>> the
>>>>>>> same
>>>>>>>>>> as
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>          design I
>>>>>>>>>>>>>>>>>          > have
>>>>>>>>>>>>>>>>>          > >>>>>>>>> now,
>>>>>>>>>>>>>>>>>          > >>>>>>>>> just with the two tables merged
>>> together.
>>>>>>>>>>>>>>>>>          > >>>>>>>>>
>>>>>>>>>>>>>>>>>          >
>>>>>>>>>>>>>>>>>          >
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
Mime
View raw message