Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D98D8200B70 for ; Fri, 12 Aug 2016 11:15:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D8738160AB0; Fri, 12 Aug 2016 09:15:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6A5CD160ABD for ; Fri, 12 Aug 2016 11:15:45 +0200 (CEST) Received: (qmail 41664 invoked by uid 500); 12 Aug 2016 09:09:31 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 41512 invoked by uid 99); 12 Aug 2016 09:09:28 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Aug 2016 09:09:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 4E099185892 for ; Fri, 12 Aug 2016 09:09:28 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id g2qOKrBuNC8Q for ; Fri, 12 Aug 2016 09:09:24 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id A9FB05FB8B for ; Fri, 12 Aug 2016 09:09:23 +0000 (UTC) Received: (qmail 52917 invoked by uid 99); 12 Aug 2016 08:46:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Aug 2016 08:46:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9283E055E; Fri, 12 Aug 2016 08:46:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianqiao@apache.org To: commits@quickstep.incubator.apache.org Date: Fri, 12 Aug 2016 08:46:01 -0000 Message-Id: In-Reply-To: <2a49ff5f5a29452fa3148dc826fcc86e@git.apache.org> References: <2a49ff5f5a29452fa3148dc826fcc86e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-quickstep git commit: Atomic build bloom filters archived-at: Fri, 12 Aug 2016 09:15:47 -0000 Atomic build bloom filters Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/82e7efd9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/82e7efd9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/82e7efd9 Branch: refs/heads/LIP-for-tpch Commit: 82e7efd939ed1f2a478389e6f83f38f59585ef81 Parents: 50c650b Author: Jianqiao Zhu Authored: Fri Aug 12 03:26:29 2016 -0500 Committer: Jianqiao Zhu Committed: Fri Aug 12 03:26:29 2016 -0500 ---------------------------------------------------------------------- relational_operators/BuildHashOperator.cpp | 2 +- storage/BloomFilterIndexSubBlock.cpp | 75 ++++++----- storage/HashTable.hpp | 67 +++------- utility/BloomFilter.hpp | 170 +++++++++--------------- 4 files changed, 119 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/82e7efd9/relational_operators/BuildHashOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp index b4e20e4..d9d2db2 100644 --- a/relational_operators/BuildHashOperator.cpp +++ b/relational_operators/BuildHashOperator.cpp @@ -144,7 +144,7 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id } void BuildHashOperator::actionOnCompletion() { - hash_table_->finalizeBuildSideThreadLocalBloomFilters(); +// hash_table_->finalizeBuildSideThreadLocalBloomFilters(); } void BuildHashWorkOrder::execute() { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/82e7efd9/storage/BloomFilterIndexSubBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/BloomFilterIndexSubBlock.cpp b/storage/BloomFilterIndexSubBlock.cpp index 6ba466e..fc3fb90 100644 --- a/storage/BloomFilterIndexSubBlock.cpp +++ b/storage/BloomFilterIndexSubBlock.cpp @@ -62,29 +62,30 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t CHECK(DescriptionIsValid(relation_, description_)) << "Attempted to construct an BloomFilterIndexSubBlock from an invalid description."; - // Store the attribute ids that are being indexed. - indexed_attribute_ids_.reserve(description.indexed_attribute_ids_size()); - for (int i = 0; i < description.indexed_attribute_ids_size(); ++i) { - indexed_attribute_ids_.push_back(description.indexed_attribute_ids(i)); - } - - // Make the bit_array_ point to sub_block_memory. - bit_array_.reset(static_cast(sub_block_memory)); - - bool is_bloom_filter_initialized = !is_new_block; - const std::uint32_t salt_count = description.GetExtension(BloomFilterIndexSubBlockDescription::number_of_hashes); - - // Initialize the bloom_filter_ data structure to operate on bit_array. - bloom_filter_.reset(new BloomFilter(salt_count, - bit_array_size_in_bytes_, - bit_array_.get(), - is_bloom_filter_initialized)); - is_initialized_ = true; - is_consistent_ = true; +// // Store the attribute ids that are being indexed. +// indexed_attribute_ids_.reserve(description.indexed_attribute_ids_size()); +// for (int i = 0; i < description.indexed_attribute_ids_size(); ++i) { +// indexed_attribute_ids_.push_back(description.indexed_attribute_ids(i)); +// } +// +// // Make the bit_array_ point to sub_block_memory. +// bit_array_.reset(static_cast(sub_block_memory)); +// +// bool is_bloom_filter_initialized = !is_new_block; +// const std::uint32_t salt_count = description.GetExtension(BloomFilterIndexSubBlockDescription::number_of_hashes); +// +// // Initialize the bloom_filter_ data structure to operate on bit_array. +// bloom_filter_.reset(new BloomFilter(salt_count, +// bit_array_size_in_bytes_, +// bit_array_.get(), +// is_bloom_filter_initialized)); +// is_initialized_ = true; +// is_consistent_ = true; + LOG(FATAL) << "BloomFilterIndexSubBlock disabled"; } BloomFilterIndexSubBlock::~BloomFilterIndexSubBlock() { - bit_array_.release(); // De-allocation of bit_array_ is handled by StorageBlock. +// bit_array_.release(); // De-allocation of bit_array_ is handled by StorageBlock. } bool BloomFilterIndexSubBlock::DescriptionIsValid(const CatalogRelationSchema &relation, @@ -258,22 +259,24 @@ BloomFilterIndexSubBlock::BloomFilterSelectivity } bool BloomFilterIndexSubBlock::rebuild() { - DCHECK(is_initialized_); - bloom_filter_->reset(); - bool didSucceed = true; - if (tuple_store_.isPacked()) { - for (tuple_id tid = 0; didSucceed && tid <= tuple_store_.getMaxTupleID(); ++tid) { - didSucceed = addEntry(tid); - } - } else { - for (tuple_id tid = 0; didSucceed && tid <= tuple_store_.getMaxTupleID(); ++tid) { - if (tuple_store_.hasTupleWithID(tid)) { - didSucceed = addEntry(tid); - } - } - } - is_consistent_ = true; - return didSucceed; +// DCHECK(is_initialized_); +// bloom_filter_->reset(); +// bool didSucceed = true; +// if (tuple_store_.isPacked()) { +// for (tuple_id tid = 0; didSucceed && tid <= tuple_store_.getMaxTupleID(); ++tid) { +// didSucceed = addEntry(tid); +// } +// } else { +// for (tuple_id tid = 0; didSucceed && tid <= tuple_store_.getMaxTupleID(); ++tid) { +// if (tuple_store_.hasTupleWithID(tid)) { +// didSucceed = addEntry(tid); +// } +// } +// } +// is_consistent_ = true; +// return didSucceed; + LOG(FATAL) << "BloomFilterIndexSubBlock disabled"; + return false; } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/82e7efd9/storage/HashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp index 1caa183..9c50fc7 100644 --- a/storage/HashTable.hpp +++ b/storage/HashTable.hpp @@ -38,7 +38,6 @@ #include "storage/TupleReference.hpp" #include "storage/ValueAccessor.hpp" #include "storage/ValueAccessorUtil.hpp" -#include "threading/Mutex.hpp" #include "threading/SpinSharedMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" @@ -1031,16 +1030,16 @@ class HashTable : public HashTableBasebitwiseOr( - thread_local_bf_pair.second[i].get()); - } - } - } - } +// inline void finalizeBuildSideThreadLocalBloomFilters() { +// if (has_build_side_bloom_filter_) { +// for (const auto &thread_local_bf_pair : thread_local_bloom_filters_) { +// for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) { +// build_bloom_filters_[i]->bitwiseOr( +// thread_local_bf_pair.second[i].get()); +// } +// } +// } +// } /** * @brief This function adds a pointer to the list of bloom filters to be @@ -1352,8 +1351,6 @@ class HashTable : public HashTableBase build_bloom_filters_; - std::map>> thread_local_bloom_filters_; - Mutex bloom_filter_mutex_; std::vector build_attribute_ids_; std::vector probe_bloom_filters_; std::vector probe_attribute_ids_; @@ -1503,20 +1500,9 @@ HashTablePutResult HashTable(build_bloom_filters_[0]->getNumberOfHashes(), - build_bloom_filters_[0]->getBitArraySize())); - thread_local_bloom_filter = bf_vector[0].get(); - } else { - thread_local_bloom_filter = bf_it->second[0].get(); - } + bloom_filter = build_bloom_filters_[0]; } if (resizable) { @@ -1536,8 +1522,8 @@ HashTablePutResult HashTableinsertUnSafe(static_cast(key.getDataPtr()), - key.getDataSize()); + bloom_filter->insert(static_cast(key.getDataPtr()), + key.getDataSize()); } if (result == HashTablePutResult::kDuplicateKey) { DEBUG_ASSERT(!using_prealloc); @@ -1566,8 +1552,8 @@ HashTablePutResult HashTableinsertUnSafe(static_cast(key.getDataPtr()), - key.getDataSize()); + bloom_filter->insert(static_cast(key.getDataPtr()), + key.getDataSize()); } if (result != HashTablePutResult::kOK) { return result; @@ -1641,30 +1627,13 @@ HashTablePutResult HashTable> *thread_local_bf_vector; - { - MutexLock lock(bloom_filter_mutex_); - auto bf_it = thread_local_bloom_filters_.find(tid); - if (bf_it == thread_local_bloom_filters_.end()) { - thread_local_bf_vector = &thread_local_bloom_filters_[tid]; - for (const auto &build_side_bf : build_bloom_filters_) { - thread_local_bf_vector->emplace_back( - std::make_unique(build_side_bf->getNumberOfHashes(), - build_side_bf->getBitArraySize())); - } - } else { - thread_local_bf_vector = &bf_it->second; - } - } - for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) { const auto &build_attr = build_attribute_ids_[i]; - BloomFilter *thread_local_bloom_filter = (*thread_local_bf_vector)[i].get(); + BloomFilter *bloom_filter = build_bloom_filters_[i]; const std::size_t attr_size = accessor->template getUntypedValueAndByteLengthAtAbsolutePosition(0, build_attr).second; while (accessor->next()) { - thread_local_bloom_filter->insertUnSafe( + bloom_filter->insert( static_cast(accessor->getUntypedValue(build_attr)), attr_size); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/82e7efd9/utility/BloomFilter.hpp ---------------------------------------------------------------------- diff --git a/utility/BloomFilter.hpp b/utility/BloomFilter.hpp index 749d33a..5b87c2d 100644 --- a/utility/BloomFilter.hpp +++ b/utility/BloomFilter.hpp @@ -21,6 +21,7 @@ #define QUICKSTEP_UTILITY_BLOOM_FILTER_HPP #include +#include #include #include #include @@ -100,36 +101,13 @@ class BloomFilterBlocked { * @param bit_array_size_in_bytes Size of the bit array. **/ BloomFilterBlocked(const std::uint8_t hash_fn_count, - const std::uint64_t bit_array_size_in_bytes) + const std::uint64_t bit_array_size_in_bytes) : hash_fn_count_(hash_fn_count), array_size_in_bytes_(getNearestAllowedSize(bit_array_size_in_bytes)), - is_bit_array_owner_(true), - bit_array_(new std::uint8_t[array_size_in_bytes_]) { - reset(); - } - - /** - * @brief Constructor. - * @note When a bit_array is passed as an argument to the constructor, - * then the ownership of the bit array lies with the caller. - * - * @param hash_fn_count The number of hash functions used by this bloom filter. - * @param bit_array_size_in_bytes Size of the bit array. - * @param bit_array A pointer to the memory region that is used to store bit array. - * @param is_initialized A boolean that indicates whether to zero-out the region - * before use or not. - **/ - BloomFilterBlocked(const std::uint8_t hash_fn_count, - const std::uint64_t bit_array_size_in_bytes, - std::uint8_t *bit_array, - const bool is_initialized) - : hash_fn_count_(hash_fn_count), - array_size_in_bytes_(getNearestAllowedSize(bit_array_size_in_bytes, true)), - is_bit_array_owner_(false), - bit_array_(bit_array) { // Owned by the calling method. - if (!is_initialized) { - reset(); - } + bit_array_(bit_array_size_in_bytes) { + std::memset(bit_array_.data(), + 0x0, + sizeof(std::atomic) * bit_array_size_in_bytes); } /** @@ -141,22 +119,14 @@ class BloomFilterBlocked { * bloom filter configuration. **/ explicit BloomFilterBlocked(const serialization::BloomFilter &bloom_filter_proto) - : hash_fn_count_(bloom_filter_proto.number_of_hashes()), - array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()), - is_bit_array_owner_(true), - bit_array_(new std::uint8_t[array_size_in_bytes_]) { - reset(); + : BloomFilterBlocked(bloom_filter_proto.number_of_hashes(), + bloom_filter_proto.bloom_filter_size()) { } /** * @brief Destructor. **/ ~BloomFilterBlocked() { - if (is_bit_array_owner_) { - bit_array_.reset(); - } else { - bit_array_.release(); - } } static bool ProtoIsValid(const serialization::BloomFilter &bloom_filter_proto) { @@ -164,15 +134,6 @@ class BloomFilterBlocked { } /** - * @brief Zeros out the contents of the bit array. - **/ - inline void reset() { - // Initialize the bit_array with all zeros. - std::fill_n(bit_array_.get(), array_size_in_bytes_, 0x00); - inserted_element_count_ = 0; - } - - /** * @brief Get the number of hash functions used in this bloom filter. * * @return Returns the number of hash functions. @@ -190,18 +151,9 @@ class BloomFilterBlocked { return array_size_in_bytes_; } - /** - * @brief Get the constant pointer to the bit array for this bloom filter - * - * @return Returns constant pointer to the bit array. - **/ - inline const std::uint8_t* getBitArray() const { - return bit_array_.get(); - } - template - void insert(const T &value) { - insert(reinterpret_cast(&value), sizeof(T)); + inline void insert(const T &value) { + insert(reinterpret_cast(&value), sizeof(T)); } /** @@ -211,14 +163,14 @@ class BloomFilterBlocked { * @param length Size of the value being inserted in bytes. */ inline void insert(const std::uint8_t *key_begin, const std::size_t length) { - SpinSharedMutexExclusiveLock exclusive_writer_lock(bloom_filter_insert_mutex_); - insertUnSafe(key_begin, length); + const std::uint32_t pos = hash_identity(key_begin, length); + bit_array_[pos >> 3].fetch_or(1 << (pos & 0x7), std::memory_order_relaxed); } - template - void insertUnSafe(const T &value) { - insertUnSafe(reinterpret_cast(&value), sizeof(T)); - } +// template +// void insertUnSafe(const T &value) { +// insertUnSafe(reinterpret_cast(&value), sizeof(T)); +// } /** * @brief Inserts a given value into the bloom filter. @@ -228,16 +180,16 @@ class BloomFilterBlocked { * @param key_begin A pointer to the value being inserted. * @param length Size of the value being inserted in bytes. */ - inline void insertUnSafe(const std::uint8_t *key_begin, const std::size_t length) { - Position first_pos = getFirstPosition(key_begin, length); - setBitAtPosition(first_pos); - Position other_pos; - for (std::uint8_t i = 1; i bool contains(const T &value) { @@ -258,18 +210,20 @@ class BloomFilterBlocked { inline bool contains( const std::uint8_t *__restrict__ key_begin, const std::size_t length) const { - Position first_pos = getFirstPosition(key_begin, length); - if (!getBitAtPosition(first_pos)) { - return false; - } - Position other_pos; - for (std::uint8_t i = 1; i < hash_fn_count_; ++i) { - other_pos = getOtherPosition(key_begin, length, first_pos, i); - if (!getBitAtPosition(other_pos)) { - return false; - } - } - return true; +// Position first_pos = getFirstPosition(key_begin, length); +// if (!getBitAtPosition(first_pos)) { +// return false; +// } +// Position other_pos; +// for (std::uint8_t i = 1; i < hash_fn_count_; ++i) { +// other_pos = getOtherPosition(key_begin, length, first_pos, i); +// if (!getBitAtPosition(other_pos)) { +// return false; +// } +// } +// return true; + const std::uint32_t pos = hash_identity(key_begin, length); + return ((bit_array_[pos >> 3].load(std::memory_order_relaxed) & (1 << (pos & 0x7))) > 0); } /** @@ -278,21 +232,21 @@ class BloomFilterBlocked { * * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with. */ - inline void bitwiseOr(const BloomFilterBlocked *bloom_filter) { - SpinSharedMutexExclusiveLock exclusive_writer_lock(bloom_filter_insert_mutex_); - for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) { - (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index]; - } - } +// inline void bitwiseOr(const BloomFilterBlocked *bloom_filter) { +// SpinSharedMutexExclusiveLock exclusive_writer_lock(bloom_filter_insert_mutex_); +// for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) { +// (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index]; +// } +// } /** * @brief Return the number of elements currently inserted into bloom filter. * * @return The number of elements inserted into bloom filter. **/ - inline std::uint32_t element_count() const { - return inserted_element_count_; - } +// inline std::uint32_t element_count() const { +// return inserted_element_count_; +// } protected: Position getFirstPosition(const std::uint8_t *begin, std::size_t length) const { @@ -325,23 +279,23 @@ class BloomFilterBlocked { } } - void setBitAtPosition(const Position &pos) { - (bit_array_.get())[pos.byte_pos.byte_num] |= (1 << pos.byte_pos.index_in_byte); - } - - bool getBitAtPosition(const Position &pos) const { - return (bit_array_.get())[pos.byte_pos.byte_num] & (1 << pos.byte_pos.index_in_byte); - } +// inline void setBitAtPosition(const Position &pos) { +// (bit_array_.get())[pos.byte_pos.byte_num] |= (1 << pos.byte_pos.index_in_byte); +// } +// +// inline bool getBitAtPosition(const Position &pos) const { +// return (bit_array_.get())[pos.byte_pos.byte_num] & (1 << pos.byte_pos.index_in_byte); +// } inline std::uint32_t hash_identity( const std::uint8_t *__restrict__ begin, - std::size_t length) const { + const std::size_t length) const { std::uint32_t hash; if (length >= 4) - hash = *reinterpret_cast (begin); + hash = *reinterpret_cast(begin); else std::memcpy(&hash, begin, length); - return hash % (array_size_in_bytes_ * kNumBitsPerByte); + return hash % (array_size_in_bytes_ << 3); } inline std::uint32_t hash_multiplicative( @@ -368,8 +322,7 @@ class BloomFilterBlocked { private: const std::uint32_t hash_fn_count_; const std::uint64_t array_size_in_bytes_; - std::uint32_t inserted_element_count_; - const bool is_bit_array_owner_; +// std::uint32_t inserted_element_count_; static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761; const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is 2**(i+1) - 1 @@ -383,8 +336,7 @@ class BloomFilterBlocked { // 0x1fffffff * kKnuthGoldenRatioNumber // 0x3fffffff, 0x7fffffff, 0xffffffff }; - alignas(kCacheLineBytes) std::unique_ptr bit_array_; - alignas(kCacheLineBytes) mutable SpinSharedMutex bloom_filter_insert_mutex_; + alignas(kCacheLineBytes) std::vector> bit_array_; DISALLOW_COPY_AND_ASSIGN(BloomFilterBlocked); };