From dev-return-3361-archive-asf-public=cust-asf.ponee.io@druid.apache.org Fri Jul 23 20:36:06 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id ECBEB180654 for ; Fri, 23 Jul 2021 22:36:05 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id 2593E469C8 for ; Fri, 23 Jul 2021 20:36:05 +0000 (UTC) Received: (qmail 44181 invoked by uid 500); 23 Jul 2021 20:36:04 -0000 Mailing-List: contact dev-help@druid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@druid.apache.org Delivered-To: mailing list dev@druid.apache.org Received: (qmail 44169 invoked by uid 99); 23 Jul 2021 20:36:04 -0000 Received: from mailrelay1-he-de.apache.org (HELO mailrelay1-he-de.apache.org) (116.203.21.61) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Jul 2021 20:36:04 +0000 Received: from mail-ej1-f53.google.com (mail-ej1-f53.google.com [209.85.218.53]) by mailrelay1-he-de.apache.org (ASF Mail Server at mailrelay1-he-de.apache.org) with ESMTPSA id B55DB3E80B for ; Fri, 23 Jul 2021 20:36:03 +0000 (UTC) Received: by mail-ej1-f53.google.com with SMTP id hs23so5296094ejc.13 for ; Fri, 23 Jul 2021 13:36:03 -0700 (PDT) X-Gm-Message-State: AOAM532ptMn0kgrdnRLpQjBwpkROVR0DgcZ94w6rR6u61J6JGmk6YprC hsmLNTwxPW34+03mAIYOGPDE4t3GaBYKPE18O3g= X-Google-Smtp-Source: ABdhPJytJbV89XIoRyOwuSPUyqGY9FgF8vdzk72hFPvFZ4Kt3DTjSdD2OooszPG5vmZz3OxhK/SL90cVmwnXCIIvYwc= X-Received: by 2002:a17:906:a24c:: with SMTP id bi12mr6384572ejb.530.1627072563173; Fri, 23 Jul 2021 13:36:03 -0700 (PDT) MIME-Version: 1.0 References: In-Reply-To: From: Gian Merlino Date: Fri, 23 Jul 2021 13:35:52 -0700 X-Gmail-Original-Message-ID: Message-ID: Subject: Re: ItemsSketch Aggregator in druid-datasketches extension To: dev@druid.apache.org Content-Type: multipart/alternative; boundary="00000000000026decc05c7d05aa5" --00000000000026decc05c7d05aa5 Content-Type: text/plain; charset="UTF-8" Hey Michael, Very cool! To answer your question: it is critical to have a BufferAggregator. Some context; there are 3 kinds of aggregators: - Aggregator: stores intermediate state on heap; is used during ingestion and by the non-vectorized timeseries query engine. Required, or else some queries won't work properly. - BufferAggregator: stores intermediate state off-heap; is used by the non-vectorized topN and groupBy engines. Required, or else some queries won't work properly. - VectorAggregator: stores intermediate state off-heap; is used by vectorized query engines. Optional, but if there are any aggregators in a query that don't have a VectorAggregator implementation, then the whole query will run non-vectorized, which will slow it down. The main reason that BufferAggregators exist is to support off-heap aggregation, which minimizes GC pressure and prevents out-of-memory errors during the aggregation process. > Assuming it is, we can begin talking with the datasketches team about the possibility of a Direct implementation. With ItemsSketch, the biggest roadblock you're likely to run into is the fact that the items may be of variable size. Currently in Druid each BufferAggregator (and VectorAggregator) must have a fixed-size amount of space allocated off heap. The way this is done is by returning the number of bytes you need from AggregatorFactory.getMaxIntermediateSize. It's all allocated upfront. It can depend on aggregator configuration (for example, the HLL aggregators need more space if they're configured to have higher accuracy) but it can't depend on the data that is encountered during the query. So if you do have variable size items, you get into quite a conundrum! In order to have a "proper" implementation of an off-heap variable-sized sketch, we'd need an aggregator API that allows them to allocate additional memory at runtime. The DataSketches folks have asked us for this before but we haven't built it yet. There's a couple ways you could solve it in the meantime: 1) Allocate a fixed-size amount of memory that is enough to store most data you reasonably expect to encounter, update that off-heap, and allocate additional memory on heap if needed, without Druid "knowing" about it. This is something that'd require a Direct version of the ItemsSketch. It's a technique used by the quantiles sketches too. It's a hack but it works. You can see it in action by looking at DirectUpdateDoublesSketch in DataSketches (check out the spots where "mem_" is reassigned: it's allocating new on-heap memory that Druid doesn't know about) and DoublesSketchBuildBufferAggregator in Druid (check out the "sketches" map and "relocate" method in DoublesSketchBuildBufferAggregatorHelper: it's making sure to retain the references just in case one of them grew into the heap). 2) Allocate some arbitrary amount of memory, but don't use it, use a technique like the one in DoublesSketchBuildBufferAggregatorHelper to simply maintain references to on-heap sketches. This would work but you would run the risk of running out of heap memory. (There's nothing that explicitly controls how much heap you'll use.) To a degree you can mitigate this by allocating a larger amount of unused off-heap memory via AggregatorFactory.getMaxIntermediateSize, which will get Druid to flush aggregation state more often (it does this when it runs out of off-heap buffer space), which will sort of limit how many sketches are going to be in flight at once. It's quite indirect but it's the best I can think of. > I am also thinking of finishing the implementation by explicitly serializing the entire sketch on each update, but this would only be for experimentation as I doubt this is the intended behavior for implementations of BufferedAggregator. You're right, that's not the intended sort of implementation. It works but it's usually too slow to be practical. (It incurs a full serialization roundtrip for every row.) On Fri, Jul 23, 2021 at 12:18 PM Michael Schiff wrote: > I am looking into implementing a new Aggregator in the datasketches > extension using the ItemSketch in the frequencies package: > > https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html > > https://github.com/apache/datasketches-java/tree/master/src/main/java/org/apache/datasketches/frequencies > > Ive started on a partial implementation here (still a WIP, lots of TODOs): > > https://github.com/apache/druid/compare/master...michaelschiff:fis-aggregator?expand=1 > > From everything I've seen, it's critical that there is an efficient > implementation of BufferAggregator. The existing aggregators take advantage > of other sketch types providing "Direct" implementations that are > implemented directly against a ByteBuffer. This leads to fairly > transparent implementation of BufferAggregator. ItemSketch is able to > serialize itself and to wrap ByteBuffer for instantiation, but the actual > interactions are all on heap (core of the implementation is > https://github.com/apache/datasketches-java/blob/27ecce938555d731f29df97f12f4744a0efb663d/src/main/java/org/apache/datasketches/frequencies/ReversePurgeItemHashMap.java > ). > > Can anyone confirm that it is critical (i.e. Aggregator will not function) > to have an implementation of BufferAggregator? Assuming it is, we can begin > talking with the datasketches team about the possibility of a Direct > implementation. I am also thinking of finishing the implementation by > explicitly serializing the entire sketch on each update, but this would > only be for experimentation as I doubt this is the intended behavior for > implementations of BufferedAggregator. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org > For additional commands, e-mail: dev-help@druid.apache.org > > --00000000000026decc05c7d05aa5--