kafka-dev mailing list archives

From John Roesler <j...@confluent.io>
Subject Re: Behaviour of KStreamKTableJoinProcessor
Date Tue, 16 Jul 2019 15:22:12 GMT
Hi again (again), Ties,

Sorry for the confusion. I was talking to someone else about this and
started to file a ticket to fix it, but once I looked into it I
realized that there is actually no repartition topic for a
stream-globalTable join.

So, if you do something like:

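=====
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableJoinExample {
    public static void main(String[] args) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Re-key every record before the join, because the join skips
        // records whose key is null. Even a null original key becomes
        // the non-null string "newKnull" here.
        final KStream<Object, Object> left =
            streamsBuilder.stream("left").selectKey((ok, ov) -> "newK" + ok);

        final GlobalKTable<Object, Object> right =
            streamsBuilder.globalTable("right");

        // Look up the global table using the (new) stream key.
        final KStream<Object, KeyValue<Object, Object>> join =
            left.join(right, (ok, ov) -> ok, KeyValue::new);

        join.to("out");

        final Topology build = streamsBuilder.build();

        // For a stream-globalTable join, no repartition topic should appear.
        System.out.println(build.describe());
    }
}
=====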

(the important part is the selectKey on the stream, which gives each
record a non-null key before the join),

you should get the result you expect.

Sorry again for my multiple replies.
-John

On Mon, Jul 15, 2019 at 11:35 AM John Roesler <john@confluent.io> wrote:
>
> Hi again, Ties,
>
> I think I spoke too soon and also misread your email.
>
> By any chance, are you doing a join of a KStream and a GlobalKTable?
>
> In this case, it would make perfect sense to do what you're doing, but
> unfortunately the current implementation doesn't support it.
>
> Your workaround would be to use KStream.selectKey on the left side to
> pick a key before the join. Unfortunately, this will create a
> repartition topic that is unnecessary when you're joining with a
> GlobalKTable.
>
> On the other hand, you could at that point switch to a regular
> KStream/KTable join and reduce the memory/storage requirements, as
> each node won't have to host the whole global data set anymore.
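The alternative of re-keying the stream and switching to a regular KStream/KTable join might look roughly like the following. This is a minimal, unverified sketch: the topic names `left`, `right` and `out`, the helper `joinKeyOf`, and the use of default serdes are all assumptions for illustration. Because `selectKey` changes the key and the right side is a partitioned KTable rather than a GlobalKTable, Streams is expected to insert a repartition topic ahead of this join; that is the cost traded for not replicating the whole table on every instance.

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinSketch {

    // Hypothetical key extractor standing in for "take a property from the value".
    static String joinKeyOf(Object value) {
        return value == null ? null : value.toString();
    }

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Re-key the stream by the join property. Because the key changes,
        // Streams repartitions the stream before the KStream/KTable join.
        final KStream<String, Object> left =
            builder.<Object, Object>stream("left")
                   .selectKey((k, v) -> joinKeyOf(v));

        // A regular (partitioned) KTable: each instance hosts only its own partitions.
        final KTable<String, Object> right = builder.table("right");

        final KStream<String, KeyValue<Object, Object>> joined =
            left.join(right, KeyValue::new);
        joined.to("out");

        return builder.build();
    }

    public static void main(String[] args) {
        // The printed description should include a "-repartition" topic.
        System.out.println(build().describe());
    }
}
```

Comparing this description with the one printed for the GlobalKTable version makes the extra repartition step visible.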
>
> Please feel free to share your code in some form to clear up the
> situation in case I got it wrong again.
>
> Thanks,
> -John
>
> On Mon, Jul 15, 2019 at 10:48 AM John Roesler <john@confluent.io> wrote:
> >
> > Hi Ties,
> >
> > You're on the right track. You need to use `KTable.map` ahead of the
> > join to select the new key. This will allow Streams to make sure the
> > data is correctly partitioned to perform the join.
> >
> > Thanks,
> > -John
> >
> > On Mon, Jul 15, 2019 at 10:07 AM Ven, Ties Jens van de
> > <ties.van.de.ven@alliander.com> wrote:
> > >
> > > I recently started working with Kafka Streams and noticed some odd behavior.
> > >
> > > I was using a KTable left join with a null key, and of course this will not work, since the join is based on keys.
> > > But I also supplied a KeyValueMapper, which takes a property from the value and returns it as the key to join on.
> > >
> > > It turns out that the code first checks whether the record key is null and, if so, skips the record.
> > > Would it be more logical to check the result of the keyMapper for null instead of the actual key?
> > >
> > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
> > >
> > > Kind regards
> > >
> > > Ties
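The difference between the null-key check described above and the proposed check on the mapper's result can be modeled in plain Java. This is a simplified, hypothetical model, not the Kafka Streams source: the class `NullKeyCheckModel`, the mapper `BY_PREFIX` and the `id:payload` value format are invented for illustration, and skipping null values in both variants is an assumption of the model.

```java
import java.util.function.BiFunction;

// Simplified, hypothetical model of the per-record null check in a
// stream-table join processor. This is NOT the Kafka Streams source code;
// it only contrasts the described check with the proposed one.
public class NullKeyCheckModel {

    // Hypothetical key mapper: the join key is the part of the value before ':'.
    // Returns null when the value carries no such property.
    static final BiFunction<Object, String, String> BY_PREFIX = (key, value) -> {
        int sep = value.indexOf(':');
        return sep < 0 ? null : value.substring(0, sep);
    };

    // Described behaviour: the record key itself is checked, so a record with a
    // null key is skipped even if the mapper could produce a join key.
    // (Skipping null values too is an assumption of this model.)
    static <K, V, JK> boolean skipsCurrent(K key, V value,
            BiFunction<? super K, ? super V, ? extends JK> keyMapper) {
        return key == null || value == null;
    }

    // Proposed behaviour: check the mapped join key instead of the record key.
    static <K, V, JK> boolean skipsProposed(K key, V value,
            BiFunction<? super K, ? super V, ? extends JK> keyMapper) {
        return value == null || keyMapper.apply(key, value) == null;
    }

    public static void main(String[] args) {
        // Null record key, but the value carries a usable join key.
        System.out.println(skipsCurrent(null, "id42:payload", BY_PREFIX));  // true
        System.out.println(skipsProposed(null, "id42:payload", BY_PREFIX)); // false
    }
}
```

With a null record key and a value that carries a join key, the described check skips the record, while the proposed check would let it through to the table lookup and skip only records whose mapped key is null.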
