kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] 01/02: [util] Import Impala's block based BloomFilter
Date Mon, 16 Dec 2019 19:45:22 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e671e81ce170c4595fed497a5a518a50eb700b69
Author: Bankim Bhavsar <bankim@cloudera.com>
AuthorDate: Thu Nov 14 15:53:26 2019 -0800

    [util] Import Impala's block based BloomFilter
    
    This change imports Impala's block based BloomFilter(BF) that uses tiny
    BloomFilters that are space, cache and hash efficient.
    Plan is to use this BloomFilter for pushing down predicates from Impala.
    
    Added this BloomFilter in kudu-util and renamed to BlockBloomFilter to
    avoid name collision with existing BloomFilter in Kudu.
    
    Removed dependencies on auto-generated code like Thrift/Protobuf,
    Impala specific utilities like BufferPool, CPUInfo.
    
    Added a simple BlockBloomFilterBufferAllocatorIf interface to allow
    integration with Impala that uses its own BufferPool.
    Added a DefaultBlockBloomFilterBufferAllocator implementation derived
    from earlier version in Impala.
    
    AVX2 operations are used which leads to following scenarios:
    1) Compiler lacks AVX2 support.
    Code is conditionally compiled using USE_AVX2 macro.
    2) Compiler supports AVX2 but CPU at runtime does not.
    Function pointers used to dispatch to generic implementation.
    3) Compiler supports AVX2 and CPU at runtime does too.
    Function pointers used to dispatch to AVX2 specific implementation.
    
    Made bunch of cosmetic changes to adhere to coding style.
    
    Change-Id: I89c54a051c5093cf5fb81481a47a0a6677d7d906
    Reviewed-on: http://gerrit.cloudera.org:8080/14745
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
    Reviewed-by: Andrew Wong <awong@cloudera.com>
---
 src/kudu/util/CMakeLists.txt             |  22 +++
 src/kudu/util/block_bloom_filter-test.cc | 241 +++++++++++++++++++++++++++++++
 src/kudu/util/block_bloom_filter.cc      | 201 ++++++++++++++++++++++++++
 src/kudu/util/block_bloom_filter.h       | 231 +++++++++++++++++++++++++++++
 src/kudu/util/block_bloom_filter_avx2.cc |  78 ++++++++++
 src/kudu/util/bloom_filter.h             |   4 +
 6 files changed, 777 insertions(+)

diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index affb7e8..70a0bce 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -148,6 +148,7 @@ set(UTIL_SRCS
   atomic.cc
   bitmap.cc
   block_cache_metrics.cc
+  block_bloom_filter.cc
   bloom_filter.cc
   cache.cc
   char_util.cc
@@ -242,6 +243,26 @@ set(UTIL_SRCS
 # optimized regardless of the default optimization options.
 set_source_files_properties(memory/overwrite.cc PROPERTIES COMPILE_FLAGS "-O3")
 
+# Detect AVX2 support
+set(AVX2_CMD "echo | ${CMAKE_CXX_COMPILER} -mavx2 -dM -E - | awk '$2 == \"__AVX2__\" { print $3 }'")
+execute_process(
+  COMMAND bash -c ${AVX2_CMD}
+  OUTPUT_VARIABLE AVX2_SUPPORT
+  OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+message("AVX2_SUPPORT=${AVX2_SUPPORT}")
+
+# block_bloom_filter_avx2.cc uses AVX2 operations.
+if (AVX2_SUPPORT)
+  list(APPEND UTIL_SRCS block_bloom_filter_avx2.cc)
+  set_source_files_properties(block_bloom_filter_avx2.cc PROPERTIES COMPILE_FLAGS "-mavx2")
+  # block_bloom_filter.cc is not compiled explicitly with AVX2 instructions(-mavx2) but it needs
+  # to know at compile time whether AVX2 support is available, hence the custom definition
+  # instead of relying on __AVX2__ defined by compiler with -mavx2.
+  set_source_files_properties(block_bloom_filter_avx2.cc block_bloom_filter.cc
+                              PROPERTIES COMPILE_DEFINITIONS "USE_AVX2=1")
+endif()
+
 set(UTIL_LIBS
   crcutil
   gflags
@@ -388,6 +409,7 @@ ADD_KUDU_TEST(bit-util-test)
 ADD_KUDU_TEST(bitmap-test)
 ADD_KUDU_TEST(bitset-test)
 ADD_KUDU_TEST(blocking_queue-test)
+ADD_KUDU_TEST(block_bloom_filter-test)
 ADD_KUDU_TEST(bloom_filter-test)
 ADD_KUDU_TEST(cache-bench RUN_SERIAL true)
 ADD_KUDU_TEST(cache-test)
diff --git a/src/kudu/util/block_bloom_filter-test.cc b/src/kudu/util/block_bloom_filter-test.cc
new file mode 100644
index 0000000..492174e
--- /dev/null
+++ b/src/kudu/util/block_bloom_filter-test.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/block_bloom_filter.h"
+
+#include <cmath> // IWYU pragma: keep
+#include <cstdint>
+#include <cstdlib>
+#include <iosfwd>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(disable_blockbloomfilter_avx2);
+
+using namespace std; // NOLINT(*)
+
+namespace kudu {
+
+class BlockBloomFilterTest : public KuduTest {
+ public:
+  void SetUp() override {
+    SeedRandom();
+  }
+  // Make a random uint32_t, avoiding the absent high bit and the low-entropy low bits
+  // produced by rand().
+  static uint32_t MakeRand() {
+    uint32_t result = (rand() >> 8) & 0xffff;
+    result <<= 16;
+    result |= (rand() >> 8) & 0xffff;
+    return result;
+  }
+
+  shared_ptr<BlockBloomFilter> CreateBloomFilter(size_t log_space_bytes) {
+    FLAGS_disable_blockbloomfilter_avx2 = (MakeRand() & 0x1) == 0;
+
+    auto bf = make_shared<BlockBloomFilter>(DefaultBlockBloomFilterBufferAllocator::GetSingleton());
+    CHECK_OK(bf->Init(log_space_bytes));
+    bloom_filters_.emplace_back(bf);
+    return bf;
+  }
+
+  void TearDown() override {
+    for (const auto& bf : bloom_filters_) {
+      bf->Close();
+    }
+  }
+
+ private:
+  vector<shared_ptr<BlockBloomFilter>> bloom_filters_;
+};
+
+// We can construct (and destruct) Bloom filters with different spaces.
+TEST_F(BlockBloomFilterTest, Constructor) {
+  for (int i = 0; i < 30; ++i) {
+    CreateBloomFilter(i);
+  }
+}
+
+TEST_F(BlockBloomFilterTest, InvalidSpace) {
+  BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton());
+  // Random number in the range [38, 64).
+  const int log_space_bytes = 38 + rand() % (64 - 38);
+  Status s = bf.Init(log_space_bytes);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_STR_CONTAINS(s.ToString(), "Bloom filter too large");
+  bf.Close();
+}
+
+// We can Insert() hashes into a Bloom filter with different spaces.
+TEST_F(BlockBloomFilterTest, Insert) {
+  for (int i = 13; i < 17; ++i) {
+    auto bf = CreateBloomFilter(i);
+    for (int k = 0; k < (1 << 15); ++k) {
+      bf->Insert(MakeRand());
+    }
+  }
+}
+
+// After Insert()ing something into a Bloom filter, it can be found again immediately.
+TEST_F(BlockBloomFilterTest, Find) {
+  for (int i = 13; i < 17; ++i) {
+    auto bf = CreateBloomFilter(i);
+    for (int k = 0; k < (1 << 15); ++k) {
+      const auto to_insert = MakeRand();
+      bf->Insert(to_insert);
+      EXPECT_TRUE(bf->Find(to_insert));
+    }
+  }
+}
+
+// After Insert()ing something into a Bloom filter, it can be found again much later.
+TEST_F(BlockBloomFilterTest, CumulativeFind) {
+  for (int i = 5; i < 11; ++i) {
+    vector<uint32_t> inserted;
+    auto bf = CreateBloomFilter(i);
+    for (int k = 0; k < (1 << 10); ++k) {
+      const uint32_t to_insert = MakeRand();
+      inserted.push_back(to_insert);
+      bf->Insert(to_insert);
+      for (int n = 0; n < inserted.size(); ++n) {
+        EXPECT_TRUE(bf->Find(inserted[n]));
+      }
+    }
+  }
+}
+
+// The empirical false positives we find when looking for random items is within a constant
+// factor of the false positive probability the Bloom filter was constructed for.
+TEST_F(BlockBloomFilterTest, FindInvalid) {
+  static const int find_limit = 1 << 20;
+  unordered_set<uint32_t> to_find;
+  while (to_find.size() < find_limit) {
+    to_find.insert(MakeRand());
+  }
+  static const int max_log_ndv = 19;
+  unordered_set<uint32_t> to_insert;
+  while (to_insert.size() < (1ULL << max_log_ndv)) {
+    const auto candidate = MakeRand();
+    if (to_find.find(candidate) == to_find.end()) {
+      to_insert.insert(candidate);
+    }
+  }
+  vector<uint32_t> shuffled_insert(to_insert.begin(), to_insert.end());
+  for (int log_ndv = 12; log_ndv < max_log_ndv; ++log_ndv) {
+    for (int log_fpp = 4; log_fpp < 15; ++log_fpp) {
+      double fpp = 1.0 / (1 << log_fpp);
+      const size_t ndv = 1 << log_ndv;
+      const int log_heap_space = BlockBloomFilter::MinLogSpace(ndv, fpp);
+      auto bf = CreateBloomFilter(log_heap_space);
+      // Fill up a BF with exactly as much ndv as we planned for it:
+      for (size_t i = 0; i < ndv; ++i) {
+        bf->Insert(shuffled_insert[i]);
+      }
+      int found = 0;
+      // Now we sample from the set of possible hashes, looking for hits.
+      for (const auto& i : to_find) {
+        found += bf->Find(i);
+      }
+      EXPECT_LE(found, find_limit * fpp * 2)
+          << "Too many false positives with -log2(fpp) = " << log_fpp;
+      // Because the space is rounded up to a power of 2, we might actually get a lower
+      // fpp than the one passed to MinLogSpace().
+      const double expected_fpp = BlockBloomFilter::FalsePositiveProb(ndv, log_heap_space);
+      EXPECT_GE(found, find_limit * expected_fpp)
+          << "Too few false positives with -log2(fpp) = " << log_fpp;
+      EXPECT_LE(found, find_limit * expected_fpp * 16)
+          << "Too many false positives with -log2(fpp) = " << log_fpp;
+    }
+  }
+}
+
+// Test that MaxNdv() and MinLogSpace() are dual
+TEST_F(BlockBloomFilterTest, MinSpaceMaxNdv) {
+  for (int log2fpp = -2; log2fpp >= -63; --log2fpp) {
+    const double fpp = pow(2, log2fpp);
+    for (int given_log_space = 8; given_log_space < 30; ++given_log_space) {
+      const size_t derived_ndv = BlockBloomFilter::MaxNdv(given_log_space, fpp);
+      int derived_log_space = BlockBloomFilter::MinLogSpace(derived_ndv, fpp);
+
+      EXPECT_EQ(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+
+      // If we lower the fpp, we need more space; if we raise it we need less.
+      derived_log_space = BlockBloomFilter::MinLogSpace(derived_ndv, fpp / 2);
+      EXPECT_GE(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+      derived_log_space = BlockBloomFilter::MinLogSpace(derived_ndv, fpp * 2);
+      EXPECT_LE(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+    }
+    for (size_t given_ndv = 1000; given_ndv < 1000 * 1000; given_ndv *= 3) {
+      const int derived_log_space = BlockBloomFilter::MinLogSpace(given_ndv, fpp);
+      const size_t derived_ndv = BlockBloomFilter::MaxNdv(derived_log_space, fpp);
+
+      // The max ndv is close to, but larger than, the ndv we asked for
+      EXPECT_LE(given_ndv, derived_ndv) << "fpp: " << fpp
+                                        << " derived_log_space: " << derived_log_space;
+      EXPECT_GE(given_ndv * 2, derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+
+      // Changing the fpp changes the ndv capacity in the expected direction.
+      size_t new_derived_ndv = BlockBloomFilter::MaxNdv(derived_log_space, fpp / 2);
+      EXPECT_GE(derived_ndv, new_derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+      new_derived_ndv = BlockBloomFilter::MaxNdv(derived_log_space, fpp * 2);
+      EXPECT_LE(derived_ndv, new_derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+    }
+  }
+}
+
+TEST_F(BlockBloomFilterTest, MinSpaceEdgeCase) {
+  int min_space = BlockBloomFilter::MinLogSpace(1, 0.75);
+  EXPECT_GE(min_space, 0) << "LogSpace should always be >= 0";
+}
+
+// Check that MinLogSpace() and FalsePositiveProb() are dual
+TEST_F(BlockBloomFilterTest, MinSpaceForFpp) {
+  for (size_t ndv = 10000; ndv < 100 * 1000 * 1000; ndv *= 1.01) {
+    for (double fpp = 0.1; fpp > pow(2, -20); fpp *= 0.99) { // NOLINT: loop on double
+      // When constructing a Bloom filter, we can request a particular fpp by calling
+      // MinLogSpace().
+      const int min_log_space = BlockBloomFilter::MinLogSpace(ndv, fpp);
+      // However, at the resulting ndv and space, the expected fpp might be lower than
+      // the one that was requested.
+      double expected_fpp = BlockBloomFilter::FalsePositiveProb(ndv, min_log_space);
+      EXPECT_LE(expected_fpp, fpp);
+      // The fpp we get might be much lower than the one we asked for. However, if the
+      // space were just one size smaller, the fpp we get would be larger than the one we
+      // asked for.
+      expected_fpp = BlockBloomFilter::FalsePositiveProb(ndv, min_log_space - 1);
+      EXPECT_GE(expected_fpp, fpp);
+      // Therefore, the return value of MinLogSpace() is actually the minimum
+      // log space at which we can guarantee the requested fpp.
+    }
+  }
+}
+}  // namespace kudu
diff --git a/src/kudu/util/block_bloom_filter.cc b/src/kudu/util/block_bloom_filter.cc
new file mode 100644
index 0000000..a175abc
--- /dev/null
+++ b/src/kudu/util/block_bloom_filter.cc
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/block_bloom_filter.h"
+
+#include <emmintrin.h>
+#include <mm_malloc.h>
+
+#include <algorithm>
+#include <cmath>
+#include <cstdlib>
+#include <cstring>
+#include <ostream>
+
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/singleton.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/flag_tags.h"
+
+DEFINE_bool(disable_blockbloomfilter_avx2, false,
+            "Disable AVX2 operations in BlockBloomFilter. This flag has no effect if the target "
+            "CPU doesn't support AVX2 at run-time or BlockBloomFilter was built with a compiler "
+            "that doesn't support AVX2.");
+TAG_FLAG(disable_blockbloomfilter_avx2, hidden);
+
+namespace kudu {
+
+constexpr uint32_t BlockBloomFilter::kRehash[8] __attribute__((aligned(32)));
+const base::CPU BlockBloomFilter::kCpu = base::CPU();
+
+BlockBloomFilter::BlockBloomFilter(BlockBloomFilterBufferAllocatorIf* buffer_allocator) :
+  always_false_(true),
+  buffer_allocator_(buffer_allocator),
+  log_num_buckets_(0),
+  directory_mask_(0),
+  directory_(nullptr) {
+#ifdef USE_AVX2
+  if (has_avx2()) {
+    bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsertAVX2;
+    bucket_find_func_ptr_ = &BlockBloomFilter::BucketFindAVX2;
+  } else {
+    bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsert;
+    bucket_find_func_ptr_ = &BlockBloomFilter::BucketFind;
+  }
+#else
+  bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsert;
+  bucket_find_func_ptr_ = &BlockBloomFilter::BucketFind;
+#endif
+}
+
+BlockBloomFilter::~BlockBloomFilter() {
+  DCHECK(directory_ == nullptr) <<
+    "Close() should have been called before the object is destroyed.";
+}
+
+Status BlockBloomFilter::Init(const int log_space_bytes) {
+  // Since log_space_bytes is in bytes, we need to convert it to the number of tiny
+  // Bloom filters we will use.
+  log_num_buckets_ = std::max(1, log_space_bytes - kLogBucketByteSize);
+  // Since we use 32 bits in the arguments of Insert() and Find(), log_num_buckets_
+  // must be limited.
+  if (log_num_buckets_ > 32) {
+    return Status::InvalidArgument(
+        strings::Substitute("Bloom filter too large. log_space_bytes: $0", log_space_bytes));
+  }
+  // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
+  // that is too large.
+  directory_mask_ = (1ULL << log_num_buckets_) - 1;
+
+  const size_t alloc_size = directory_size();
+  Close(); // Ensure that any previously allocated memory for directory_ is released.
+  RETURN_NOT_OK(buffer_allocator_->AllocateBuffer(alloc_size,
+                                                  reinterpret_cast<void**>(&directory_)));
+  memset(directory_, 0, alloc_size);
+  return Status::OK();
+}
+
+void BlockBloomFilter::Close() {
+  if (directory_ != nullptr) {
+    buffer_allocator_->FreeBuffer(directory_);
+    directory_ = nullptr;
+  }
+}
+
+ATTRIBUTE_NO_SANITIZE_INTEGER
+void BlockBloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept {
+  // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is
+  // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second
+  // part of this method.
+  uint32_t new_bucket[kBucketWords] __attribute__((aligned(16)));
+  for (int i = 0; i < kBucketWords; ++i) {
+    // Rehash 'hash' and use the top kLogBucketWordBits bits, following Dietzfelbinger.
+    new_bucket[i] = (kRehash[i] * hash) >> ((1 << kLogBucketWordBits) - kLogBucketWordBits);
+    new_bucket[i] = 1U << new_bucket[i];
+  }
+  for (int i = 0; i < 2; ++i) {
+    __m128i new_bucket_sse = _mm_load_si128(reinterpret_cast<__m128i*>(new_bucket + 4 * i));
+    __m128i* existing_bucket = reinterpret_cast<__m128i*>(&directory_[bucket_idx][4 * i]);
+    *existing_bucket = _mm_or_si128(*existing_bucket, new_bucket_sse);
+  }
+}
+
+ATTRIBUTE_NO_SANITIZE_INTEGER
+bool BlockBloomFilter::BucketFind(
+    const uint32_t bucket_idx, const uint32_t hash) const noexcept {
+  for (int i = 0; i < kBucketWords; ++i) {
+    BucketWord hval = (kRehash[i] * hash) >> ((1 << kLogBucketWordBits) - kLogBucketWordBits);
+    hval = 1U << hval;
+    if (!(directory_[bucket_idx][i] & hval)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// The following three methods are derived from
+//
+// fpp = (1 - exp(-kBucketWords * ndv/space))^kBucketWords
+//
+// where space is in bits.
+size_t BlockBloomFilter::MaxNdv(const int log_space_bytes, const double fpp) {
+  DCHECK(log_space_bytes > 0 && log_space_bytes < 61);
+  DCHECK(0 < fpp && fpp < 1);
+  static const double ik = 1.0 / kBucketWords;
+  return -1 * ik * static_cast<double>(1ULL << (log_space_bytes + 3)) * log(1 - pow(fpp, ik));
+}
+
+int BlockBloomFilter::MinLogSpace(const size_t ndv, const double fpp) {
+  static const double k = kBucketWords;
+  if (0 == ndv) return 0;
+  // m is the number of bits we would need to get the fpp specified
+  const double m = -k * ndv / log(1 - pow(fpp, 1.0 / k));
+
+  // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
+  return std::max(0, static_cast<int>(ceil(log2(m / 8))));
+}
+
+double BlockBloomFilter::FalsePositiveProb(const size_t ndv, const int log_space_bytes) {
+  return pow(1 - exp((-1.0 * static_cast<double>(kBucketWords) * static_cast<double>(ndv))
+                     / static_cast<double>(1ULL << (log_space_bytes + 3))),
+             kBucketWords);
+}
+
+void BlockBloomFilter::InsertNoAvx2(const uint32_t hash) noexcept {
+  always_false_ = false;
+  const uint32_t bucket_idx = Rehash32to32(hash) & directory_mask_;
+  BucketInsert(bucket_idx, hash);
+}
+
+// To set 8 bits in a 32-byte Bloom filter, we set one bit in each 32-bit uint32_t. This
+// is a "split Bloom filter", and it has approximately the same false positive probability
+// as a standard Bloom filter; see Mitzenmacher's "Bloom Filters and Such". It also has
+// the advantage of requiring fewer random bits: log2(32) * 8 = 5 * 8 = 40 random bits for
+// a split Bloom filter, but log2(256) * 8 = 64 random bits for a standard Bloom filter.
+void BlockBloomFilter::Insert(const uint32_t hash) noexcept {
+  DCHECK_NOTNULL(directory_);
+  always_false_ = false;
+  const uint32_t bucket_idx = Rehash32to32(hash) & directory_mask_;
+  (this->*bucket_insert_func_ptr_)(bucket_idx, hash);
+}
+
+bool BlockBloomFilter::Find(const uint32_t hash) const noexcept {
+  if (always_false_) {
+    return false;
+  }
+  DCHECK_NOTNULL(directory_);
+  const uint32_t bucket_idx = Rehash32to32(hash) & directory_mask_;
+  return (this->*bucket_find_func_ptr_)(bucket_idx, hash);
+}
+
+Status DefaultBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) {
+  int ret_code = posix_memalign(ptr, CACHELINE_SIZE, bytes);
+  return ret_code == 0 ? Status::OK() :
+                         Status::RuntimeError(strings::Substitute("bad_alloc. bytes: $0", bytes));
+}
+
+void DefaultBlockBloomFilterBufferAllocator::FreeBuffer(void* ptr) {
+  DCHECK_NOTNULL(ptr);
+  free(ptr);
+}
+
+DefaultBlockBloomFilterBufferAllocator* DefaultBlockBloomFilterBufferAllocator::GetSingleton() {
+  return Singleton<DefaultBlockBloomFilterBufferAllocator>::get();
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/block_bloom_filter.h b/src/kudu/util/block_bloom_filter.h
new file mode 100644
index 0000000..2ae7594
--- /dev/null
+++ b/src/kudu/util/block_bloom_filter.h
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/cpu.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+DECLARE_bool(disable_blockbloomfilter_avx2);
+
+namespace kudu {
+
+// Forward declaration.
+class BlockBloomFilterBufferAllocatorIf;
+
+// Space and cache efficient block based BloomFilter that takes 32-bit hash as key.
+// For a simple BloomFilter that takes arbitrary datatype as key see BloomFilter in bloom_filter.h
+//
+// A BloomFilter stores sets of items and offers a query operation indicating whether or
+// not that item is in the set.  BloomFilters use much less space than other compact data
+// structures, but they are less accurate: for a small percentage of elements, the query
+// operation incorrectly returns true even when the item is not in the set.
+//
+// When talking about Bloom filter size, rather than talking about 'size', which might be
+// ambiguous, we distinguish two different quantities:
+//
+// 1. Space: the amount of memory used
+//
+// 2. NDV: the number of unique items that have been inserted
+//
+// BlockBloomFilter is implemented using block Bloom filters from Putze et al.'s "Cache-,
+// Hash- and Space-Efficient Bloom Filters". The basic idea is to hash the item to a tiny
+// Bloom filter the size of a single cache line or smaller. This implementation sets 8
+// bits in each tiny Bloom filter. This provides a false positive rate near optimal for
+// between 5 and 15 bits per distinct value, which corresponds to false positive
+// probabilities between 0.1% (for 15 bits) and 10% (for 5 bits).
+//
+// Our tiny BloomFilters are 32 bytes to take advantage of 32-byte SIMD in newer Intel
+// machines.
+class BlockBloomFilter {
+ public:
+  explicit BlockBloomFilter(BlockBloomFilterBufferAllocatorIf* buffer_allocator);
+  ~BlockBloomFilter();
+
+  // Reset the filter state, allocate/reallocate the internal data structures.
+  // All calls to Insert() and Find() should only be done between the calls to Init() and
+  // Close(). Init() and Close() are safe to call multiple times.
+  Status Init(int log_space_bytes);
+  void Close();
+
+  // Representation of a filter which allows all elements to pass.
+  static constexpr BlockBloomFilter* const kAlwaysTrueFilter = nullptr;
+
+  bool AlwaysFalse() const { return always_false_; }
+
+  // Adds an element to the BloomFilter. The function used to generate 'hash' need not
+  // have good uniformity, but it should have low collision probability. For instance, if
+  // the set of values is 32-bit ints, the identity function is a valid hash function for
+  // this Bloom filter, since the collision probability (the probability that two
+  // non-equal values will have the same hash value) is 0.
+  void Insert(uint32_t hash) noexcept;
+
+  // Finds an element in the BloomFilter, returning true if it is found and false (with
+  // high probability) if it is not.
+  bool Find(uint32_t hash) const noexcept;
+
+  // As more distinct items are inserted into a BloomFilter, the false positive rate
+  // rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
+  // constructed with (1 << log_space_bytes) bytes of space hits false positive
+  // probability fpp.
+  static size_t MaxNdv(int log_space_bytes, double fpp);
+
+  // If we expect to fill a Bloom filter with 'ndv' different unique elements and we
+  // want a false positive probability of less than 'fpp', then this is the log (base 2)
+  // of the minimum number of bytes we need.
+  static int MinLogSpace(size_t ndv, double fpp);
+
+  // Returns the expected false positive rate for the given ndv and log_space_bytes.
+  static double FalsePositiveProb(size_t ndv, int log_space_bytes);
+
+  // Returns amount of space used, in bytes.
+  int64_t GetSpaceUsed() const { return sizeof(Bucket) * (1LL << log_num_buckets_); }
+
+  static int64_t GetExpectedMemoryUsed(uint32_t log_heap_size) {
+    DCHECK_GE(log_heap_size, kLogBucketWordBits);
+    return sizeof(Bucket) * (1LL << std::max<int>(1, log_heap_size - kLogBucketWordBits));
+  }
+
+ private:
+  // always_false_ is true when the bloom filter hasn't had any elements inserted.
+  bool always_false_;
+
+  static const base::CPU kCpu;
+
+  // The BloomFilter is divided up into Buckets and each Bucket comprises 8 BucketWords of
+  // 4 bytes each.
+  static constexpr uint64_t kBucketWords = 8;
+  typedef uint32_t BucketWord;
+
+  // log2(number of bits in a BucketWord)
+  static constexpr int kLogBucketWordBits = 5;
+  static constexpr BucketWord kBucketWordMask = (1 << kLogBucketWordBits) - 1;
+
+  // log2(number of bytes in a bucket)
+  static constexpr int kLogBucketByteSize = 5;
+
+  static_assert((1 << kLogBucketWordBits) == std::numeric_limits<BucketWord>::digits,
+      "BucketWord must have a bit-width that is be a power of 2, like 64 for uint64_t.");
+
+  typedef BucketWord Bucket[kBucketWords];
+
+  BlockBloomFilterBufferAllocatorIf* buffer_allocator_;
+
+  // log_num_buckets_ is the log (base 2) of the number of buckets in the directory.
+  int log_num_buckets_;
+
+  // directory_mask_ is (1 << log_num_buckets_) - 1. It is precomputed for
+  // efficiency reasons.
+  uint32_t directory_mask_;
+
+  Bucket* directory_;
+
+  // Same as Insert(), but skips the CPU check and assumes that AVX2 is not available.
+  void InsertNoAvx2(uint32_t hash) noexcept;
+
+  // Does the actual work of Insert(). bucket_idx is the index of the bucket to insert
+  // into and 'hash' is the value passed to Insert().
+  void BucketInsert(uint32_t bucket_idx, uint32_t hash) noexcept;
+
+  bool BucketFind(uint32_t bucket_idx, uint32_t hash) const noexcept;
+
+#ifdef USE_AVX2
+  // Same as Insert(), but skips the CPU check and assumes that AVX2 is available.
+  void InsertAvx2(uint32_t hash) noexcept __attribute__((__target__("avx2")));
+
+  // A faster SIMD version of BucketInsert().
+  void BucketInsertAVX2(uint32_t bucket_idx, uint32_t hash) noexcept
+      __attribute__((__target__("avx2")));
+
+  // A faster SIMD version of BucketFind().
+  bool BucketFindAVX2(uint32_t bucket_idx, uint32_t hash) const noexcept
+      __attribute__((__target__("avx2")));
+#endif
+
+  // Function pointers initialized in constructor to avoid run-time cost
+  // in hot-path of Find and Insert operations.
+  decltype(&BlockBloomFilter::BucketInsert) bucket_insert_func_ptr_;
+  decltype(&BlockBloomFilter::BucketFind) bucket_find_func_ptr_;
+
+  int64_t directory_size() const {
+    return 1ULL << (log_num_buckets_ + kLogBucketByteSize);
+  }
+
+  // Detect at run-time whether CPU supports AVX2
+  static bool has_avx2() {
+    return !FLAGS_disable_blockbloomfilter_avx2 && kCpu.has_avx2();
+  }
+
+  // Some constants used in hashing. #defined for efficiency reasons.
+#define BLOOM_HASH_CONSTANTS                                             \
+  0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU, 0x705495c7U, 0x2df1424bU, \
+      0x9efc4947U, 0x5c6bfb31U
+
+  // kRehash is used as 8 odd 32-bit unsigned ints.  See Dietzfelbinger et al.'s "A
+  // reliable randomized algorithm for the closest-pair problem".
+  static constexpr uint32_t kRehash[8]
+      __attribute__((aligned(32))) = {BLOOM_HASH_CONSTANTS};
+
+  // Get 32 more bits of randomness from a 32-bit hash:
+  ATTRIBUTE_NO_SANITIZE_INTEGER
+  static inline uint32_t Rehash32to32(const uint32_t hash) {
+    // Constants generated by uuidgen(1) with the -r flag
+    static constexpr uint64_t m = 0x7850f11ec6d14889ULL;
+    static constexpr uint64_t a = 0x6773610597ca4c63ULL;
+    // This is strongly universal hashing following Dietzfelbinger's "Universal hashing
+    // and k-wise independent random variables via integer arithmetic without primes". As
+    // such, for any two distinct uint32_t's hash1 and hash2, the probability (over the
+    // randomness of the constants) that any subset of bit positions of
+    // Rehash32to32(hash1) is equal to the same subset of bit positions
+    // Rehash32to32(hash2) is minimal.
+    return (static_cast<uint64_t>(hash) * m + a) >> 32U;
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(BlockBloomFilter);
+};
+
+// Generic interface to allocate and de-allocate memory for the BlockBloomFilter.
+class BlockBloomFilterBufferAllocatorIf {
+ public:
+  virtual Status AllocateBuffer(size_t bytes, void** ptr) = 0;
+  virtual void FreeBuffer(void* ptr) = 0;
+};
+
+class DefaultBlockBloomFilterBufferAllocator : public BlockBloomFilterBufferAllocatorIf {
+ public:
+  // Required for Singleton.
+  DefaultBlockBloomFilterBufferAllocator() = default;
+
+  Status AllocateBuffer(size_t bytes, void** ptr) override;
+  void FreeBuffer(void* ptr) override;
+
+  static DefaultBlockBloomFilterBufferAllocator* GetSingleton();
+ private:
+  DISALLOW_COPY_AND_ASSIGN(DefaultBlockBloomFilterBufferAllocator);
+};
+
+}  // namespace kudu
diff --git a/src/kudu/util/block_bloom_filter_avx2.cc b/src/kudu/util/block_bloom_filter_avx2.cc
new file mode 100644
index 0000000..e10b6cc
--- /dev/null
+++ b/src/kudu/util/block_bloom_filter_avx2.cc
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file is conditionally compiled if compiler supports AVX2.
+// However the tidy bot appears to compile this file regardless and does not define the USE_AVX2
+// macro raising incorrect errors.
+#if defined(CLANG_TIDY)
+#define USE_AVX2 1
+#endif
+
+#include "kudu/util/block_bloom_filter.h"
+
+#include <cstdint>
+#include <immintrin.h>
+
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+// File-local helper for the AVX2 methods: expands a 32-bit hash into a 256-bit Bucket
+// mask that has exactly one bit set in each of its eight 32-bit lanes.
+static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i MakeMask(
+    const uint32_t hash) {
+  // Broadcast 'hash' into all eight lanes of a YMM register.
+  const __m256i broadcast = _mm256_set1_epi32(hash);
+  // Multiply-shift hashing a la Dietzfelbinger et al.: each lane is multiplied by its own
+  // odd constant and only the 5 most significant bits of each 32-bit product are kept.
+  const __m256i multipliers = _mm256_setr_epi32(BLOOM_HASH_CONSTANTS);
+  const __m256i products = _mm256_mullo_epi32(multipliers, broadcast);
+  const __m256i bit_positions = _mm256_srli_epi32(products, 27);
+  // Those 5 bits (0..31) choose which single bit of the corresponding lane gets set.
+  const __m256i one_per_lane = _mm256_set1_epi32(1);
+  return _mm256_sllv_epi32(one_per_lane, bit_positions);
+}
+
+void BlockBloomFilter::BucketInsertAVX2(const uint32_t bucket_idx,
+                                        const uint32_t hash) noexcept {
+  // Locate the 256-bit bucket and OR the hash's one-bit-per-lane mask into it in place.
+  __m256i* const slot = reinterpret_cast<__m256i*>(directory_) + bucket_idx;
+  _mm256_store_si256(slot, _mm256_or_si256(*slot, MakeMask(hash)));
+  // For SSE compatibility, clear the high bits of each YMM register so SSE instructions
+  // don't have to save them off before using XMM registers.
+  _mm256_zeroupper();
+}
+
+bool BlockBloomFilter::BucketFindAVX2(const uint32_t bucket_idx, const uint32_t hash) const
+    noexcept {
+  const __m256i wanted = MakeMask(hash);
+  const __m256i stored = *(reinterpret_cast<__m256i*>(directory_) + bucket_idx);
+  // The hash is reported as present iff every bit set in 'wanted' is also set in 'stored'.
+  // _mm256_testc_si256(a, b) computes (~a & b) and returns 1 when that is zero everywhere
+  // and 0 otherwise, i.e. it returns 1 exactly when 'stored' covers 'wanted'.
+  const int covered = _mm256_testc_si256(stored, wanted);
+  // Clear the upper YMM halves for SSE compatibility before returning.
+  _mm256_zeroupper();
+  return covered != 0;
+}
+
+void BlockBloomFilter::InsertAvx2(const uint32_t hash) noexcept {
+  always_false_ = false;
+  // The bucket index comes from a rehash of 'hash'; the raw hash picks the bits inside it.
+  BucketInsertAVX2(Rehash32to32(hash) & directory_mask_, hash);
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/bloom_filter.h b/src/kudu/util/bloom_filter.h
index d395646..30cd64c 100644
--- a/src/kudu/util/bloom_filter.h
+++ b/src/kudu/util/bloom_filter.h
@@ -31,6 +31,10 @@
 
 namespace kudu {
 
+// A simple BloomFilter that takes arbitrary datatype as key.
+// For a space and cache efficient block based BloomFilter that takes a 32-bit hash as key, see
+// BlockBloomFilter in block_bloom_filter.h
+
 // Probe calculated from a given key. This caches the calculated
 // hash values which are necessary for probing into a Bloom Filter,
 // so that when many bloom filters have to be consulted for a given


Mime
View raw message