flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-2907) Bloom filter for Join
Date Thu, 14 Jan 2016 20:26:39 GMT

     [ https://issues.apache.org/jira/browse/FLINK-2907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-2907:
    Component/s:     (was: Java API)
                     (was: Scala API)
                 DataSet API

> Bloom filter for Join
> ---------------------
>                 Key: FLINK-2907
>                 URL: https://issues.apache.org/jira/browse/FLINK-2907
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>              Labels: requires-design-doc
> A bloom filter can be applied as a chainable operation on probe-side Join elements. An
element not matched by the bloom filter is dropped before it would be serialized, shipped,
deserialized, or processed.
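
The probe-side filtering described above can be sketched as follows. This is a minimal illustration in plain Python, not Flink code: the `BloomFilter` class, the `filter_probe_side` helper, and the SHA-256 based double hashing are all assumptions made for the example.

```python
import hashlib


class BloomFilter:
    """Minimal bloom filter over join keys (hypothetical, for illustration)."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive all bit positions from one digest via double hashing.
        digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # force an odd step
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(
            self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key)
        )


def filter_probe_side(probe_elements, key_of, bloom):
    """Yield only probe elements whose join key may exist on the hash side."""
    for element in probe_elements:
        if bloom.might_contain(key_of(element)):
            yield element


# Hash side: 1,000 even keys, sized at 8 bits per element.
hash_keys = list(range(0, 2000, 2))
bloom = BloomFilter(num_bits=8 * len(hash_keys), num_hashes=6)
for key in hash_keys:
    bloom.add(key)

# Probe side: 10,000 elements; most have no match and should be dropped early.
probe_side = [(k, "payload") for k in range(10000)]
survivors = list(filter_probe_side(probe_side, lambda e: e[0], bloom))
```

Every true match survives the filter (a bloom filter has no false negatives); the survivors also include a small share of false positives that the join itself must still reject.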
> Generating the bloom filter is a chainable operation over hash side elements. The bloom
filter created on each TaskManager must be the same size so that filters can be combined with
a bitwise OR. The most efficient means to distribute the bloom filter is to assign each
TaskManager an equal partition, which it receives from all other TaskManagers. Each TaskManager
broadcasts its partition once all local elements (by hashing) and remote partitions (by OR)
have been processed into that part of the bloom filter.
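
The partitioned exchange can be simulated in a single process as a rough sketch. Everything here is hypothetical: the function names, the constants, and the use of Python integers as bitsets. The sketch merges partitions with a bitwise OR, the usual union operation for bloom filters, and assumes the filter size divides evenly by the number of TaskManagers.

```python
import hashlib

NUM_BITS = 1024   # must be identical on every TaskManager
NUM_HASHES = 6


def bit_positions(key):
    # Same hash functions everywhere, so the same key sets the same bits.
    digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
    h1 = int.from_bytes(digest[:8], "big")
    h2 = int.from_bytes(digest[8:16], "big") | 1
    return [(h1 + i * h2) % NUM_BITS for i in range(NUM_HASHES)]


def build_local_filter(keys):
    """Hash the local hash-side keys into a bitset stored as a Python int."""
    bits = 0
    for key in keys:
        for pos in bit_positions(key):
            bits |= 1 << pos
    return bits


def slice_of(bits, part, num_parts):
    """Extract partition `part` of a filter split into equal-width bit ranges."""
    width = NUM_BITS // num_parts
    return (bits >> (part * width)) & ((1 << width) - 1)


def exchange_and_merge(local_filters):
    """Simulate the two phases: merge per partition, then broadcast."""
    n = len(local_filters)
    width = NUM_BITS // n
    # Phase 1: TaskManager p receives partition p from every peer and ORs
    # the remote partitions into its own copy of that partition.
    merged_parts = []
    for p in range(n):
        part = 0
        for local in local_filters:
            part |= slice_of(local, p, n)
        merged_parts.append(part)
    # Phase 2: each TaskManager broadcasts its merged partition, and every
    # TaskManager reassembles the same full filter.
    full = 0
    for p, part in enumerate(merged_parts):
        full |= part << (p * width)
    return full


# Four simulated TaskManagers, each holding a disjoint share of the keys.
local_keys = [range(0, 50), range(50, 100), range(100, 150), range(150, 200)]
local_filters = [build_local_filter(keys) for keys in local_keys]
global_filter = exchange_and_merge(local_filters)

# The merged filter equals one built from all keys in a single place.
reference = build_local_filter(range(200))
```

Because each TaskManager merges only its own partition, no node ever has to receive every peer's complete filter.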
> An example with numbers: triangle listing/counting joins 2B edges on 149B two-paths,
resulting in 21B triangles (using the optimal algorithm). At 8 bits per element the bloom
filter has a false-positive rate of ~2% and occupies 2 GB (stored once and shared per
TaskManager). Each TaskManager both sends and receives data equivalent to the size of the
bloom filter (minus the local partition, the size of which trends towards zero as the number
of TaskManagers increases). The number of matched elements is 21B (true positive) +
~0.02*(149B-21B) = 23.5B, a reduction of 84% or 1.5 TB (at 12 bytes per element). With 4
TaskManagers only 12 GB of bloom filter would be transmitted, a savings of 99.2% relative to
the 1.5 TB of probe-side data that is no longer shipped.
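
The arithmetic above can be reproduced with a short script. The variable names are illustrative. The traffic formula is one reading of the exchange that yields the quoted 12 GB: each TaskManager ships all but its local partition once when merging, and ships its merged partition to every peer once when broadcasting.

```python
import math

hash_side = 2e9            # edges
probe_side = 149e9         # two-paths
true_matches = 21e9        # triangles
bits_per_element = 8
bytes_per_element = 12
task_managers = 4

# Best achievable false-positive rate at m/n bits per element:
# (1/2)^k with the optimal k = (m/n) * ln 2 hash functions.
fp_rate = 0.5 ** (bits_per_element * math.log(2))

filter_bytes = hash_side * bits_per_element / 8

# Use the rounded 2% rate, as in the text above.
passed = true_matches + 0.02 * (probe_side - true_matches)
reduction = 1 - passed / probe_side
saved_bytes = (probe_side - passed) * bytes_per_element

# Assumed traffic model: merge phase plus broadcast phase, summed over all
# TaskManagers; each phase moves (n - 1) / n of the filter per TaskManager.
filter_traffic = 2 * (task_managers - 1) * filter_bytes
savings = 1 - filter_traffic / saved_bytes

print(f"false-positive rate: {fp_rate:.4f}")
print(f"filter size:         {filter_bytes / 1e9:.1f} GB")
print(f"elements passed:     {passed / 1e9:.2f}B")
print(f"reduction:           {reduction:.1%} ({saved_bytes / 1e12:.2f} TB)")
print(f"filter traffic:      {filter_traffic / 1e9:.0f} GB")
print(f"net savings:         {savings:.1%}")
```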
> Key issues are determining the size of the bloom filter (which depends on the count of hash
side elements, the available memory segments, and the error rate) and whether this can be
integrated with Join or must be a separate operator. This also depends on dynamic memory
allocation, as spilling to disk would perform the very serialization, write, read, and
deserialization we are looking to avoid.
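
As a sketch of the sizing question, the standard bloom filter formulas relate the element count, the target error rate, and a memory cap. The function `size_bloom_filter` and its `max_bytes` parameter are hypothetical; a plain byte budget stands in for Flink's memory segments here.

```python
import math


def size_bloom_filter(num_elements, target_fp_rate, max_bytes):
    """Return (num_bits, num_hashes, expected_fp_rate) within a memory budget."""
    # Ideal size for the target rate: m = -n * ln(p) / (ln 2)^2
    ideal_bits = math.ceil(
        -num_elements * math.log(target_fp_rate) / math.log(2) ** 2
    )
    # Cap at the available memory; the error rate rises if the cap applies.
    num_bits = min(ideal_bits, max_bytes * 8)
    # Optimal hash count for the size actually used: k = (m / n) * ln 2
    num_hashes = max(1, round(num_bits / num_elements * math.log(2)))
    # Expected rate: (1 - e^(-k * n / m))^k
    expected = (1 - math.exp(-num_hashes * num_elements / num_bits)) ** num_hashes
    return num_bits, num_hashes, expected


# 2B hash-side elements, 2% target, generous 4 GiB budget: the target is met.
bits, hashes, rate = size_bloom_filter(2_000_000_000, 0.02, 4 * 1024**3)

# The same input capped at 1 GB (4 bits per element): the error rate degrades.
capped_bits, capped_hashes, capped_rate = size_bloom_filter(
    2_000_000_000, 0.02, 1_000_000_000
)
```

When the cap applies, the achievable error rate follows from the memory available rather than from the target, which is why the sizing decision depends on how memory is allocated.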

This message was sent by Atlassian JIRA
