From: "Matthias J. Sax" <matthias@confluent.io>
To: dev@kafka.apache.org
Cc: iponomarev@mail.ru
Subject: Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Date: Thu, 11 Jul 2019 16:00:09 -0700

Ivan,

did you see my last reply? What do you think about my proposal to mix both approaches and try to get the best of both worlds?

-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:
> Thanks for the input, John!
>
>> under your suggestion, it seems that the name is required
>
> If you want to get the `KStream` back as part of the `Map` using a `Function`, yes. If you follow the "embedded chaining" pattern using a `Consumer`, no.
>
> Allowing for a default name via `split()` can of course be done. Similarly, using `Named` instead of `String` is possible.
>
> I only wanted to sketch out a high-level proposal to merge both patterns. Your suggestions to align the new API with the existing API make total sense.
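The distinction drawn above (a `Consumer` branch needs no name, while a `Function` branch must be named so that its result can be returned in the `Map`) can be illustrated with a small in-memory sketch. It is not the Kafka Streams API: `ToyBranchedStream` and `SplitDemo` are hypothetical names, a "stream" is modeled as a plain `List`, predicates are `java.util.function.Predicate` rather than Kafka's two-argument `Predicate<K, V>`, and each record is assumed to go to the first branch whose predicate matches. The duplicate-name check mirrors the runtime uniqueness check proposed in Matthias's May 23 message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, in-memory stand-in for the proposed KBranchedStream; a "stream" is a List<V>.
class ToyBranchedStream<V> {
    private final List<V> records;
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<Consumer<List<V>>> consumers = new ArrayList<>();           // null for named branches
    private final List<Function<List<V>, List<V>>> functions = new ArrayList<>(); // null for chained branches
    private final List<String> names = new ArrayList<>();                           // null for chained branches

    ToyBranchedStream(List<V> records) {
        this.records = records;
    }

    // "Embedded chaining": the sub-stream goes to the Consumer and is NOT returned.
    ToyBranchedStream<V> branch(Predicate<V> predicate, Consumer<List<V>> chain) {
        return add(predicate, chain, null, null);
    }

    // Named branch: the Function's result is returned in the Map under `name`.
    ToyBranchedStream<V> branch(Predicate<V> predicate, Function<List<V>, List<V>> chain, String name) {
        return add(predicate, null, chain, name);
    }

    Map<String, List<V>> defaultBranch(Function<List<V>, List<V>> chain, String name) {
        return route(chain, name);
    }

    Map<String, List<V>> noDefaultBranch() {
        return route(null, null); // unmatched records are dropped
    }

    private ToyBranchedStream<V> add(Predicate<V> p, Consumer<List<V>> c, Function<List<V>, List<V>> f, String n) {
        predicates.add(p);
        consumers.add(c);
        functions.add(f);
        names.add(n);
        return this;
    }

    private Map<String, List<V>> route(Function<List<V>, List<V>> defaultChain, String defaultName) {
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) buckets.add(new ArrayList<>());
        List<V> unmatched = new ArrayList<>();
        for (V record : records) {
            int i = 0;
            while (i < predicates.size() && !predicates.get(i).test(record)) i++; // first match wins
            if (i < predicates.size()) buckets.get(i).add(record); else unmatched.add(record);
        }
        Map<String, List<V>> result = new LinkedHashMap<>(); // keeps definition order
        for (int i = 0; i < predicates.size(); i++) {
            if (consumers.get(i) != null) consumers.get(i).accept(buckets.get(i));
            else putUnique(result, names.get(i), functions.get(i).apply(buckets.get(i)));
        }
        if (defaultChain != null) putUnique(result, defaultName, defaultChain.apply(unmatched));
        return result;
    }

    // Runtime check that all returned branch names are unique.
    private static <T> void putUnique(Map<String, T> map, String name, T value) {
        if (map.containsKey(name)) throw new IllegalArgumentException("duplicate branch name: " + name);
        map.put(name, value);
    }
}

class SplitDemo {
    static Map<String, List<Integer>> run(List<Integer> sink) {
        return new ToyBranchedStream<Integer>(List.of(1, 2, 3, 4, 5, 6, 7))
            .branch(v -> v % 2 == 0, s -> s, "even")  // named: returned in the Map
            .branch(v -> v % 3 == 0, sink::addAll)    // chained: handed to a Consumer only
            .defaultBranch(s -> s, "rest");
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        System.out.println(run(sink)); // {even=[2, 4, 6], rest=[1, 5, 7]}
        System.out.println(sink);      // [3]
    }
}
```

The value 6 matches both predicates but lands in `even` because that branch is defined first, and the unnamed branch's records reach the caller only through its `Consumer`, never through the returned `Map`.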
>
> One follow-up question: would `Named` be optional or required in `split()` and `branch()`? It's unclear from your example.
>
> If both are mandatory, what do we gain by it? The returned `Map` only contains the corresponding branches, so why should we prefix all of them? If `Named` is mandatory only in `branch()` but optional in `split()`, doesn't the same question arise?
>
> Requiring `Named` in `split()` only seems to make sense if `Named` is optional in `branch()` and we generate a `-X` suffix using a counter for the different branch names. However, this might lead to the problem of names changing if branches are added or removed. Also, how would the names be generated if a `Consumer` is mixed in (i.e., not all branches are returned in the `Map`)?
>
> If `Named` is optional for both, a user might forget to specify a name for a branch, which would lead to runtime issues.
>
> Hence, I am actually in favor of not allowing a default name: keep `split()` without a parameter and make `Named` in `branch()` required if a `Function` is used. This makes it explicit to the user that specifying a name is required if a `Function` is used.
>
> About
>
>> KBranchedStream#branch(BranchConfig)
>
> I don't think that the branching predicate is a configuration, and hence I would not include it in a configuration object.
>
>> withChain(...);
>
> Similarly, `withChain()` (which would only take a `Consumer`?) does not seem to be a configuration. We also cannot prevent a user from calling `withName()` in combination with `withChain()`, which does not make sense IMHO. We could of course throw a runtime exception, but not having a compile-time check seems less appealing. Also, it could happen that neither `withChain()` nor `withName()` is called and the branch is missing from the returned `Map`, which would lead to runtime issues, too.
>
> Hence, I don't think that we should add `BranchConfig`.
> A config object is helpful if each configuration can be set independently of all others, but this does not seem to be the case here. If we add new configurations later, we can also just move forward by deprecating the methods that accept `Named` and adding new methods that accept `BranchConfig` (which would of course implement `Named`).
>
> Thoughts?
>
> @Ivan, what do you think about the general idea to blend the two main approaches of returning a `Map` plus an "embedded chaining"?
>
> -Matthias
>
> On 6/4/19 10:33 AM, John Roesler wrote:
>> Thanks for the idea, Matthias, it does seem like this would satisfy everyone. Returning the map from the terminal operations also solves the problem of merging/joining the branched streams, if we want to add support for the complement later on.
>>
>> Under your suggestion, it seems that the name is required. Otherwise, we wouldn't have keys for the map to return. I think this is actually not too bad, since experience has taught us that, although names for operations are not required to define stream processing logic, they do significantly improve the operational experience when you can map the topology, logs, metrics, etc. back to the source code. Since you wouldn't (have to) reference the name to chain extra processing onto the branch (thanks to the second argument), you can avoid the "unchecked name" problem that Ivan pointed out.
>>
>> In the current implementation of branch, you can name the branch operator itself, and then all the branches get index-suffixed names built from the branch operator name. I guess under this proposal, we could naturally append the branch name to the branching operator name, like this:
>>
>>   stream.split(Named.withName("mysplit"))  // creates node "mysplit"
>>         .branch(..., ..., "abranch")       // creates node "mysplit-abranch"
>>         .defaultBranch(...)                // creates node "mysplit-default"
>>
>> It does make me wonder about the DSL syntax itself, though.
>>
>> We don't have a defined grammar, so there's plenty of room to debate the "best" syntax in the context of each operation, but in general, the KStream DSL operators follow this pattern:
>>
>>   operator(function, config_object?) OR operator(config_object)
>>
>> where config_object is often just Named in the "function" variant. Even when the config_object isn't a Named, but some other config class, that config class _always_ implements NamedOperation.
>>
>> Here, we're introducing a totally different pattern:
>>
>>   operator(function, function, string)
>>
>> where the string is the name. My first question is whether the name should instead be specified with the NamedOperation interface.
>>
>> My second question is whether we should just roll all these arguments up into a config object like:
>>
>>   KBranchedStream#branch(BranchConfig)
>>
>>   interface BranchConfig extends NamedOperation {
>>       withPredicate(...);
>>       withChain(...);
>>       withName(...);
>>   }
>>
>> Although I guess we'd like to call BranchConfig something more like "Branched", even if I don't particularly like that pattern.
>>
>> This makes the source code a little noisier, but it also makes us more future-proof, as we can deal with a wide range of alternatives purely in the config interface, and never have to deal with adding overloads to the KBranchedStream if/when we decide we want the name to be optional, or the KStream->KStream to be optional.
>>
>> WDYT?
>>
>> Thanks,
>> -John
>>
>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis wrote:
>>>
>>> Matthias: I think that's pretty reasonable from my point of view. Good suggestion.
>>>
>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax wrote:
>>>
>>>> Interesting discussion.
>>>>
>>>> I am wondering if we can unify the advantages of both approaches:
>>>>
>>>>   KStream#split() -> KBranchedStream
>>>>
>>>>   // branch is not easily accessible in current scope
>>>>   KBranchedStream#branch(Predicate, Consumer)
>>>>     -> KBranchedStream
>>>>
>>>>   // assign a name to the branch and
>>>>   // return the sub-stream to the current scope later
>>>>   //
>>>>   // can be as simple as `#branch(p, s->s, "name")`
>>>>   // or as complex as `#branch(p, s->s.filter(...), "name")`
>>>>   KBranchedStream#branch(Predicate, Function, String)
>>>>     -> KBranchedStream
>>>>
>>>>   // default branch is not easily accessible
>>>>   // return map of all named sub-streams into current scope
>>>>   KBranchedStream#defaultBranch(Consumer)
>>>>     -> Map
>>>>
>>>>   // assign custom name to default branch
>>>>   // return map of all named sub-streams into current scope
>>>>   KBranchedStream#defaultBranch(Function, String)
>>>>     -> Map
>>>>
>>>>   // assign a default name for the default branch
>>>>   // return map of all named sub-streams into current scope
>>>>   KBranchedStream#defaultBranch(Function)
>>>>     -> Map
>>>>
>>>>   // return map of all named sub-streams into current scope
>>>>   KBranchedStream#noDefaultBranch()
>>>>     -> Map
>>>>
>>>> Hence, for each sub-stream, the user can choose whether to add a name and return the branch "result" to the calling scope. The implementation can also check at runtime that all returned names are unique. The returned Map can be empty, and it's also optional to use the Map.
>>>>
>>>> To me, it seems like a good way to get the best of both worlds.
>>>>
>>>> Thoughts?
>>>>
>>>> -Matthias
>>>>
>>>> On 5/6/19 5:15 PM, John Roesler wrote:
>>>>> Ivan,
>>>>>
>>>>> That's a very good point about the "start" operator in the dynamic case. I had no problem with "split()"; I was just questioning the necessity. Since you've provided a proof of necessity, I'm in favor of the "split()" start operator. Thanks!
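John's point that a start operator is needed for the dynamic case can be made concrete with a runnable version of Ivan's per-enum loop. This is a sketch under stated assumptions: `ToySplit`, `Event`, `DynamicBranchingDemo` and the `ORDER`/`PAYMENT`/`REFUND` constants are hypothetical, the stream is an in-memory `List`, and a counting lambda stands in for Ivan's `recordType::processRecords`. What it shows is that, once a start call exists, every branch is added by the same method, so a plain loop can build the chain.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical record types; in Ivan's example the value exposes getRecType().
enum RecordType { ORDER, PAYMENT, REFUND }

// Toy value type standing in for the stream's value.
final class Event {
    private final RecordType type;
    private final String payload;
    Event(RecordType type, String payload) { this.type = type; this.payload = payload; }
    RecordType getRecType() { return type; }
}

// Minimal in-memory stand-in for the chained branching API (not Kafka's API).
final class ToySplit<V> {
    private final List<V> records;
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<Consumer<List<V>>> chains = new ArrayList<>();

    ToySplit(List<V> records) { this.records = records; }

    ToySplit<V> branch(Predicate<V> predicate, Consumer<List<V>> chain) {
        predicates.add(predicate);
        chains.add(chain);
        return this;
    }

    // Terminal call: route each record to the first matching branch, then run every chain.
    void noDefaultBranch() {
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) buckets.add(new ArrayList<>());
        for (V record : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) { buckets.get(i).add(record); break; }
            }
        }
        for (int i = 0; i < chains.size(); i++) chains.get(i).accept(buckets.get(i));
    }
}

final class DynamicBranchingDemo {
    // Because the chain is started up front, every enum value adds a branch the same way.
    static Map<RecordType, Integer> countPerType(List<Event> events) {
        Map<RecordType, Integer> counts = new EnumMap<>(RecordType.class);
        ToySplit<Event> branched = new ToySplit<>(events); // plays the role of stream.split()
        for (RecordType recordType : RecordType.values()) {
            branched = branched.branch(e -> e.getRecType() == recordType,
                                       sub -> counts.put(recordType, sub.size()));
        }
        branched.noDefaultBranch();
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(RecordType.ORDER, "o1"), new Event(RecordType.PAYMENT, "p1"),
            new Event(RecordType.ORDER, "o2"));
        System.out.println(countPerType(events)); // {ORDER=2, PAYMENT=1, REFUND=0}
    }
}
```

The `REFUND` branch still runs its chain, just on an empty sub-stream. Without a dedicated start call, the first iteration would need a different method (and receiver type) than the others, which is the messiness Ivan's code comment refers to.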
>>>>>
>>>>> Separately, I'm interested to see where the present discussion leads. I've written enough JavaScript code in my life to be suspicious of nested closures. You have a good point about using method references (or indeed function literals also work). It should be validating that this was also the JS community's first approach to flattening the logic when their nested closure situation got out of hand. Unfortunately, it's replacing nesting with redirection, both of which disrupt code readability (but in different ways for different reasons). In other words, I agree that function references are *the* first-order solution if the nested code does indeed become a problem.
>>>>>
>>>>> However, the history of JS also tells us that function references aren't the end of the story either, and you can see that by observing that there have been two follow-on eras, as they continue trying to cope with the consequences of living in such a callback-heavy language. First, you have Futures/Promises, which essentially let you convert nested code to method-chained code (Observables/FP is a popular variation on this). Most recently, you have async/await, which is an effort to apply language (not just API) syntax to the problem, and offer the "flattest" possible programming style to solve the problem (because you get back to just one code block per functional unit).
>>>>>
>>>>> Stream processing is a different domain, and Java+KStreams is nowhere near as callback-heavy as JS, so I don't think we have to take the JS story for granted, but then again, I think we can derive some valuable lessons by looking sideways to adjacent domains. I'm just bringing this up to inspire further/deeper discussion. At the same time, just like JS, we can afford to take an iterative approach to the problem.
>>>>>
>>>>> Separately again, I'm interested in the post-branch merge (and I'd also add join) problem that Paul brought up. We can clearly punt on it by terminating the nested branches with sink operators. But is there a DSL way to do it?
>>>>>
>>>>> Thanks again for driving this,
>>>>> -John
>>>>>
>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen wrote:
>>>>>
>>>>>     Ivan, I'll definitely forfeit my point on the clumsiness of the branch(predicate, consumer) solution; I don't see any real drawbacks for the dynamic case.
>>>>>
>>>>>     IMO the one trade-off to consider at this point is the scope question. I don't know if I totally agree that "we rarely need them in the same scope", since merging the branches back together later seems like a perfectly plausible use case that can be a lot nicer when the branched streams are in the same scope. That being said, for the reasons Ivan listed, I think it is overall the better solution - working around the scope thing is easy enough if you need to.
>>>>>
>>>>>     > On May 2, 2019, at 7:00 PM, Ivan Ponomarev wrote:
>>>>>     >
>>>>>     > Hello everyone, thank you all for joining the discussion!
>>>>>     >
>>>>>     > Well, I don't think the idea of named branches, be it a LinkedHashMap (no other Map will do, because order of definition matters) or a `branch` method taking a name and a Consumer, has more advantages than drawbacks.
>>>>>     >
>>>>>     > In my opinion, the only real positive outcome from Michael's proposal is that all the returned branches are in the same scope. But 1) we rarely need them in the same scope, and 2) there is a workaround for the scope problem, described in the KIP.
>>>>>     >
>>>>>     > 'Inlining the complex logic' is not a problem, because we can use method references instead of lambdas.
>>>>>     > In real-world scenarios you tend to split the complex logic into methods anyway, so the code is going to be clean.
>>>>>     >
>>>>>     > The drawbacks are strong. The cohesion between predicates and handlers is lost. We have to define predicates in one place, and handlers in another. This opens the door for bugs:
>>>>>     >
>>>>>     > - what if we forget to define a handler for a name? Or a name for a handler?
>>>>>     > - what if we misspell a name?
>>>>>     > - what if we copy-paste and duplicate a name?
>>>>>     >
>>>>>     > What Michael proposes would have been totally OK if we had been writing the API in Lua, Ruby or Python. In those languages the "dynamic naming" approach would have looked most concise and beautiful. But in Java we expect all the problems related to identifiers to be eliminated at compile time.
>>>>>     >
>>>>>     > Do we have to invent duck-typing for the Java API?
>>>>>     >
>>>>>     > And if we do, what advantage are we supposed to get besides having all the branches in the same scope? Michael, maybe I'm missing your point?
>>>>>     >
>>>>>     > ---
>>>>>     >
>>>>>     > Earlier in this discussion John Roesler also proposed to do without a "start branching" operator, and later Paul mentioned that in the case when we have to add a dynamic number of branches, the current KIP is 'clumsier' compared to Michael's 'Map' solution. Let me address both comments here.
>>>>>     >
>>>>>     > 1) The "start branching" operator (I think that *split* is a good name for it indeed) is critical when we need to do dynamic branching; see the example below.
>>>>>     >
>>>>>     > 2) No, dynamic branching in the current KIP is not clumsy at all. Imagine a real-world scenario where you need one branch per enum value (say, RecordType). You can have something like this:
>>>>>     >
>>>>>     > /* John: if we had to start with stream.branch(...) here, it would have been much messier. */
>>>>>     > KBranchedStream branched = stream.split();
>>>>>     >
>>>>>     > /* Not clumsy at all :-) */
>>>>>     > for (RecordType recordType : RecordType.values())
>>>>>     >     branched = branched.branch((k, v) -> v.getRecType() == recordType,
>>>>>     >                                recordType::processRecords);
>>>>>     >
>>>>>     > Regards,
>>>>>     >
>>>>>     > Ivan
>>>>>     >
>>>>>     >
>>>>>     > On 02.05.2019 14:40, Matthias J. Sax wrote:
>>>>>     >> I also agree with Michael's observation about the core problem of the current `branch()` implementation.
>>>>>     >>
>>>>>     >> However, I also don't like to pass in a clumsy Map object. My thinking was more aligned with Paul's proposal to just add a name to each `branch()` statement and return a `Map`.
>>>>>     >>
>>>>>     >> It makes the code easier to read, and also makes the order of `Predicates` (which is essential) easier to grasp.
>>>>>     >>
>>>>>     >>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>     >>>>>>     .branch("branchOne", Predicate)
>>>>>     >>>>>>     .branch("branchTwo", Predicate)
>>>>>     >>>>>>     .defaultBranch("defaultBranch");
>>>>>     >>
>>>>>     >> An open question is the case in which no defaultBranch() should be specified. Atm, `split()` and `branch()` would return `BranchedKStream`, and the call to `defaultBranch()` that returns the `Map` is mandatory (which is not the case atm). Or is this actually not a real problem, because users can just ignore the branch returned by `defaultBranch()` in the resulting `Map`?
>>>>>     >>
>>>>>     >> About "inlining": so far, it seems to be a matter of personal preference. I can see arguments for both, but no "killer argument" yet that clearly makes the case for one or the other.
>>>>>     >>
>>>>>     >> -Matthias
>>>>>     >>
>>>>>     >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>>>>     >>> Perhaps inlining is the wrong terminology.
>>>>>     >>> It doesn't require that a lambda with the full downstream topology be defined inline - it can be a method reference as with Ivan's original suggestion. The advantage of putting the predicate and its downstream logic (Consumer) together in branch() is that they are required to be near each other.
>>>>>     >>>
>>>>>     >>> Ultimately the downstream code has to live somewhere, and deep branch trees will be hard to read regardless.
>>>>>     >>>
>>>>>     >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis wrote:
>>>>>     >>>>
>>>>>     >>>> I'm less enthusiastic about inlining the branch logic with its downstream functionality. Programs that have deep branch trees will quickly become harder to read as a single unit.
>>>>>     >>>>
>>>>>     >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen wrote:
>>>>>     >>>>>
>>>>>     >>>>> Also +1 on the issues/goals as Michael outlined them; I think that sets a great framework for the discussion.
>>>>>     >>>>>
>>>>>     >>>>> Regarding the SortedMap solution, my understanding is that the current proposal in the KIP is what is in my PR, which (pending naming decisions) is roughly this:
>>>>>     >>>>>
>>>>>     >>>>> stream.split()
>>>>>     >>>>>     .branch(Predicate, Consumer<KStream<K, V>>)
>>>>>     >>>>>     .branch(Predicate, Consumer<KStream<K, V>>)
>>>>>     >>>>>     .defaultBranch(Consumer<KStream<K, V>>);
>>>>>     >>>>>
>>>>>     >>>>> Obviously some ordering is necessary, since branching as a construct doesn't work without it, but this solution seems like it provides as much associativity as the SortedMap solution, because each branch() call directly associates the "conditional" with the "code block." The value it provides over the KIP solution is the accessing of streams in the same scope.
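Paul's remark that some ordering is necessary, together with Ivan's observation that only a `LinkedHashMap` preserves the order of definition, comes down to how a `java.util.Map` iterates. The sketch below uses a hypothetical `FirstMatch` helper (not part of any Kafka API) that evaluates named predicates in map-iteration order and returns the first matching name. A `LinkedHashMap` iterates in insertion order, whereas a `TreeMap` (the usual `SortedMap` implementation) iterates in ascending key order, so identical definitions can route a record differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical helper: first-match routing over named predicates.
final class FirstMatch {
    // Evaluates predicates in the map's iteration order; returns the first matching name.
    static <V> String route(Map<String, Predicate<V>> branches, V value) {
        for (Map.Entry<String, Predicate<V>> e : branches.entrySet()) {
            if (e.getValue().test(value)) {
                return e.getKey();
            }
        }
        return null; // no branch matched (would fall through to a default, if any)
    }

    // Same definitions for both maps: "small" is defined first, then "even".
    static Map<String, Predicate<Integer>> define(Map<String, Predicate<Integer>> target) {
        target.put("small", v -> v < 10);
        target.put("even", v -> v % 2 == 0);
        return target;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> inserted = define(new LinkedHashMap<>());
        Map<String, Predicate<Integer>> sorted = define(new TreeMap<>());
        System.out.println(route(inserted, 4)); // small: insertion order is kept
        System.out.println(route(sorted, 4));   // even: TreeMap iterates keys alphabetically
    }
}
```

For the value 4 both predicates match, so the result depends only on iteration order. With a `SortedMap`, evaluation order follows the sort order of the branch names (or of a custom comparator), not the order in which the branches were added.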
>>>>>     >>>>>
>>>>>     >>>>> The KIP solution is less "dynamic" than the SortedMap solution in the sense that it is slightly clumsier to add a dynamic number of branches, but it is certainly possible. It seems to me like the API should favor the "static" case anyway, and should make it simple and readable to fluently declare and access your branches in-line. It also makes it impossible to ignore a branch, and it is possible to build an (almost) identical SortedMap solution on top of it.
>>>>>     >>>>>
>>>>>     >>>>> I could also see a middle ground where, instead of a raw SortedMap being taken in, branch() takes a name and not a Consumer. Something like this:
>>>>>     >>>>>
>>>>>     >>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>     >>>>>     .branch("branchOne", Predicate)
>>>>>     >>>>>     .branch("branchTwo", Predicate)
>>>>>     >>>>>     .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>>>     >>>>>
>>>>>     >>>>> Pros for that solution:
>>>>>     >>>>> - accessing branched KStreams in the same scope
>>>>>     >>>>> - no double brace initialization, hopefully slightly more readable than SortedMap
>>>>>     >>>>>
>>>>>     >>>>> Cons:
>>>>>     >>>>> - downstream branch logic cannot be specified inline, which makes it harder to read top to bottom (like the existing API and SortedMap, but unlike the KIP)
>>>>>     >>>>> - you can forget to "handle" one of the branched streams (like the existing API and SortedMap, but unlike the KIP)
>>>>>     >>>>>
>>>>>     >>>>> (KBranchedStreams could even work *both* ways, but perhaps that's overdoing it.)
>>>>>     >>>>>
>>>>>     >>>>> Overall I'm curious how important it is to be able to easily access the branched KStream in the same scope as the original.
>>>>>     >>>>> It's possible that it doesn't need to be handled directly by the API, but instead left up to the user. I'm sort of in the middle on it.
>>>>>     >>>>>
>>>>>     >>>>> Paul
>>>>>     >>>>>
>>>>>     >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman wrote:
>>>>>     >>>>>
>>>>>     >>>>>> I'd like to +1 what Michael said about the issues with the existing branch method. I agree with what he's outlined, and I think we should proceed by trying to alleviate these problems. Specifically, it seems important to be able to cleanly access the individual branches (e.g., by mapping name->stream), which I thought was the original intention of this KIP.
>>>>>     >>>>>>
>>>>>     >>>>>> That said, I don't think we should so easily give in to the double brace anti-pattern or force our users into it if at all possible to avoid... just my two cents.
>>>>>     >>>>>>
>>>>>     >>>>>> Cheers,
>>>>>     >>>>>> Sophie
>>>>>     >>>>>>
>>>>>     >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <michael.drogalis@confluent.io> wrote:
>>>>>     >>>>>>
>>>>>     >>>>>>> I'd like to propose a different way of thinking about this. To me, there are three problems with the existing branch signature:
>>>>>     >>>>>>>
>>>>>     >>>>>>> 1. If you use it the way most people do, Java raises unsafe type warnings.
>>>>>     >>>>>>> 2. The way in which you use the stream branches is positionally coupled to the ordering of the conditionals.
>>>>>     >>>>>>> 3. It is brittle to extend existing branch calls with additional code paths.
>>>>>     >>>>>>>
>>>>>     >>>>>>> Using associative constructs instead of relying on ordered constructs would be a stronger approach.
Consider a signature that instead looks like this:

    Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);

Branches are given names in a map, and as a result, the API returns a
mapping of names to streams. The ordering of the conditionals is maintained
because it's a sorted map. Insert order determines the order of evaluation.

This solves problem 1 because there are no more varargs. It solves problem
2 because you no longer lean on ordering to access the branch you're
interested in. It solves problem 3 because you can introduce another
conditional by simply attaching another name to the structure, rather than
messing with the existing indices.

One of the drawbacks is that creating the map inline is historically
awkward in Java. I know it's an anti-pattern to use voluminously, but
double brace initialization would clean up the aesthetics.

On Tue, Apr 30, 2019 at 9:10 AM John Roesler wrote:

Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is
confusing when named the same way as the actual branches. "Split" seems
like a good name.
Alternatively, we can do without a "start branching" operator at all, and
just do:

    stream
        .branch(Predicate)
        .branch(Predicate)
        .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That
way, we don't create ambiguity about how to use it. That is, `branch`
should return `KBranchedStream`, while `defaultBranch` is `void`, to
enforce that it comes last, and that there is only one definition of the
default branch. Potentially, we should log a warning if there's no default,
and additionally log a warning (or throw an exception) if a record falls
through with no default.

Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matthias@confluent.io> wrote:

Thanks for updating the KIP and your answers.

> this is to make the name similar to String#split
> that also returns an array, right?

The intent was to avoid name duplication. The return type should _not_ be
an array.

The current proposal is

    stream.branch()
          .branch(Predicate)
          .branch(Predicate)
          .defaultBranch();

IMHO, this reads a little odd, because the first `branch()` does not take
any parameters and has different semantics than the later `branch()`
calls.
Note that from the code snippet above, it's hidden that the first call is
`KStream#branch()` while the others are `KBranchedStream#branch()`, which
makes reading the code harder.

Because I suggested renaming `addBranch()` -> `branch()`, I thought it might
be better to also rename `KStream#branch()` to avoid the naming overlap
that seems to be confusing. The following reads much cleaner to me:

    stream.split()
          .branch(Predicate)
          .branch(Predicate)
          .defaultBranch();

Maybe there is a better alternative to `split()` though to avoid the naming
overlap.

> 'default' is, however, a reserved word, so unfortunately we cannot have
> a method with such name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short name?

Can you add the interface `KBranchedStream` to the KIP with all its
methods? It will be part of the public API and should be contained in the
KIP. For example, it's unclear atm what the return type of
`defaultBranch()` is.

You did not comment on the idea to add a `KBranchedStream#get(int index)
-> KStream` method to get the individually branched-KStreams. Would be nice
to get your feedback about it.
It seems you suggest that users would otherwise need to write custom
utility code to access them. We should discuss the pros and cons of both
approaches. It feels "incomplete" to me atm if the API has no built-in
support to get the branched-KStreams directly.

-Matthias

On 4/13/19 2:13 AM, Ivan Ponomarev wrote:

Hi all!

I have updated KIP-418 according to the new vision.

Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split that
also returns an array, right? But is it worth the loss of backwards
compatibility? We can have overloaded branch() as well without affecting
the existing code. Maybe the old array-based `branch` method should be
deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch->branch' rename. 'default' is, however, a
reserved word, so unfortunately we cannot have a method with such name :-)

> defaultBranch() does take a `Predicate` as argument, but I think that
> is not required?

Absolutely!
I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR
(https://github.com/apache/kafka/pull/6512)

Any new suggestions/objections?

Regards,

Ivan

11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There are
some minor things we need to consider. I would recommend the following
renaming:

    KStream#branch() -> #split()
    KBranchedStream#addBranch() -> BranchingKStream#branch()
    KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument, but
I think that is not required?

Also, we should consider KIP-307, which was recently accepted and is
currently being implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.
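A side note on Michael's `SortedMap` proposal earlier in the thread: in Java, a `SortedMap` such as `TreeMap` iterates in key order (natural ordering or a supplied `Comparator`), while insertion order is what `LinkedHashMap` preserves. If branch names are meant to fix the evaluation order by insertion, the map type therefore matters. The self-contained sketch below uses only the JDK; `BranchOrderDemo` and its first-match-wins loop are hypothetical illustrations, not part of any proposed Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical demo class (plain JDK, not Kafka Streams).
class BranchOrderDemo {

    // Inserts three named predicates in the order "small", "large", "medium"
    // and returns the same map for convenience.
    static Map<String, Predicate<Integer>> register(Map<String, Predicate<Integer>> branches) {
        branches.put("small", v -> v < 10);
        branches.put("large", v -> v >= 100);
        branches.put("medium", v -> v < 100);
        return branches;
    }

    // First-match-wins evaluation in the map's iteration order;
    // returns null if no predicate matches.
    static String firstMatch(Map<String, Predicate<Integer>> branches, int value) {
        for (Map.Entry<String, Predicate<Integer>> entry : branches.entrySet()) {
            if (entry.getValue().test(value)) {
                return entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> sorted = register(new TreeMap<>());
        Map<String, Predicate<Integer>> inserted = register(new LinkedHashMap<>());
        System.out.println(sorted.keySet());          // [large, medium, small]
        System.out.println(inserted.keySet());        // [small, large, medium]
        System.out.println(firstMatch(sorted, 5));    // medium
        System.out.println(firstMatch(inserted, 5));  // small
    }
}
```

With insertion order the value 5 lands in "small"; with the `TreeMap`, the alphabetically first matching name, "medium", wins instead.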
For the issue that the created `KStream` objects are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe the
second argument of `addBranch()` should not be a `Consumer` but a
`Function`, and `get()` could return whatever the `Function` returns?

Finally, I would also suggest updating the KIP with the current proposal.
That makes it easier to review.

-Matthias

On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to
revise the KIP and continue the discussion. Obviously we'll need some
buy-in from committers that have actual binding votes on whether the KIP
could be adopted. It would be great to hear if they think this is a good
idea overall. I'm not sure if that happens just by starting a vote, or if
there is generally some indication of interest beforehand.
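Matthias's `get(int index)` / `Function` idea can be modelled outside Kafka to see how it behaves. The sketch below is a hypothetical, list-based stand-in (`IndexedBrancher` is neither a Kafka class nor the KIP's API): each branch takes a `Function` instead of a `Consumer`, records go to the first matching branch, and `get(i)` returns what the i-th function produced. It assumes, for simplicity, that all branch functions share one result type `R`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, list-based stand-in; not a Kafka Streams class or the KIP's API.
class IndexedBrancher<T, R> {
    private List<T> remaining;                         // records not yet claimed by a branch
    private final List<R> results = new ArrayList<>(); // one result per branch, in order

    IndexedBrancher(List<T> input) {
        this.remaining = new ArrayList<>(input);
    }

    // Records that match this predicate (and no earlier one) form this branch;
    // the function's return value is stored under the branch's index.
    IndexedBrancher<T, R> branch(Predicate<? super T> predicate, Function<List<T>, R> function) {
        List<T> matched = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T record : remaining) {
            if (predicate.test(record)) {
                matched.add(record);
            } else {
                rest.add(record);
            }
        }
        remaining = rest;
        results.add(function.apply(matched));
        return this;
    }

    // Result of the index-th branch, in the order the branches were added (0-based).
    R get(int index) {
        return results.get(index);
    }

    public static void main(String[] args) {
        IndexedBrancher<Integer, Integer> brancher =
                new IndexedBrancher<Integer, Integer>(Arrays.asList(1, 2, 3, 4, 5, 6))
                        .branch(v -> v % 2 == 0, List::size)  // 2, 4, 6
                        .branch(v -> v > 2, List::size);      // 3, 5 (6 was already taken)
        System.out.println(brancher.get(0) + " " + brancher.get(1)); // 3 2
    }
}
```

Returning a `Function` result keeps every branch reachable from the caller's scope, at the cost of index-based access, which is the positional coupling Michael objects to later in the thread.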
That being said, I'll continue the discussion a bit: assuming we do move
forward with the solution of "stream.branch() returns KBranchedStream", do
we deprecate "stream.branch(...) returns KStream[]"? I would favor
deprecating, since having two mutually exclusive APIs that accomplish the
same thing is confusing, especially when they're fairly similar anyway. We
just need to be sure we're not making something impossible/difficult that
is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a
little sloppy overall in terms of naming and clarity. In particular,
passing in the "predicates" and "children" lists, which get modified in
KBranchedStream but are read from all the way in KStreamLazyBranch, is a
bit complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponomarev@mail.ru> wrote:

Hi Paul!

I read your code carefully and now I am fully convinced: your proposal
looks better and should work. We just have to document the crucial fact
that KStream consumers are invoked as they're added.
And then it's all going to be very nice.

What shall we do now? I should re-write the KIP and resume the discussion
here, right?

Why are you saying that your PR 'should not be even a starting point if we
go in this direction'? To me it looks like a good starting point. But as a
novice in this project I might miss some important details.

Regards,

Ivan

28.03.2019 17:38, Paul Whalen wrote:

Ivan,

Maybe I'm missing the point, but I believe the stream.branch() solution
supports this. The couponIssuer::set* consumers will be invoked as they're
added, not during streamsBuilder.build(). So the user still ought to be
able to call couponIssuer.coupons() afterward and depend on the branched
streams having been set.

The issue I mean to point out is that it is hard to access the branched
streams in the same scope as the original stream (that is, not inside the
couponIssuer), which is a problem with both proposed solutions. It can be
worked around though.
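Paul's point about eagerly invoked consumers can be illustrated with a toy model. In the sketch below, the hypothetical `EagerBrancher` and `CouponIssuerDemo` classes work over plain lists (not Kafka's API, and there is no separate build step at all): each branch's `Consumer` runs as soon as `branch(...)` is called, so the issuer's fields are already set when `coupons()` is invoked. A summary string stands in for the real join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, list-based toy (not Kafka Streams): each branch's Consumer is
// called immediately inside branch(...), not at some later build step.
class EagerBrancher<T> {
    private final List<T> source;
    private final List<Predicate<? super T>> earlier = new ArrayList<>();

    EagerBrancher(List<T> source) {
        this.source = source;
    }

    EagerBrancher<T> branch(Predicate<? super T> predicate, Consumer<List<T>> consumer) {
        List<T> branch = new ArrayList<>();
        for (T record : source) {
            // First match wins: skip records already claimed by an earlier branch.
            boolean claimed = earlier.stream().anyMatch(p -> p.test(record));
            if (!claimed && predicate.test(record)) {
                branch.add(record);
            }
        }
        earlier.add(predicate);
        consumer.accept(branch); // invoked right away
        return this;
    }
}

// Stand-in for Ivan's CouponIssuer: fields are filled in by the branch consumers.
class CouponIssuerDemo {
    private List<String> coffeePurchases;
    private List<String> electronicsPurchases;

    void setCoffeePurchases(List<String> purchases) { coffeePurchases = purchases; }

    void setElectronicsPurchases(List<String> purchases) { electronicsPurchases = purchases; }

    // A summary string stands in for the real join logic.
    String coupons() {
        return coffeePurchases.size() + " coffee, " + electronicsPurchases.size() + " electronics";
    }

    public static void main(String[] args) {
        CouponIssuerDemo issuer = new CouponIssuerDemo();
        new EagerBrancher<>(Arrays.asList("coffee:ann", "tv:bob", "coffee:cid"))
                .branch(s -> s.startsWith("coffee"), issuer::setCoffeePurchases)
                .branch(s -> s.startsWith("tv"), issuer::setElectronicsPurchases);
        // Both setters have already run, so no field is still null here.
        System.out.println(issuer.coupons()); // 2 coffee, 1 electronics
    }
}
```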
[Also, great to hear additional interest in 401, I'm excited to hear your
thoughts!]

Paul

On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponomarev@mail.ru> wrote:

Hi Paul!

The idea to postpone the wiring of branches to streamsBuilder.build() also
looked great to me at first glance, but ---

> the newly branched streams are not available in the same scope as each
> other. That is, if we wanted to merge them back together again I don't
> see a way to do that.

You just took the words right out of my mouth, I was just going to write in
detail about this issue.

Consider the example from Bill's book, p. 101: say we need to identify
customers who have bought coffee and made a purchase in the electronics
store to give them coupons.
This is the code I usually write under these circumstances using my
'brancher' class:

    @Setter
    class CouponIssuer{
        private KStream<....> coffePurchases;
        private KStream<....> electronicsPurchases;

        KStream<...> coupons(){
            return coffePurchases.join(electronicsPurchases...)...whatever

            /*In the real world the code here can be complex, so creation of
              a separate CouponIssuer class is fully justified, in order to
              separate classes' responsibilities.*/
        }
    }

    CouponIssuer couponIssuer = new CouponIssuer();

    new KafkaStreamsBrancher<....>()
        .branch(predicate1, couponIssuer::setCoffePurchases)
        .branch(predicate2, couponIssuer::setElectronicsPurchases)
        .onTopOf(transactionStream);

    /*Alas, this won't work if we're going to wire up everything later,
      without the terminal operation!!!*/
    couponIssuer.coupons()...

Does this make sense? In order to properly initialize the CouponIssuer we
need the terminal operation to be called before streamsBuilder.build() is
called.

[BTW Paul, I just found out that your KIP-401 is essentially the next KIP I
was going to write here.
I have some thoughts based on my experience, so I will join the discussion
on KIP-401 soon.]

Regards,

Ivan

28.03.2019 6:29, Paul Whalen wrote:

Ivan,

I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues, there
  aren't any direct ones. I was unaware that Java is smart enough to
  distinguish between a branch(varargs...) returning one thing and branch()
  with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it. We can just build
  up the branches in the KBranchedStream, which shares its state with the
  ProcessorSupplier that will actually do the branching. It's not terribly
  pretty in its current form, but I think it demonstrates its feasibility.
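Paul's compatibility remark relies on standard Java overload resolution: a call is first matched against methods applicable by fixed arity, and a varargs method is only considered if none applies, so a no-argument call picks the no-argument overload. The stand-in methods below are hypothetical (no Kafka types) and only show an array-returning varargs `branch(...)` coexisting with a no-argument `branch()` that returns something else.

```java
import java.util.function.Predicate;

// Hypothetical stand-ins for the two overloads (no Kafka types involved).
class OverloadDemo {

    // Like the existing array-returning KStream#branch(Predicate...):
    // one result slot per predicate.
    @SafeVarargs
    static Object[] branch(Predicate<String>... predicates) {
        return new Object[predicates.length];
    }

    // Like the proposed no-argument branch() returning a builder object.
    static String branch() {
        return "builder";
    }

    public static void main(String[] args) {
        Object[] slots = branch(s -> s.isEmpty(), s -> !s.isEmpty()); // varargs overload
        String builder = branch();                                     // fixed-arity overload is chosen
        System.out.println(slots.length + " " + builder);              // 2 builder
    }
}
```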
To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility. The reason is that the newly branched streams are not
available in the same scope as each other. That is, if we wanted to merge
them back together again I don't see a way to do that. The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponomarev@mail.ru> wrote:

OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that the branch API needs
improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1.
(as originally proposed)

    new KafkaStreamsBrancher<..>()
        .branch(predicate1, ks ->..)
        .branch(predicate2, ks->..)
        .defaultBranch(ks->..) //optional
        .onTopOf(stream).mapValues(...).... //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until all
the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts the
fluency of other KStream methods.

2. (as Paul proposes)

    stream
        .branch(predicate1, ks ->...)
        .branch(predicate2, ks->...)
        .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way the KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and
noDefault()). And for a user it is very easy to miss the fact that one of
the terminal methods should be called. If these methods are not called, we
can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?
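To compare the two options concretely, here is a toy, list-based model of option 1 (the hypothetical `BrancherSketch`, neither Ivan's actual `KafkaStreamsBrancher` nor Kafka code). It records predicate/handler pairs; `onTopOf` performs the first-match split, invokes the handlers, and returns its argument so the caller can keep chaining. As with the existing `KStream#branch`, records that match no predicate are dropped unless a default handler is given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, list-based model of option 1; not Ivan's class and not Kafka code.
class BrancherSketch<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    private Consumer<List<T>> defaultHandler; // optional

    BrancherSketch<T> branch(Predicate<? super T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    BrancherSketch<T> defaultBranch(Consumer<List<T>> handler) {
        this.defaultHandler = handler;
        return this;
    }

    // Splits the source (first matching predicate wins), calls the handlers,
    // and returns the source unchanged so the caller can keep chaining on it.
    List<T> onTopOf(List<T> source) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : source) {
            boolean placed = false;
            for (int i = 0; i < predicates.size() && !placed; i++) {
                if (predicates.get(i).test(record)) {
                    buckets.get(i).add(record);
                    placed = true;
                }
            }
            if (!placed) {
                unmatched.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        if (defaultHandler != null) {
            defaultHandler.accept(unmatched); // without a default, unmatched records are dropped
        }
        return source;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> returned = new BrancherSketch<Integer>()
                .branch(v -> v % 2 == 0, evens -> System.out.println("evens " + evens)) // evens [2, 4]
                .branch(v -> v > 3, big -> System.out.println("big " + big))            // big [5]
                .defaultBranch(rest -> System.out.println("rest " + rest))              // rest [1, 3]
                .onTopOf(source);
        System.out.println(returned == source); // true: onTopOf returns its argument
    }
}
```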
Regards,

Ivan

27.03.2019 18:46, Ivan Ponomarev wrote:

25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about
stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way. Maybe we all
should think further.

Let me comment on two of your ideas.

> user could specify a terminal method that assumes nothing will reach
> the default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`,
because there are scenarios when we want to just silently drop the messages
that didn't match any predicate. 2) Throwing an exception in the middle of
data flow processing looks like a bad idea. In the stream processing
paradigm, I would prefer to emit a special message to a dedicated stream.
This is exactly where `default` can be used.
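The fall-through options discussed so far (route unmatched records to a default branch, silently drop them, or fail) can be compared side by side in a small model. `FallThroughDemo` and its `Policy` enum are hypothetical names for illustration over plain lists; they are not proposed Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model over plain lists (not Kafka Streams API) of three ways to
// treat a record that matches none of the branch predicates.
class FallThroughDemo {

    enum Policy { ROUTE_TO_DEFAULT, DROP, FAIL }

    // Returns the records that reach the default output; that list stays
    // empty unless the policy is ROUTE_TO_DEFAULT.
    static List<Integer> unmatched(List<Integer> records, List<Predicate<Integer>> predicates, Policy policy) {
        List<Integer> defaultOutput = new ArrayList<>();
        for (Integer record : records) {
            if (predicates.stream().anyMatch(p -> p.test(record))) {
                continue; // handled by some regular branch
            }
            switch (policy) {
                case ROUTE_TO_DEFAULT:
                    defaultOutput.add(record); // e.g. a dedicated "unexpected records" stream
                    break;
                case DROP:
                    break; // silently discarded
                case FAIL:
                    throw new IllegalStateException("no branch matched record " + record);
            }
        }
        return defaultOutput;
    }

    public static void main(String[] args) {
        List<Predicate<Integer>> predicates = Arrays.<Predicate<Integer>>asList(v -> v > 0);
        List<Integer> input = Arrays.asList(3, -1, 5, -2);
        System.out.println(unmatched(input, predicates, Policy.ROUTE_TO_DEFAULT)); // [-1, -2]
        System.out.println(unmatched(input, predicates, Policy.DROP));             // []
        try {
            unmatched(input, predicates, Policy.FAIL);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // no branch matched record -1
        }
    }
}
```

Routing to a default output keeps the data flowing and makes unexpected records observable, which is the trade-off Ivan argues for over failing mid-stream.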
> it would be fairly easy for the InternalTopologyBuilder to track
> dangling branches that haven't been terminated and raise a clear error
> before it becomes an issue.

You mean a runtime exception, when the program is compiled and run? Well,
I'd prefer an API that simply won't compile if used incorrectly. Can we
build such an API as a method chain starting from a KStream object? There
is a huge cost difference between runtime and compile-time errors. Even if
a failure surfaces instantly in unit tests, it costs more for the project
than a compilation failure.

Regards,

Ivan

25.03.2019 0:38, Paul Whalen wrote:

Ivan,

Good point about the terminal operation being required. But is that really
such a bad thing? If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.
In fact I think it creates an opportunity for a nicer API - a user could
specify a terminal method that assumes nothing will reach the default
branch, throwing an exception if such a case occurs. That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue. Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.
With the fluent solution, it ought to work the same way all other
operations do - if you want to process off the original KStream multiple
times, you just need the stream as a variable so you can call as many
operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponomarev@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something more
with the original branch after branching.

I understand your point that the need for special object construction
contrasts the fluency of most KStream methods.
But here we have a special case: we build the switch to split the flow, so
I think this is still idiomatic.

Regards,

Ivan

24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts the fluency of other
KStream method calls. Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream?
>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>>>>>>>>>> Like: >>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>>>>>>>>>> stream.branch() >>>>> >>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate1, >>>> this::handle1) >>>>> >>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate2, >>>> this::handle2) >>>>> >>>>>>>>>>>>>>>>>>>>>> .defaultBranch(this::handleDef= ault); >>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStream= s or >>>>> >>>>>>>> KStreamBrancher >>>>> >>>>>>>>> or >>>>> >>>>>>>>>>>>>>>>>>>>> something, >>>>> >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and >>>>> terminated by >>>>> >>>>>>>>>>>>> defaultBranch() >>>>> >>>>>>>>>>>>>>>>>>>>>> (which >>>>> >>>>>>>>>>>>>>>>>>>>>> returns void). This is obviously >>>>> incompatible with >>>>> >>>>> the >>>>> >>>>>>>>> current >>>>> >>>>>>>>>>>>>>>>>>>>>> API, so >>>>> >>>>>>>>>>>>>>>>>>>>> the >>>>> >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to have a= >>>>> different >>>>> >>>>>> name, >>>>> >>>>>>>> but >>>>> >>>>>>>>> that >>>>> >>>>>>>>>>>>>>>>>>>>>> seems >>>>> >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we could c= all it >>>>> >>>>>> something >>>>> >>>>>>>> like >>>>> >>>>>>>>>>>>>>>>>>>>>> branched() >>>>> >>>>>>>>>>>>>>>>>>>>> or >>>>> >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old = API. >>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of you= r >>>>> KIP? It >>>>> >>>>>> seems >>>>> >>>>>>>>> like it >>>>> >>>>>>>>>>>>>>>>>>>>>> does to >>>>> >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching= >>>>> while also >>>>> >>>>>>>> allowing >>>>> >>>>>>>>> you >>>>> >>>>>>>>>>>>> to >>>>> >>>>>>>>>>>>>>>>>>>>>> dynamically build of branches off of >>>>> KBranchedStreams >>>>> >>>>>> if >>>>> >>>>>>>>> desired. 
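To make the proposed chain concrete, here is a minimal in-memory sketch of its semantics. It does not use Kafka Streams: `ListStream` and `ListBrancher` are hypothetical stand-ins over a plain `List`, and sending each record to the first matching predicate is an assumption borrowed from how the existing `KStream#branch` routes records. As Paul describes, `defaultBranch` is the terminal call and returns void.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// In-memory sketch of the proposed stream.branch().addBranch(...).defaultBranch(...) chain.
// ListStream and ListBrancher are hypothetical stand-ins, not Kafka Streams classes.
public class FluentBranchSketch {

    // Hypothetical stand-in for a KStream: a finite list of record values.
    static final class ListStream<T> {
        private final List<T> records;

        ListStream(List<T> records) {
            this.records = records;
        }

        List<T> records() {
            return records;
        }

        // Counterpart of the proposed stream.branch(): starts a new brancher.
        ListBrancher<T> branch() {
            return new ListBrancher<>(this);
        }
    }

    // Accumulates (predicate, handler) pairs until the terminal defaultBranch() call.
    static final class ListBrancher<T> {
        private final ListStream<T> source;
        private final List<Predicate<T>> predicates = new ArrayList<>();
        private final List<Consumer<ListStream<T>>> handlers = new ArrayList<>();

        ListBrancher(ListStream<T> source) {
            this.source = source;
        }

        ListBrancher<T> addBranch(Predicate<T> predicate, Consumer<ListStream<T>> handler) {
            predicates.add(predicate);
            handlers.add(handler);
            return this;
        }

        // Terminal call (returns void): each value goes to the first matching
        // predicate; values that match none go to the default handler.
        void defaultBranch(Consumer<ListStream<T>> defaultHandler) {
            List<List<T>> buckets = new ArrayList<>();
            for (int i = 0; i < predicates.size(); i++) {
                buckets.add(new ArrayList<>());
            }
            List<T> unmatched = new ArrayList<>();
            for (T value : source.records()) {
                int target = -1;
                for (int i = 0; i < predicates.size() && target < 0; i++) {
                    if (predicates.get(i).test(value)) {
                        target = i;
                    }
                }
                if (target >= 0) {
                    buckets.get(target).add(value);
                } else {
                    unmatched.add(value);
                }
            }
            for (int i = 0; i < handlers.size(); i++) {
                handlers.get(i).accept(new ListStream<>(buckets.get(i)));
            }
            defaultHandler.accept(new ListStream<>(unmatched));
        }
    }

    // Runs the chain on sample data and returns what each handler received.
    static List<List<String>> demo() {
        ListStream<String> stream = new ListStream<>(List.of("a1", "b1", "a2", "c1"));
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        List<String> rest = new ArrayList<>();

        stream.branch()
              .addBranch(v -> v.startsWith("a"), s -> first.addAll(s.records()))
              .addBranch(v -> v.startsWith("b"), s -> second.addAll(s.records()))
              .defaultBranch(s -> rest.addAll(s.records()));

        return List.of(first, second, rest);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the source is still held in the `stream` variable, it could be branched or otherwise processed again afterwards, which is the reuse pattern Paul points to for the fluent style.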
Thanks,
Paul

On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev wrote:

Hi Bill,

Thank you for your reply!

This is how I usually do it:

    void handleFirstCase(KStream<String, String> ks) {
        ks.filter(....).mapValues(...)
    }

    void handleSecondCase(KStream<String, String> ks) {
        ks.selectKey(...).groupByKey()...
    }

    ......
    new KafkaStreamsBrancher()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf(....)

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as a second argument, which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing and not necessarily use a terminal node on the branched stream immediately? For example, using today's logic as is, if we had something like this:

    KStream[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter(....).mapValues(...)..
    branches[1].selectKey(...).groupByKey().....

Thanks!
Bill

On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbejeck@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.
Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can, I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev
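For comparison with Bill's question, the array-based `originalStream.branch(predicate1, predicate2)` call routes records the way the `KStream#branch(Predicate...)` Javadoc documents: each record goes to the first predicate that matches, and records matching none are dropped. The sketch below models only that routing over an in-memory list; `branch` here is a hypothetical static helper, not the Kafka Streams method. It also shows that each returned branch stays an ordinary value that can be processed further rather than ending in a terminal step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// In-memory model of the array-returning branch(predicate1, predicate2) call.
// branch() below is a hypothetical helper over a List, not the Kafka Streams method.
public class ArrayBranchSketch {

    // Each value goes to the first predicate it satisfies; values that match
    // no predicate are dropped, mirroring the documented KStream#branch routing.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> values, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T value : values) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    branches.get(i).add(value);
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    // Branches sample data, then keeps processing branch 0 instead of
    // ending it with a terminal step.
    static List<List<String>> demo() {
        List<String> input = List.of("a1", "ab", "b1", "c1");
        List<List<String>> branches = branch(input,
                v -> v.startsWith("a"),
                v -> v.contains("b"));
        List<String> upper = new ArrayList<>();
        for (String v : branches.get(0)) {
            upper.add(v.toUpperCase());
        }
        return List.of(upper, branches.get(1));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Here "ab" satisfies both predicates but lands only in branch 0, and "c1" matches neither and is dropped, which is why a Consumer-based brancher with no default branch has to decide what happens to unmatched records.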