From: Jan Filipiak
Date: Thu, 16 Aug 2018 07:40:08 +0200
To: dev@kafka.apache.org
Subject: Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Message-ID: <5B750E38.80706@trivago.com>

Even before message headers existed, I always had the option to just wrap
the messages in my own custom envelope, so of course I thought this through.
One sentence in your last email brought back all the thinking I put in back
then to design it in what I think is the "Kafka way". It ended up as a bit
of a rant about what happened in the past. I see plenty of my colleagues
falling into traps in the API that I warned about during the 1.0 DSL
rewrite, and I have the same feeling again. So I hope this gives you some
insight into my thought process. I am aware that since I never ported 213 to
a higher Streams version I don't really have a stake here, and initially I
didn't feel like actually sending this. But maybe you can pull something
good from it.

Best Jan

On 15.08.2018 04:44, Adam Bellemare wrote:
> @Jan
> Thanks Jan. I take it you mean "key-widening" somehow includes information
> about which record is processed first? I understand about a CombinedKey
> with both the Foreign and Primary key, but I don't see how you track
> ordering metadata in there unless you actually included a metadata field in
> the key type as well.
>
> @Guozhang
> As Jan mentioned earlier, is Record Headers mean to strictly be used in
> just the user-space? It seems that it is possible that a collision on the
> (key,value) tuple I wish to add to it could occur. For instance, if I
> wanted to add a ("foreignKeyOffset",10) to the Headers but the user already
> specified their own header with the same key name, then it appears there
> would be a collision. (This is one of the issues I brought up in the KIP).
>
> --------------------------------
>
> I will be posting a prototype PR against trunk within the next day or two.
> One thing I need to point out is that my design very strictly wraps the
> entire foreignKeyJoin process entirely within the DSL function. There is no
> exposure of CombinedKeys or widened keys, nothing to resolve with regards
> to out-of-order processing and no need for the DSL user to even know what's
> going on inside of the function. The code simply returns the results of the
> join, keyed by the original key. Currently my API mirrors identically the
> format of the data returned by the regular join function, and I believe
> that this is very useful to many users of the DSL. It is my understanding
> that one of the main design goals of the DSL is to provide higher level
> functionality without requiring the users to know exactly what's going on
> under the hood. With this in mind, I thought it best to solve ordering and
> partitioning problems within the function and eliminate the requirement for
> users to do additional work after the fact to resolve the results of their
> join. Basically, I am assuming that most users of the DSL just "want it to
> work" and want it to be easy. I did this operating under the assumption
> that if a user truly wants to optimize their own workflow down to the
> finest details then they will break from strictly using the DSL and move
> down to the processors API.

I think the abstraction is not powerful enough to keep Kafka specifics from
leaking up. The leak I currently see is that you cannot reliably prevent the
delete from coming out first, before you emit the correct new record, since
it is an abstraction built entirely around Kafka. I can only recommend not
doing this. Honesty and simplicity should always be the first priority;
trying to hide this just makes it more complex and less understandable, and
will lead to mistakes in usage.

This is exactly why I am also strongly against GraphNodes and later
optimization stages. Can someone give me an example of an optimisation that
really can't be handled by the user constructing his topology differently?
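
A minimal sketch of the ordering leak described above, in plain Java with no
Kafka dependency. All names here (`FkRaceSketch`, `JoinResult`, the "|" key
separator) are hypothetical and are not taken from KIP-213 or Kafka Streams.
It replays the two results produced when a record's foreign key changes from
A to B, once keyed by the primary key alone and once keyed by a widened
(foreign key + primary key) key:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the KIP-213 implementation.
final class FkRaceSketch {

    // One join result: the primary key, the foreign key it was computed for,
    // and the joined value (null stands for a delete).
    static final class JoinResult {
        final String primaryKey;
        final String foreignKey;
        final String value;

        JoinResult(String primaryKey, String foreignKey, String value) {
            this.primaryKey = primaryKey;
            this.foreignKey = foreignKey;
            this.value = value;
        }
    }

    // Result table keyed by the primary key only: the last arrival wins.
    static Map<String, String> materializeByPrimaryKey(List<JoinResult> arrivals) {
        Map<String, String> table = new HashMap<>();
        for (JoinResult r : arrivals) {
            if (r.value == null) {
                table.remove(r.primaryKey);
            } else {
                table.put(r.primaryKey, r.value);
            }
        }
        return table;
    }

    // Result table keyed by a widened key (foreign key + primary key):
    // the delete for A and the update for B touch different entries.
    static Map<String, String> materializeByCombinedKey(List<JoinResult> arrivals) {
        Map<String, String> table = new HashMap<>();
        for (JoinResult r : arrivals) {
            String combinedKey = r.foreignKey + "|" + r.primaryKey;
            if (r.value == null) {
                table.remove(combinedKey);
            } else {
                table.put(combinedKey, r.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        JoinResult deleteForA = new JoinResult("key", "A", null);
        JoinResult updateForB = new JoinResult("key", "B", "joined-with-B");

        List<JoinResult> inOrder = Arrays.asList(deleteForA, updateForB);
        List<JoinResult> outOfOrder = Arrays.asList(updateForB, deleteForA);

        System.out.println(materializeByPrimaryKey(inOrder));     // {key=joined-with-B}
        System.out.println(materializeByPrimaryKey(outOfOrder));  // {}
        System.out.println(materializeByCombinedKey(inOrder));    // {B|key=joined-with-B}
        System.out.println(materializeByCombinedKey(outOfOrder)); // {B|key=joined-with-B}
    }
}
```

With the primary key alone, the table ends up empty whenever the delete
arrives last. With the widened key the two results never overwrite each
other, so arrival order does not change the final state.
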

Having reusable Processor API components accessible by the DSL and
composable as one likes is exactly where the DSL should max out, and KSQL
should take the next step. I find it very unprofessional, from a software
engineering point of view, to run software where you cannot at least
sensibly reason about the inner workings of the libraries used. Given that
people have to read and understand this anyway, why try to hide it?

I really miss the beauty of the 0.10 version of the DSL. Apparently that is
not something I can influence, only warn about.

@Guozhang you can't imagine how many extra IQ state stores I constantly
prune from stream apps because people just keep passing Materialized
instances into all the operations. :D :'-( I regret that I couldn't
convince you guys back then. Plus this whole "entire topology as a floating
interface chain" thing; I have never seen it anywhere. :-/ :'(

I don't know. I guess this is just me regretting having only 24h/day.

> I updated the KIP today with some points worth talking about, should anyone
> be so inclined to check it out. Currently we are running this code in
> production to handle relational joins from our Kafka Connect topics, as per
> the original motivation of the KIP.
>
> I believe the foreignKeyJoin should be responsible for. In my
>
> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang wrote:
>
>> Hello Adam,
>>
>> As for your question regarding GraphNodes, it is for extending Streams
>> optimization framework. You can find more details on
>> https://issues.apache.org/jira/browse/KAFKA-6761.
>>
>> The main idea is that instead of directly building up the "physical
>> topology" (represented as Topology in the public package, and internally
>> built as the ProcessorTopology class) while users are specifying the
>> transformation operators, we first keep it as a "logical topology"
>> (represented as GraphNode inside InternalStreamsBuilder).
>> And then only execute the optimization and the construction of the
>> "physical" Topology when StreamsBuilder.build() is called.
>>
>> Back to your question, I think it makes more sense to add a new type of
>> StreamsGraphNode (maybe you can consider inheriting from the
>> BaseJoinProcessorNode). Note that although in the Topology we will have
>> multiple connected ProcessorNodes to represent a (foreign-key) join, we
>> still want to keep it as a single StreamsGraphNode, or just a couple of
>> them in the logical representation so that in the future we can construct
>> the physical topology differently (e.g. having another way than the current
>> distributed hash-join).
>>
>> -------------------------------------------------------
>>
>> Back to your questions to KIP-213, I think Jan has summarized it
>> pretty-well. Note that back then we do not have headers support so we have
>> to do such "key-widening" approach to ensure ordering.
>>
>>
>> Guozhang
>>
>> On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak wrote:
>>
>>> Hi Adam,
>>>
>>> I love how you are on to this already! I resolve this by "key-widening" I
>>> treat the result of FKA,and FKB differently.
>>> As you can see the output of my join has a Combined Key and therefore I
>>> can resolve the "race condition" in a group by
>>> if I so desire.
>>>
>>> I think this reflects more what happens under the hood and makes it more
>>> clear to the user what is going on. The Idea
>>> of hiding this behind metadata and handle it in the DSL is from my POV
>>> unideal.
>>>
>>> To write into your example:
>>>
>>> (key + A, null)
>>> (key +B, )
>>>
>>> is what my output would look like.
>>>
>>>
>>> Hope that makes sense :D
>>>
>>> Best Jan
>>>
>>> On 13.08.2018 18:16, Adam Bellemare wrote:
>>>
>>>> Hi Jan
>>>>
>>>> If you do not use headers or other metadata, how do you ensure that
>>>> changes to the foreign-key value are not resolved out-of-order?
>>>> ie: If an event has FK = A, but you change it to FK = B, you need to
>>>> propagate both a delete (FK=A -> null) and an addition (FK=B). In my
>>>> solution, without maintaining any metadata, it is possible for the final
>>>> output to be in either order - the correctly updated joined value, or the
>>>> null for the delete.
>>>>
>>>> (key, null)
>>>> (key, )
>>>>
>>>> or
>>>>
>>>> (key, )
>>>> (key, null)
>>>>
>>>> I looked back through your code and through the discussion threads, and
>>>> didn't see any information on how you resolved this. I have a version of
>>>> my code working for 2.0, I am just adding more integration tests and will
>>>> update the KIP accordingly. Any insight you could provide on resolving
>>>> out-of-order semantics without metadata would be helpful.
>>>>
>>>> Thanks
>>>> Adam
>>>>
>>>> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak wrote:
>>>>
>>>>> Hi,
>>>>> Happy to see that you want to make an effort here.
>>>>>
>>>>> Regarding the ProcessSuppliers I couldn't find a way to not rewrite the
>>>>> joiners + the merger.
>>>>> The re-partitioners can be reused in theory. I don't know if repartition
>>>>> is optimized in 2.0 now.
>>>>>
>>>>> I made this
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
>>>>> back then and we are running KIP-213 with KIP-241 in combination.
>>>>>
>>>>> For us it is vital as it minimized the size we had in our repartition
>>>>> topics plus it removed the factor of 2 in events on every message.
>>>>> I know about this new "delete once consumer has read it". I don't think
>>>>> 241 is vital for all usecases, for ours it is. I wanted
>>>>> to use 213 to sneak in the foundations for 241 aswell.
>>>>>
>>>>> I don't quite understand what a PropagationWrapper is, but I am certain
>>>>> that you do not need RecordHeaders
>>>>> for 213 and I would try to leave them out.
>>>>> They either belong to the DSL
>>>>> or to the user, having a mixed use is
>>>>> to be avoided. We run the join with 0.8 logformat and I don't think one
>>>>> needs more.
>>>>>
>>>>> This KIP will be very valuable for the streams project! I couldn't never
>>>>> convince myself to invest into the 1.0+ DSL
>>>>> as I used almost all my energy to fight against it. Maybe this can also
>>>>> help me see the good sides a little bit more.
>>>>>
>>>>> If there is anything unclear with all the text that has been written,
>>>>> feel free to just directly cc me so I don't miss it on
>>>>> the mailing list.
>>>>>
>>>>> Best Jan
>>>>>
>>>>> On 08.08.2018 15:26, Adam Bellemare wrote:
>>>>>
>>>>>> More followup, and +dev as Guozhang replied to me directly previously.
>>>>>>
>>>>>> I am currently porting the code over to trunk. One of the major changes
>>>>>> since 1.0 is the usage of GraphNodes. I have a question about this:
>>>>>>
>>>>>> For a foreignKey joiner, should it have its own dedicated node type? Or
>>>>>> would it be advisable to construct it from existing GraphNode
>>>>>> components?
>>>>>> For instance, I believe I could construct it from several
>>>>>> OptimizableRepartitionNode, some SinkNode, some SourceNode, and several
>>>>>> StatefulProcessorNode. That being said, there is some underlying
>>>>>> complexity to each approach.
>>>>>>
>>>>>> I will be switching the KIP-213 to use the RecordHeaders in Kafka
>>>>>> Streams instead of the PropagationWrapper, but conceptually it should
>>>>>> be the same.
>>>>>>
>>>>>> Again, any feedback is welcomed...
>>>>>>
>>>>>> On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare
>>>>>> <adam.bellemare@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guozhang et al
>>>>>>>
>>>>>>> I was just reading the 2.0 release notes and noticed a section on
>>>>>>> Record Headers.
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
>>>>>>>
>>>>>>> I am not yet sure if the contents of a RecordHeader is propagated all
>>>>>>> the way through the Sinks and Sources, but if it is, and if it remains
>>>>>>> attached to the record (including null records) I may be able to ditch
>>>>>>> the propagationWrapper for an implementation using RecordHeader. I am
>>>>>>> not yet sure if this is doable, so if anyone understands RecordHeader
>>>>>>> impl better than I, I would be happy to hear from you.
>>>>>>>
>>>>>>> In the meantime, let me know of any questions. I believe this PR has a
>>>>>>> lot of potential to solve problems for other people, as I have
>>>>>>> encountered a number of other companies in the wild all home-brewing
>>>>>>> their own solutions to come up with a method of handling relational
>>>>>>> data in streams.
>>>>>>>
>>>>>>> Adam
>>>>>>>
>>>>>>> On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang wrote:
>>>>>>>
>>>>>>>> Hello Adam,
>>>>>>>>
>>>>>>>> Thanks for rebooting the discussion of this KIP ! Let me finish my
>>>>>>>> pass on the wiki and get back to you soon. Sorry for the delays..
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare
>>>>>>>> <adam.bellemare@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Let me kick this off with a few starting points that I would like to
>>>>>>>>> generate some discussion on.
>>>>>>>>>
>>>>>>>>> 1) It seems to me that I will need to repartition the data twice -
>>>>>>>>> once on the foreign key, and once back to the primary key. Is there
>>>>>>>>> anything I am missing here?
>>>>>>>>>
>>>>>>>>> 2) I believe I will also need to materialize 3 state stores: the
>>>>>>>>> prefixScan SS, the highwater mark SS (for out-of-order resolution)
>>>>>>>>> and the final state store, due to the workflow I have laid out. I
>>>>>>>>> have not thought of a better way yet, but would appreciate any input
>>>>>>>>> on this matter. I have gone back through the mailing list for the
>>>>>>>>> previous discussions on this KIP, and I did not see anything
>>>>>>>>> relating to resolving out-of-order compute. I cannot see a way
>>>>>>>>> around the current three-SS structure that I have.
>>>>>>>>>
>>>>>>>>> 3) Caching is disabled on the prefixScan SS, as I do not know how to
>>>>>>>>> resolve the iterator obtained from rocksDB with that of the cache.
>>>>>>>>> In addition, I must ensure everything is flushed before scanning.
>>>>>>>>> Since the materialized prefixScan SS is under "control" of the
>>>>>>>>> function, I do not anticipate this to be a problem. Performance
>>>>>>>>> throughput will need to be tested, but as Jan observed in his
>>>>>>>>> initial overview of this issue, it is generally a surge of output
>>>>>>>>> events which affect performance moreso than the flush or prefixScan
>>>>>>>>> itself.
>>>>>>>>>
>>>>>>>>> Thoughts on any of these are greatly appreciated, since these
>>>>>>>>> elements are really the cornerstone of the whole design. I can put
>>>>>>>>> up the code I have written against 1.0.2 if we so desire, but first
>>>>>>>>> I was hoping to just tackle some of the fundamental design
>>>>>>>>> proposals.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Adam
>>>>>>>>>
>>>>>>>>> On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare
>>>>>>>>> <adam.bellemare@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Here is the new discussion thread for KIP-213.
>>>>>>>>>> I picked back up on the KIP as this is something that we too at
>>>>>>>>>> Flipp are now running in production. Jan started this last year,
>>>>>>>>>> and I know that Trivago is also using something similar in
>>>>>>>>>> production, at least in terms of APIs and functionality.
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>>
>>>>>>>>>> I do have an implementation of the code for Kafka 1.0.2 (our local
>>>>>>>>>> production version) but I won't post it yet as I would like to
>>>>>>>>>> focus on the workflow and design first. That being said, I also
>>>>>>>>>> need to add some clearer integration tests (I did a lot of testing
>>>>>>>>>> using a non-Kafka Streams framework) and clean up the code a bit
>>>>>>>>>> more before putting it in a PR against trunk (I can do so later
>>>>>>>>>> this week likely).
>>>>>>>>>>
>>>>>>>>>> Please take a look,
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> Adam Bellemare
>>>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>> --
>> -- Guozhang
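
Point 2 of the quoted 24 Jul message mentions a "highwater mark" state store
for out-of-order resolution without spelling out the mechanism. The sketch
below is one possible reading, under loudly stated assumptions: each join
result is assumed to carry the offset of the source record that triggered it
(compare the ("foreignKeyOffset",10) header mentioned earlier in the thread),
and a result is dropped when a higher offset has already been applied for the
same primary key. The class and method names are hypothetical and this is not
the KIP-213 code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of high-water-mark filtering; the mechanism is
// an assumption, not the KIP-213 implementation.
final class HighWaterMarkSketch {

    // Highest source offset applied so far, per primary key.
    private final Map<String, Long> highWaterMark = new HashMap<>();
    // Final join results, per primary key.
    private final Map<String, String> results = new HashMap<>();

    // Applies a join result unless a newer one was already applied.
    // Returns true if applied, false if dropped as stale.
    boolean apply(String primaryKey, long sourceOffset, String joinedValue) {
        Long seen = highWaterMark.get(primaryKey);
        if (seen != null && sourceOffset < seen) {
            return false; // an older result arrived late; ignore it
        }
        highWaterMark.put(primaryKey, sourceOffset);
        if (joinedValue == null) {
            results.remove(primaryKey);
        } else {
            results.put(primaryKey, joinedValue);
        }
        return true;
    }

    String get(String primaryKey) {
        return results.get(primaryKey);
    }

    public static void main(String[] args) {
        HighWaterMarkSketch resolver = new HighWaterMarkSketch();
        // The result for offset 11 (FK=B) overtakes the one for offset 10 (FK=A).
        System.out.println(resolver.apply("key", 11L, "joined-with-B")); // true
        System.out.println(resolver.apply("key", 10L, "joined-with-A")); // false
        System.out.println(resolver.get("key"));                         // joined-with-B
    }
}
```

This only covers results triggered by different source records. How a delete
and an update that stem from the same source record are ordered is not
addressed by the sketch.
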