Subject: Re: DataSet: CombineHint heuristics
From: Urs Schoenenberger
To: Gábor Gévay
Cc: user
Date: Tue, 5 Sep 2017 17:07:46 +0200
Message-ID: <9ab80503-1b4f-c1c2-b790-d239e763781c@tngtech.com>
References: <4607fb76-8825-0be0-45cb-1aba547c9e31@tngtech.com> <51ED0595-7051-4E0D-AAFA-4BD7841ED452@apache.org>
Mailing-List: user@flink.apache.org

Hi Gábor,

thank you very much for your explanation, that makes a lot of sense.

Best regards,
Urs

On 05.09.2017 14:32, Gábor Gévay wrote:
> Hi Urs,
>
> Yes, the 1/10th ratio is just a very loose rule of thumb. I would
> suggest to try both the SORT and HASH strategies with a workload that
> is as similar as possible to your production workload (similar data,
> similar parallelism, etc.), and see which one is faster for your
> specific use case.
>
> An important difference between the HASH and SORT strategies is that
> the sorting combiner stores the original input records, while the hash
> combiner stores only combined records. I.e., when an input record
> arrives whose key is already in the hashtable then this record won't
> consume additional memory, because it is combined right away.
> Therefore, for example, if you would like your combiner to not emit
> any records prematurely (i.e., combine everything possible, without
> running out of memory), then with the SORT strategy you need combiner
> memory proportional to your input size, while with the HASH strategy
> you need combiner memory proportional only to the number of keys.
>
> You are correct in that the performance depends very much on how many
> records fit into a single Sorter/Hashtable. However, I wrote
> #keys/#total records into the documentation because this is easier for
> a user to estimate, and this ratio being small correlates with the
> HASH strategy getting faster, as explained above.
>
> Best,
> Gábor
>
>
>
> On Thu, Aug 31, 2017 at 4:02 PM, Aljoscha Krettek wrote:
>> Hi,
>>
>> I would say that your assumption is correct and that the COMBINE strategy does in fact also depend on the ratio "#total records/#records that fit into a single Sorter/Hashtable".
>>
>> I'm CC'ing Fabian, just to be sure. He knows that stuff better than I do.
>>
>> Best,
>> Aljoscha
>>
>>> On 31. Aug 2017, at 13:41, Urs Schoenenberger wrote:
>>>
>>> Hi all,
>>>
>>> I was wondering about the heuristics for CombineHint:
>>>
>>> Flink uses SORT by default, but the doc for HASH says that we should
>>> expect it to be faster if the number of keys is less than 1/10th of the
>>> number of records.
>>>
>>> HASH should be faster if it is able to combine a lot of records, which
>>> happens if multiple events for the same key are present in a data chunk
>>> *that fits into a combine-hashtable* (cf. handling in
>>> ReduceCombineDriver.java).
>>>
>>> Now, if I have 10 billion events and 100 million keys, but only about 1
>>> million records fit into a hashtable, the number of matches may be
>>> extremely low, so very few events are getting combined (of course, this
>>> is similar for SORT as the sorter's memory is bounded, too).
>>>
>>> Am I correct in assuming that the actual tradeoff is not only based on
>>> the ratio of #total records/#keys, but also on #total records/#records
>>> that fit into a single Sorter/Hashtable?
>>>
>>> Thanks,
>>> Urs
>>>
>>> --
>>> Urs Schönenberger - urs.schoenenberger@tngtech.com
>>>
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Managing Directors: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>>

-- 
Urs Schönenberger - urs.schoenenberger@tngtech.com
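
The memory argument in this thread (a sort combiner buffers raw input records, a hash combiner holds one combined record per key, and both must flush when their memory is full) can be illustrated with a small simulation. This is only a simplified model under stated assumptions, not Flink's ReduceCombineDriver: capacity is counted in records rather than bytes, the hash table is assumed to emit everything and reset when it is full, and the names `make_records`, `hash_combine` and `sort_combine` are hypothetical.

```python
import random


def make_records(n, num_keys, seed=0):
    """Generate n (key, value) records with uniformly random keys (illustrative data)."""
    rng = random.Random(seed)
    return [(rng.randrange(num_keys), 1) for _ in range(n)]


def hash_combine(records, capacity):
    """Model of a hash-style combiner; returns how many records it emits.

    Assumption: the table holds at most `capacity` distinct keys. A record
    whose key is already present is combined in place and uses no extra
    memory. When a new key arrives and the table is full, the whole table
    is emitted and reset.
    """
    table = {}
    emitted = 0
    for key, value in records:
        if key in table:
            table[key] += value          # combined right away
        else:
            if len(table) >= capacity:   # out of memory: emit and reset
                emitted += len(table)
                table.clear()
            table[key] = value
    return emitted + len(table)          # final flush


def sort_combine(records, capacity):
    """Model of a sort-style combiner; returns how many records it emits.

    Assumption: the buffer holds at most `capacity` raw input records.
    When it is full, the buffer is grouped by key, one combined record
    per distinct key is emitted, and the buffer is cleared.
    """
    buffer = []
    emitted = 0
    for record in records:
        buffer.append(record)            # every input record uses memory
        if len(buffer) >= capacity:
            emitted += len({key for key, _ in buffer})
            buffer.clear()
    return emitted + len({key for key, _ in buffer})


if __name__ == "__main__":
    # Few keys: all keys fit into the hash table, but not all records
    # fit into the sort buffer.
    few_keys = [(i % 100, 1) for i in range(100_000)]
    print("few keys  hash:", hash_combine(few_keys, 1_000),
          "sort:", sort_combine(few_keys, 1_000))

    # Scaled-down version of the 10 billion events / 100 million keys /
    # 1 million records-per-table scenario (same ratios, smaller numbers).
    many_keys = make_records(200_000, 2_000, seed=42)
    print("many keys hash:", hash_combine(many_keys, 20),
          "sort:", sort_combine(many_keys, 20))
```

In the first scenario the modelled hash combiner emits one record per key, while the modelled sort combiner emits one record per key for every full buffer. In the second scenario neither model combines much, because the number of keys is far larger than what fits into a single table or buffer, which matches the point that the tradeoff also depends on #total records/#records that fit into a single Sorter/Hashtable.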