kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table
Date Thu, 07 Jul 2016 17:02:25 GMT
Hello,

The problem is that, in stream processing when you have a new record coming
from the big table tB and does not find a match with the small table (i.e.
only a sub set of keys in practice) in tA, you cannot tell which case of
the following is true:

1. there was a matching record in tA, which gets deleted before this tB
record arrives. In this case we need to output a null record indicating to
"negate" the previous join result.
2. there was never a matching record in tA. In this case we actually do not
need to output anything.

As I mentioned, the root cause is that tables in stream processing can be
updated while it is on-going. As for Kafka Streams implementation, the if a
null record is received in the downstream operators while there is no
existing record for that key in the materialized view, then it is treated
as a no-op.


Guozhang


On Wed, Jul 6, 2016 at 3:59 AM, Philippe Derome <phderome@gmail.com> wrote:

> thanks, I understand that it's not modelled the same way as database joins.
>
> Let's take an example of inner join of very small population set (NBA
> rookies or US senators) with larger table (data on zip codes). Let's assume
> we want to identify the crime rate of zip codes where current senators live
> or the median income of zip codes where NBA rookies currently live. These
> small elite population samples will likely never live in the 50% poorest
> zip codes in US (although exceptionally some might) and NBA rookies will
> not live far from their team home base (Maine, Alaska, Hawaii, North
> Dakota) so many zip codes will not match and are expected to never match.
> So, I don't see that the keys representing such zip codes will become
> eventually consistent.
>
> One can imagine an application that makes case of census data (with many
> zip codes) and interested in many such statistics for several such small
> "elite" populations and then the irrelevant zip codes with null records
> find their ways multiple times in the data pipeline.
>
> I cannot think of a more egregious example where one table has billions of
> keys and the other only a handful that would match but I'd assume that such
> use cases could be natural.
>
> It seems to me that the null keys should be output to represent a record
> deletion in the resulting table, but not a near miss on data selection.
>
> On Tue, Jul 5, 2016 at 12:44 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hello,
> >
> > The KTable join semantics is not exactly the same with that of a RDBMS.
> You
> > can fine detailed semantics in the web docs (search for Joining Streams):
> >
> >
> >
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl
> >
> > In a nutshell, the joiner will be triggered only if both / left / either
> of
> > the joining streams has the matching record with the key of the incoming
> > received record (so the input values of the joiner could not be null /
> can
> > be null for only the other value / can be null on either values, but not
> > both), and otherwise a pair of {join-key, null} is output. We made this
> > design deliberately just to make sure that "table-table joins are
> > eventually consistent". This gives a kind of resilience to late arrival
> of
> > records that a late arrival in either stream can "update" the join
> result.
> >
> >
> > Guozhang
> >
> > On Mon, Jul 4, 2016 at 6:10 PM, Philippe Derome <phderome@gmail.com>
> > wrote:
> >
> > > Same happens for regular join, keys that appear only in one stream will
> > > make it to output KTable tC with a null for either input stream. I
> guess
> > > it's related to Kafka-3911 Enforce ktable Materialization or umbrella
> > JIRA
> > > 3909, Queryable state for Kafka Streams?
> > >
> > > On Mon, Jul 4, 2016 at 8:45 PM, Philippe Derome <phderome@gmail.com>
> > > wrote:
> > >
> > > > If we have two streams A and B for which we associate tables tA and
> tB,
> > > > then create a table tC as ta.leftJoin(tB, <some value joiner>) and
> then
> > > we
> > > > have a key kB in stream B but never made it to tA nor tC, do we need
> to
> > > > inject a pair (k,v) of (kB, null) into resulting change log for tC ?
> > > >
> > > > It sounds like it is definitely necessary if key kB is present in
> table
> > > tC
> > > > but if not, why add it?
> > > >
> > > > I have an example that reproduces this and would like to know if it
> is
> > > > considered normal, sub-optimal, or a defect. I don't view it as
> normal
> > > for
> > > > time being, particularly considering stream A as having very few keys
> > > and B
> > > > as having many, which could lead to an unnecessary large change log
> for
> > > C.
> > > >
> > > > Phil
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message