From: John Roesler
Date: Tue, 16 Jul 2019 10:22:12 -0500
Subject: Re: Behaviour of KStreamKTableJoinProcessor
To:
 dev@kafka.apache.org

Hi again (again), Ties,

Sorry for the confusion. I was talking to someone else about this and
started to make a ticket to fix it, but once I looked into it I realized
that there is actually no repartition topic for a stream-globalTable join.

So, if you do something like:

=====
public static void main(String[] args) {
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    // Generic type parameters on the next three declarations are assumed.
    final KStream<String, Object> left =
        streamsBuilder.stream("left").selectKey((ok, ov) -> "newK" + ok);
    final GlobalKTable<Object, Object> right = streamsBuilder.globalTable("right");
    final KStream<String, KeyValue<Object, Object>> join =
        left.join(right, (ok, ov) -> ok, KeyValue::new);
    join.to("out");
    final Topology build = streamsBuilder.build();
    System.out.println(build.describe());
}
=====

(namely, the selectKey on the stream)

Then, you should get the result you expect.

Sorry again for my multiple replies.
-John

On Mon, Jul 15, 2019 at 11:35 AM John Roesler wrote:
>
> Hi again, Ties,
>
> I think I spoke too soon and also misread your email.
>
> By any chance, are you doing a join of a KStream and a GlobalKTable?
>
> In this case, it would make perfect sense to do what you're doing, but
> unfortunately the current implementation doesn't support it.
>
> Your workaround would be to use KStream.selectKey on the left side to
> pick a key before the join. Unfortunately, this will create a
> repartition topic that is unnecessary when you're joining with a
> GlobalKTable.
>
> On the other hand, you could at that point switch to a regular
> KStream/KTable join and reduce the memory/storage requirements, as
> each node won't have to host the whole global data set anymore.
>
> Please feel free to share your code in some form to clear up the
> situation in case I got it wrong again.
>
> Thanks,
> -John
>
> On Mon, Jul 15, 2019 at 10:48 AM John Roesler wrote:
> >
> > Hi Ties,
> >
> > You're on the right track. You need to use `KTable.map` ahead of the
> > join to select the new key.
> > This will allow Streams to make sure the data is correctly
> > partitioned to perform the join.
> >
> > Thanks,
> > -John
> >
> > On Mon, Jul 15, 2019 at 10:07 AM Ven, Ties Jens van de wrote:
> > >
> > > I recently started working with Kafka Streams and I noticed some odd behavior.
> > >
> > > I was using a KTable left join with a null key, and of course this will not work, since it joins based on keys.
> > > But I also supplied a KeyValueMapper, which takes a property from the value and returns it as the key to join on.
> > >
> > > It turns out that the code first checks whether the record key is null, and if so, it skips the record.
> > > Would it be more logical to check the result of the keyMapper for null instead of the actual key?
> > >
> > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
> > >
> > > Kind regards
> > >
> > > Ties
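
To make the question in the original message concrete, here is a small, dependency-free sketch of the two guard orders being discussed. It is only a simplified model and not the Kafka Streams implementation: the class `NullKeyJoinSketch`, the `Rec` type, both method names, and the "id:payload" value layout are hypothetical, and the join is modelled as an inner join against a plain `Map`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class NullKeyJoinSketch {

    // Hypothetical stand-in for a stream record; not a Kafka class.
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Guard on the record key: a record with a null key is skipped
    // before the key mapper is ever consulted.
    static List<String> joinCheckingRecordKey(List<Rec> stream, Map<String, String> table,
                                              BiFunction<String, String, String> keyMapper) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            if (r.key == null || r.value == null) continue;
            String other = table.get(keyMapper.apply(r.key, r.value));
            if (other != null) out.add(r.value + "+" + other);
        }
        return out;
    }

    // Guard on the mapped key: the mapper runs first, and only a null
    // result from the mapper causes the record to be skipped.
    static List<String> joinCheckingMappedKey(List<Rec> stream, Map<String, String> table,
                                              BiFunction<String, String, String> keyMapper) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            if (r.value == null) continue;
            String mapped = keyMapper.apply(r.key, r.value);
            if (mapped == null) continue;
            String other = table.get(mapped);
            if (other != null) out.add(r.value + "+" + other);
        }
        return out;
    }

    public static void main(String[] args) {
        // One record with a null key; the join key lives inside the value ("id:payload").
        List<Rec> stream = Arrays.asList(new Rec(null, "c1:order-42"));
        Map<String, String> table = Collections.singletonMap("c1", "Alice");
        BiFunction<String, String, String> byValueId = (k, v) -> v.substring(0, v.indexOf(':'));

        System.out.println(joinCheckingRecordKey(stream, table, byValueId));
        System.out.println(joinCheckingMappedKey(stream, table, byValueId));
    }
}
```

In this model the first variant drops the null-keyed record, while the second joins it because the mapper can derive a key from the value. The `selectKey` workaround suggested in the thread reaches the same outcome from the other direction, by making sure the record key is non-null before the join.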