From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Date: Wed, 08 Feb 2017 08:53:48 -0000
Message-Id: <0706ec860cc84f998c2d0382e4496df9@git.apache.org>
Subject: [05/18] incubator-quickstep git commit: - Adds CollisionFreeVectorTable to support specialized fast path aggregation for range-bounded single integer group-by key. - Supports copy elision for aggregation.
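For context, a minimal sketch (not part of this patch) of how the PackedPayloadHashTable introduced below is intended to be driven during group-by aggregation, pieced together from the interfaces in this diff. The estimated entry count is arbitrary, and the key types, aggregation handles, attribute ids, and accessor multiplexer are placeholders that Quickstep's aggregation operator would normally supply.

// Sketch only: the inputs are assumed to come from the surrounding
// aggregation operator and are not constructed here.
#include <cstdint>
#include <vector>

#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/PackedPayloadHashTable.hpp"
#include "storage/StorageManager.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
#include "types/TypedValue.hpp"

namespace quickstep {

void SketchGroupByAggregation(
    const std::vector<const Type *> &key_types,
    const std::vector<AggregationHandle *> &handles,
    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
    const std::vector<MultiSourceAttributeId> &key_ids,
    const ValueAccessorMultiplexer &accessor_mux,
    StorageManager *storage_manager) {
  // Size the table for an estimated number of distinct group-by keys; the
  // table is resized if that estimate turns out to be too small.
  PackedPayloadHashTable table(key_types, 100000u, handles, storage_manager);

  // Each input tuple's group-by key selects (or allocates) a bucket, and every
  // handle's aggregation state in that bucket's packed payload is updated.
  // A false return would mean the table could not fit all new keys.
  table.upsertValueAccessorCompositeKey(argument_ids, key_ids, accessor_mux);

  // Visit one (composite key, packed payload) pair per allocated bucket; each
  // handle's state sits at its payload offset inside 'payload'.
  auto visitor = [](const std::vector<TypedValue> &key,
                    const std::uint8_t *payload) {
    // Finalization of 'payload' into output values is handled elsewhere.
  };
  table.forEachCompositeKey(&visitor);
}

}  // namespace quickstep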
archived-at: Wed, 08 Feb 2017 08:54:03 -0000 http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/PackedPayloadHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp new file mode 100644 index 0000000..bf5eaee --- /dev/null +++ b/storage/PackedPayloadHashTable.cpp @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "storage/PackedPayloadHashTable.hpp" + +#include +#include +#include +#include +#include + +#include "expressions/aggregation/AggregationHandle.hpp" +#include "storage/HashTableKeyManager.hpp" +#include "storage/StorageBlob.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageConstants.hpp" +#include "storage/StorageManager.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "threading/SpinMutex.hpp" +#include "threading/SpinSharedMutex.hpp" +#include "types/Type.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" +#include "utility/Alignment.hpp" +#include "utility/Macros.hpp" +#include "utility/PrimeNumber.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +PackedPayloadHashTable::PackedPayloadHashTable( + const std::vector &key_types, + const std::size_t num_entries, + const std::vector &handles, + StorageManager *storage_manager) + : key_types_(key_types), + num_handles_(handles.size()), + handles_(handles), + total_payload_size_(ComputeTotalPayloadSize(handles)), + storage_manager_(storage_manager), + kBucketAlignment(alignof(std::atomic)), + kValueOffset(sizeof(std::atomic) + sizeof(std::size_t)), + key_manager_(key_types_, kValueOffset + total_payload_size_), + bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) { + std::size_t payload_offset_running_sum = sizeof(SpinMutex); + for (const auto *handle : handles) { + payload_offsets_.emplace_back(payload_offset_running_sum); + payload_offset_running_sum += handle->getPayloadSize(); + } + + // NOTE(jianqiao): Potential memory leak / double freeing by copying from + // init_payload to buckets if payload contains out of line data. + init_payload_ = + static_cast(calloc(this->total_payload_size_, 1)); + DCHECK(init_payload_ != nullptr); + + for (std::size_t i = 0; i < num_handles_; ++i) { + handles_[i]->initPayload(init_payload_ + payload_offsets_[i]); + } + + // Bucket size always rounds up to the alignment requirement of the atomic + // size_t "next" pointer at the front or a ValueT, whichever is larger. + // + // Give base HashTable information about what key components are stored + // inline from 'key_manager_'. 
+ setKeyInline(key_manager_.getKeyInline()); + + // Pick out a prime number of slots and calculate storage requirements. + std::size_t num_slots_tmp = + get_next_prime_number(num_entries * kHashTableLoadFactor); + std::size_t required_memory = + sizeof(Header) + num_slots_tmp * sizeof(std::atomic) + + (num_slots_tmp / kHashTableLoadFactor) * + (bucket_size_ + key_manager_.getEstimatedVariableKeySize()); + std::size_t num_storage_slots = + this->storage_manager_->SlotsNeededForBytes(required_memory); + if (num_storage_slots == 0) { + FATAL_ERROR( + "Storage requirement for SeparateChainingHashTable " + "exceeds maximum allocation size."); + } + + // Get a StorageBlob to hold the hash table. + const block_id blob_id = + this->storage_manager_->createBlob(num_storage_slots); + this->blob_ = this->storage_manager_->getBlobMutable(blob_id); + + void *aligned_memory_start = this->blob_->getMemoryMutable(); + std::size_t available_memory = num_storage_slots * kSlotSizeBytes; + if (align(alignof(Header), + sizeof(Header), + aligned_memory_start, + available_memory) == nullptr) { + // With current values from StorageConstants.hpp, this should be + // impossible. A blob is at least 1 MB, while a Header has alignment + // requirement of just kCacheLineBytes (64 bytes). + FATAL_ERROR( + "StorageBlob used to hold resizable " + "SeparateChainingHashTable is too small to meet alignment " + "requirements of SeparateChainingHashTable::Header."); + } else if (aligned_memory_start != this->blob_->getMemoryMutable()) { + // This should also be impossible, since the StorageManager allocates slots + // aligned to kCacheLineBytes. + DEV_WARNING("StorageBlob memory adjusted by " + << (num_storage_slots * kSlotSizeBytes - available_memory) + << " bytes to meet alignment requirement for " + << "SeparateChainingHashTable::Header."); + } + + // Locate the header. + header_ = static_cast
(aligned_memory_start); + aligned_memory_start = + static_cast(aligned_memory_start) + sizeof(Header); + available_memory -= sizeof(Header); + + // Recompute the number of slots & buckets using the actual available memory. + // Most likely, we got some extra free bucket space due to "rounding up" to + // the storage blob's size. It's also possible (though very unlikely) that we + // will wind up with fewer buckets than we initially wanted because of screwy + // alignment requirements for ValueT. + std::size_t num_buckets_tmp = + available_memory / + (kHashTableLoadFactor * sizeof(std::atomic) + bucket_size_ + + key_manager_.getEstimatedVariableKeySize()); + num_slots_tmp = + get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor); + num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor; + DEBUG_ASSERT(num_slots_tmp > 0); + DEBUG_ASSERT(num_buckets_tmp > 0); + + // Locate the slot array. + slots_ = static_cast *>(aligned_memory_start); + aligned_memory_start = static_cast(aligned_memory_start) + + sizeof(std::atomic) * num_slots_tmp; + available_memory -= sizeof(std::atomic) * num_slots_tmp; + + // Locate the buckets. + buckets_ = aligned_memory_start; + // Extra-paranoid: If ValueT has an alignment requirement greater than that + // of std::atomic, we may need to adjust the start of the bucket + // array. + if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) == + nullptr) { + FATAL_ERROR( + "StorageBlob used to hold resizable " + "SeparateChainingHashTable is too small to meet " + "alignment requirements of buckets."); + } else if (buckets_ != aligned_memory_start) { + DEV_WARNING( + "Bucket array start position adjusted to meet alignment " + "requirement for SeparateChainingHashTable's value type."); + if (num_buckets_tmp * bucket_size_ > available_memory) { + --num_buckets_tmp; + } + } + + // Fill in the header. + header_->num_slots = num_slots_tmp; + header_->num_buckets = num_buckets_tmp; + header_->buckets_allocated.store(0, std::memory_order_relaxed); + header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed); + available_memory -= bucket_size_ * (header_->num_buckets); + + // Locate variable-length key storage region, and give it all the remaining + // bytes in the blob. + key_manager_.setVariableLengthStorageInfo( + static_cast(buckets_) + header_->num_buckets * bucket_size_, + available_memory, + &(header_->variable_length_bytes_allocated)); +} + +PackedPayloadHashTable::~PackedPayloadHashTable() { + if (blob_.valid()) { + const block_id blob_id = blob_->getID(); + blob_.release(); + storage_manager_->deleteBlockOrBlobFile(blob_id); + } + std::free(init_payload_); +} + +void PackedPayloadHashTable::clear() { + const std::size_t used_buckets = + header_->buckets_allocated.load(std::memory_order_relaxed); + // Destroy existing values, if necessary. + destroyPayload(); + + // Zero-out slot array. + std::memset( + slots_, 0x0, sizeof(std::atomic) * header_->num_slots); + + // Zero-out used buckets. 
+ std::memset(buckets_, 0x0, used_buckets * bucket_size_); + + header_->buckets_allocated.store(0, std::memory_order_relaxed); + header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed); + key_manager_.zeroNextVariableLengthKeyOffset(); +} + +void PackedPayloadHashTable::destroyPayload() { + const std::size_t num_buckets = + header_->buckets_allocated.load(std::memory_order_relaxed); + void *bucket_ptr = static_cast(buckets_) + kValueOffset; + for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) { + for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) { + void *value_internal_ptr = + static_cast(bucket_ptr) + this->payload_offsets_[handle_id]; + handles_[handle_id]->destroyPayload(static_cast(value_internal_ptr)); + } + bucket_ptr = static_cast(bucket_ptr) + bucket_size_; + } +} + +bool PackedPayloadHashTable::upsertValueAccessorCompositeKey( + const std::vector> &argument_ids, + const std::vector &key_attr_ids, + const ValueAccessorMultiplexer &accessor_mux) { + ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); + ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor(); + + base_accessor->beginIterationVirtual(); + if (derived_accessor == nullptr) { + return upsertValueAccessorCompositeKeyInternal( + argument_ids, + key_attr_ids, + base_accessor, + nullptr); + } else { + DCHECK(derived_accessor->getImplementationType() + == ValueAccessor::Implementation::kColumnVectors); + derived_accessor->beginIterationVirtual(); + return upsertValueAccessorCompositeKeyInternal( + argument_ids, + key_attr_ids, + base_accessor, + static_cast(derived_accessor)); + } +} + +void PackedPayloadHashTable::resize(const std::size_t extra_buckets, + const std::size_t extra_variable_storage, + const std::size_t retry_num) { + // A retry should never be necessary with this implementation of HashTable. + // Separate chaining ensures that any resized hash table with more buckets + // than the original table will be able to hold more entries than the + // original. + DEBUG_ASSERT(retry_num == 0); + + SpinSharedMutexExclusiveLock write_lock(this->resize_shared_mutex_); + + // Recheck whether the hash table is still full. Note that multiple threads + // might wait to rebuild this hash table simultaneously. Only the first one + // should do the rebuild. + if (!isFull(extra_variable_storage)) { + return; + } + + // Approximately double the number of buckets and slots. + // + // TODO(chasseur): It may be worth it to more than double the number of + // buckets here so that we can maintain a good, sparse fill factor for a + // longer time as more values are inserted. Such behavior should take into + // account kHashTableLoadFactor. + std::size_t resized_num_slots = get_next_prime_number( + (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2); + std::size_t variable_storage_required = + (resized_num_slots / kHashTableLoadFactor) * + key_manager_.getEstimatedVariableKeySize(); + const std::size_t original_variable_storage_used = + header_->variable_length_bytes_allocated.load(std::memory_order_relaxed); + // If this resize was triggered by a too-large variable-length key, bump up + // the variable-length storage requirement. 
+ if ((extra_variable_storage > 0) && + (extra_variable_storage + original_variable_storage_used > + key_manager_.getVariableLengthKeyStorageSize())) { + variable_storage_required += extra_variable_storage; + } + + const std::size_t resized_memory_required = + sizeof(Header) + resized_num_slots * sizeof(std::atomic) + + (resized_num_slots / kHashTableLoadFactor) * bucket_size_ + + variable_storage_required; + const std::size_t resized_storage_slots = + this->storage_manager_->SlotsNeededForBytes(resized_memory_required); + if (resized_storage_slots == 0) { + FATAL_ERROR( + "Storage requirement for resized SeparateChainingHashTable " + "exceeds maximum allocation size."); + } + + // Get a new StorageBlob to hold the resized hash table. + const block_id resized_blob_id = + this->storage_manager_->createBlob(resized_storage_slots); + MutableBlobReference resized_blob = + this->storage_manager_->getBlobMutable(resized_blob_id); + + // Locate data structures inside the new StorageBlob. + void *aligned_memory_start = resized_blob->getMemoryMutable(); + std::size_t available_memory = resized_storage_slots * kSlotSizeBytes; + if (align(alignof(Header), + sizeof(Header), + aligned_memory_start, + available_memory) == nullptr) { + // Should be impossible, as noted in constructor. + FATAL_ERROR( + "StorageBlob used to hold resized SeparateChainingHashTable " + "is too small to meet alignment requirements of " + "LinearOpenAddressingHashTable::Header."); + } else if (aligned_memory_start != resized_blob->getMemoryMutable()) { + // Again, should be impossible. + DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob " + << "memory adjusted by " + << (resized_num_slots * kSlotSizeBytes - available_memory) + << " bytes to meet alignment requirement for " + << "LinearOpenAddressingHashTable::Header."); + } + + Header *resized_header = static_cast
(aligned_memory_start); + aligned_memory_start = + static_cast(aligned_memory_start) + sizeof(Header); + available_memory -= sizeof(Header); + + // As in constructor, recompute the number of slots and buckets using the + // actual available memory. + std::size_t resized_num_buckets = + (available_memory - extra_variable_storage) / + (kHashTableLoadFactor * sizeof(std::atomic) + bucket_size_ + + key_manager_.getEstimatedVariableKeySize()); + resized_num_slots = + get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor); + resized_num_buckets = resized_num_slots / kHashTableLoadFactor; + + // Locate slot array. + std::atomic *resized_slots = + static_cast *>(aligned_memory_start); + aligned_memory_start = static_cast(aligned_memory_start) + + sizeof(std::atomic) * resized_num_slots; + available_memory -= sizeof(std::atomic) * resized_num_slots; + + // As in constructor, we will be extra paranoid and use align() to locate the + // start of the array of buckets, as well. + void *resized_buckets = aligned_memory_start; + if (align( + kBucketAlignment, bucket_size_, resized_buckets, available_memory) == + nullptr) { + FATAL_ERROR( + "StorageBlob used to hold resized SeparateChainingHashTable " + "is too small to meet alignment requirements of buckets."); + } else if (resized_buckets != aligned_memory_start) { + DEV_WARNING( + "Bucket array start position adjusted to meet alignment " + "requirement for SeparateChainingHashTable's value type."); + if (resized_num_buckets * bucket_size_ + variable_storage_required > + available_memory) { + --resized_num_buckets; + } + } + aligned_memory_start = static_cast(aligned_memory_start) + + resized_num_buckets * bucket_size_; + available_memory -= resized_num_buckets * bucket_size_; + + void *resized_variable_length_key_storage = aligned_memory_start; + const std::size_t resized_variable_length_key_storage_size = available_memory; + + const std::size_t original_buckets_used = + header_->buckets_allocated.load(std::memory_order_relaxed); + + // Initialize the header. + resized_header->num_slots = resized_num_slots; + resized_header->num_buckets = resized_num_buckets; + resized_header->buckets_allocated.store(original_buckets_used, + std::memory_order_relaxed); + resized_header->variable_length_bytes_allocated.store( + original_variable_storage_used, std::memory_order_relaxed); + + // Bulk-copy buckets. This is safe because: + // 1. The "next" pointers will be adjusted when rebuilding chains below. + // 2. The hash codes will stay the same. + // 3. For key components: + // a. Inline keys will stay exactly the same. + // b. Offsets into variable-length storage will remain valid, because + // we also do a byte-for-byte copy of variable-length storage below. + // c. Absolute external pointers will still point to the same address. + // d. Relative pointers are not used with resizable hash tables. + // 4. If values are not trivially copyable, then we invoke ValueT's copy + // or move constructor with placement new. + // NOTE(harshad) - Regarding point 4 above, as this is a specialized + // hash table implemented for aggregation, the values are trivially copyable, + // therefore we don't need to invoke payload values' copy/move constructors. + std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_); + + // Copy over variable-length key components, if any. 
+ if (original_variable_storage_used > 0) { + DEBUG_ASSERT(original_variable_storage_used == + key_manager_.getNextVariableLengthKeyOffset()); + DEBUG_ASSERT(original_variable_storage_used <= + resized_variable_length_key_storage_size); + std::memcpy(resized_variable_length_key_storage, + key_manager_.getVariableLengthKeyStorage(), + original_variable_storage_used); + } + + destroyPayload(); + + // Make resized structures active. + std::swap(this->blob_, resized_blob); + header_ = resized_header; + slots_ = resized_slots; + buckets_ = resized_buckets; + key_manager_.setVariableLengthStorageInfo( + resized_variable_length_key_storage, + resized_variable_length_key_storage_size, + &(resized_header->variable_length_bytes_allocated)); + + // Drop the old blob. + const block_id old_blob_id = resized_blob->getID(); + resized_blob.release(); + this->storage_manager_->deleteBlockOrBlobFile(old_blob_id); + + // Rebuild chains. + void *current_bucket = buckets_; + for (std::size_t bucket_num = 0; bucket_num < original_buckets_used; + ++bucket_num) { + std::atomic *next_ptr = + static_cast *>(current_bucket); + const std::size_t hash_code = *reinterpret_cast( + static_cast(current_bucket) + + sizeof(std::atomic)); + + const std::size_t slot_number = hash_code % header_->num_slots; + std::size_t slot_ptr_value = 0; + if (slots_[slot_number].compare_exchange_strong( + slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) { + // This bucket is the first in the chain for this block, so reset its + // next pointer to 0. + next_ptr->store(0, std::memory_order_relaxed); + } else { + // A chain already exists starting from this slot, so put this bucket at + // the head. + next_ptr->store(slot_ptr_value, std::memory_order_relaxed); + slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed); + } + current_bucket = static_cast(current_bucket) + bucket_size_; + } +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/PackedPayloadHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp new file mode 100644 index 0000000..f87a1de --- /dev/null +++ b/storage/PackedPayloadHashTable.hpp @@ -0,0 +1,995 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_ +#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "catalog/CatalogTypedefs.hpp" +#include "expressions/aggregation/AggregationHandle.hpp" +#include "storage/HashTableBase.hpp" +#include "storage/HashTableKeyManager.hpp" +#include "storage/StorageBlob.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "storage/ValueAccessorUtil.hpp" +#include "threading/SpinMutex.hpp" +#include "threading/SpinSharedMutex.hpp" +#include "types/TypedValue.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" +#include "utility/HashPair.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class StorageManager; +class Type; +class ValueAccessor; + +/** \addtogroup Storage + * @{ + */ + +/** + * @brief Aggregation hash table implementation in which the payload can be just + * a bunch of bytes. This implementation is suitable for aggregation with + * multiple aggregation handles (e.g. SUM, MAX, MIN etc). + * + * At present the hash table uses separate chaining to resolve collisions, i.e. + * Keys/values are stored in a separate region of memory from the base hash + * table slot array. Every bucket has a "next" pointer so that entries that + * collide (i.e. map to the same base slot) form chains of pointers with each + * other. + **/ +class PackedPayloadHashTable : public AggregationStateHashTableBase { + public: + /** + * @brief Constructor. + * + * @param key_types A vector of one or more types (>1 indicates a composite + * key). + * @param num_entries The estimated number of entries this hash table will + * hold. + * @param handles The aggregation handles. + * @param storage_manager The StorageManager to use (a StorageBlob will be + * allocated to hold this hash table's contents). + **/ + PackedPayloadHashTable( + const std::vector &key_types, + const std::size_t num_entries, + const std::vector &handles, + StorageManager *storage_manager); + + ~PackedPayloadHashTable() override; + + /** + * @brief Erase all entries in this hash table. + * + * @warning This method is not guaranteed to be threadsafe. + **/ + void clear(); + + void destroyPayload() override; + + /** + * @brief Use aggregation handles to update (multiple) aggregation states in + * this hash table, with group-by keys and arguments drawn from the + * given ValueAccessors. New states are first inserted if not already + * present. + * + * @note This method is threadsafe with regard to other calls to + * upsertCompositeKey() and upsertValueAccessorCompositeKey(). + * + * @param argument_ids The multi-source attribute IDs of each argument + * component to be read from \p accessor_mux. + * @param key_ids The multi-source attribute IDs of each group-by key + * component to be read from \p accessor_mux. + * @param accessor_mux A ValueAccessorMultiplexer object that contains the + * ValueAccessors which will be used to access keys. beginIteration() + * should be called on the accessors before calling this method. + * @return True on success, false if upsert failed because there was not + * enough space to insert new entries for all the keys in accessor + * (note that some entries may still have been upserted, and + * accessors' iterations will be left on the first tuple which could + * not be inserted). 
+ **/ + bool upsertValueAccessorCompositeKey( + const std::vector> &argument_ids, + const std::vector &key_ids, + const ValueAccessorMultiplexer &accessor_mux) override; + + /** + * @return The ID of the StorageBlob used to store this hash table. + **/ + inline block_id getBlobId() const { + return blob_->getID(); + } + + /** + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call). + * Concurrent calls to getSingleCompositeKey(), forEach(), and + * forEachCompositeKey() are safe. + * + * @return The number of entries in this HashTable. + **/ + inline std::size_t numEntries() const { + return header_->buckets_allocated.load(std::memory_order_relaxed); + } + + /** + * @brief Use aggregation handles to merge the given aggregation states into + * the aggregation states mapped to the given key. New states are first + * inserted if not already present. + * + * @warning The key must not be null. + * @note This method is threadsafe with regard to other calls to + * upsertCompositeKey() and upsertValueAccessorCompositeKey(). + * + * @param key The key. + * @param source_state The source aggregation states to be merged into this + * hash table. + * @return True on success, false if upsert failed because there was not + * enough space to insert a new entry in this hash table. + **/ + inline bool upsertCompositeKey(const std::vector &key, + const std::uint8_t *source_state); + + /** + * @brief Apply a functor to an aggregation state mapped to the given key. + * First inserting a new state if one is not already present. + * + * @warning The key must not be null. + * @note This method is threadsafe with regard to other calls to + * upsertCompositeKey() and upsertValueAccessorCompositeKey(). + * + * @param key The key. + * @param functor A pointer to a functor, which should provide a call + * operator which takes an aggregation state (of type std::uint8_t *) + * as an argument. + * @param index The index of the target aggregation state among those states + * mapped to \p key. + * @return True on success, false if upsert failed because there was not + * enough space to insert a new entry in this hash table. + **/ + template + inline bool upsertCompositeKey(const std::vector &key, + FunctorT *functor, + const std::size_t index); + + /** + * @brief Lookup a composite key against this hash table to find a matching + * entry. + * + * @warning The key must not be null. + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param key The key to look up. + * @return The value of a matched entry if a matching key is found. + * Otherwise, return NULL. + **/ + inline const std::uint8_t* getSingleCompositeKey( + const std::vector &key) const; + + /** + * @brief Lookup a composite key against this hash table to find a matching + * entry. Then return the aggregation state component with the + * specified index. + * + * @warning The key must not be null. + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. 
that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param key The key to look up. + * @param index The index of the target aggregation state among those states + * mapped to \p key. + * @return The aggregation state of the specified index if a matching key is + * found. Otherwise, return NULL. + **/ + inline const std::uint8_t* getSingleCompositeKey( + const std::vector &key, + const std::size_t index) const; + + /** + * @brief Apply a functor to each (key, value) pair in this hash table. + * + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param functor A pointer to a functor, which should provide a call operator + * which takes 2 arguments: const TypedValue&, const std::uint8_t*. + * The call operator will be invoked once on each key, value pair in + * this hash table. + * @return The number of key-value pairs visited. + **/ + template + inline std::size_t forEach(FunctorT *functor) const; + + /** + * @brief Apply a functor to each (key, aggregation state) pair in this hash + * table, where the aggregation state is retrieved from the value + * that maps to the corresponding key with the specified index. + * + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param functor A pointer to a functor, which should provide a call operator + * which takes 2 arguments: const TypedValue&, const std::uint8_t*. + * The call operator will be invoked once on each (key, aggregation state) + * pair in this hash table. + * @param index The index of the target aggregation state among those states + * mapped to \p key. + * @return The number of key-value pairs visited. + **/ + template + inline std::size_t forEach(FunctorT *functor, const int index) const; + + /** + * @brief Apply a functor to each key, value pair in this hash table. + * Composite key version. + * + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param functor A pointer to a functor, which should provide a call operator + * which takes 2 arguments: const TypedValue&, const std::uint8_t*. + * The call operator will be invoked once on each key, value pair in + * this hash table. + * @return The number of key-value pairs visited. 
+ **/ + template + inline std::size_t forEachCompositeKey(FunctorT *functor) const; + + /** + * @brief Apply a functor to each (key, aggregation state) pair in this hash + * table, where the aggregation state is retrieved from the value + * that maps to the corresponding key with the specified index. + * Composite key version. + * + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param functor A pointer to a functor, which should provide a call operator + * which takes 2 arguments: const TypedValue&, const std::uint8_t*. + * The call operator will be invoked once on each (key, aggregation state) + * pair in this hash table. + * @param index The index of the target aggregation state among those states + * mapped to \p key. + * @return The number of key-value pairs visited. + **/ + template + inline std::size_t forEachCompositeKey(FunctorT *functor, + const std::size_t index) const; + + private: + void resize(const std::size_t extra_buckets, + const std::size_t extra_variable_storage, + const std::size_t retry_num = 0); + + inline std::size_t calculateVariableLengthCompositeKeyCopySize( + const std::vector &key) const { + std::size_t total = 0; + for (std::vector::size_type idx = 0; idx < key.size(); ++idx) { + if (!(*key_inline_)[idx]) { + total += key[idx].getDataSize(); + } + } + return total; + } + + inline bool getNextEntry(TypedValue *key, + const std::uint8_t **value, + std::size_t *entry_num) const; + + inline bool getNextEntryCompositeKey(std::vector *key, + const std::uint8_t **value, + std::size_t *entry_num) const; + + inline std::uint8_t* upsertCompositeKeyInternal( + const std::vector &key, + const std::size_t variable_key_size); + + template + inline bool upsertValueAccessorCompositeKeyInternal( + const std::vector> &argument_ids, + const std::vector &key_ids, + ValueAccessor *base_accessor, + ColumnVectorsValueAccessor *derived_accessor); + + // Generate a hash for a composite key by hashing each component of 'key' and + // mixing their bits with CombineHashes(). + inline std::size_t hashCompositeKey(const std::vector &key) const; + + // Set information about which key components are stored inline. This usually + // comes from a HashTableKeyManager, and is set by the constructor of a + // subclass of HashTable. + inline void setKeyInline(const std::vector *key_inline) { + scalar_key_inline_ = key_inline->front(); + key_inline_ = key_inline; + } + + inline static std::size_t ComputeTotalPayloadSize( + const std::vector &handles) { + std::size_t total_payload_size = sizeof(SpinMutex); + for (const auto *handle : handles) { + total_payload_size += handle->getPayloadSize(); + } + return total_payload_size; + } + + // Assign '*key_vector' with the attribute values specified by 'key_ids' at + // the current position of 'accessor'. If 'check_for_null_keys' is true, stops + // and returns true if any of the values is null, otherwise returns false. 
+ template + inline static bool GetCompositeKeyFromValueAccessor( + const std::vector &key_ids, + const ValueAccessorT *accessor, + const ColumnVectorsValueAccessor *derived_accessor, + std::vector *key_vector) { + for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) { + const MultiSourceAttributeId &key_id = key_ids[key_idx]; + if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) { + (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id); + } else { + (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id); + } + if (check_for_null_keys && (*key_vector)[key_idx].isNull()) { + return true; + } + } + return false; + } + + struct Header { + std::size_t num_slots; + std::size_t num_buckets; + alignas(kCacheLineBytes) std::atomic buckets_allocated; + alignas(kCacheLineBytes) + std::atomic variable_length_bytes_allocated; + }; + + // Type(s) of keys. + const std::vector key_types_; + + // Information about whether key components are stored inline or in a + // separate variable-length storage region. This is usually determined by a + // HashTableKeyManager and set by calling setKeyInline(). + bool scalar_key_inline_; + const std::vector *key_inline_; + + const std::size_t num_handles_; + const std::vector handles_; + + std::size_t total_payload_size_; + std::vector payload_offsets_; + std::uint8_t *init_payload_; + + StorageManager *storage_manager_; + MutableBlobReference blob_; + + // Locked in shared mode for most operations, exclusive mode during resize. + // Not locked at all for non-resizable HashTables. + alignas(kCacheLineBytes) SpinSharedMutex resize_shared_mutex_; + + std::size_t kBucketAlignment; + + // Value's offset in a bucket is the first alignof(ValueT) boundary after the + // next pointer and hash code. + std::size_t kValueOffset; + + // Round bucket size up to a multiple of kBucketAlignment. + inline std::size_t ComputeBucketSize(const std::size_t fixed_key_size) { + return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) / + kBucketAlignment) + + 1) * + kBucketAlignment; + } + + // Attempt to find an empty bucket to insert 'hash_code' into, starting after + // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot + // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an + // empty bucket is found. Returns false if 'allow_duplicate_keys' is false + // and a hash collision is found (caller should then check whether there is a + // genuine key collision or the hash collision is spurious). Returns false + // and sets '*bucket' to NULL if there are no more empty buckets in the hash + // table. If 'variable_key_allocation_required' is nonzero, this method will + // attempt to allocate storage for a variable-length key BEFORE allocating a + // bucket, so that no bucket number below 'header_->num_buckets' is ever + // deallocated after being allocated. + inline bool locateBucketForInsertion( + const std::size_t hash_code, + const std::size_t variable_key_allocation_required, + void **bucket, + std::atomic **pending_chain_ptr, + std::size_t *pending_chain_ptr_finish_value); + + // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was + // found by locateBucketForInsertion(). Assumes that storage for a + // variable-length key copy (if any) was already allocated by a successful + // call to allocateVariableLengthKeyStorage(). 
+ inline void writeScalarKeyToBucket( + const TypedValue &key, + const std::size_t hash_code, + void *bucket); + + // Write a composite 'key' and its 'hash_code' into the '*bucket', which was + // found by locateBucketForInsertion(). Assumes that storage for + // variable-length key copies (if any) was already allocated by a successful + // call to allocateVariableLengthKeyStorage(). + inline void writeCompositeKeyToBucket( + const std::vector &key, + const std::size_t hash_code, + void *bucket); + + // Determine whether it is actually necessary to resize this hash table. + // Checks that there is at least one unallocated bucket, and that there is + // at least 'extra_variable_storage' bytes of variable-length storage free. + inline bool isFull(const std::size_t extra_variable_storage) const; + + // Helper object to manage key storage. + HashTableKeyManager key_manager_; + + // In-memory structure is as follows: + // - SeparateChainingHashTable::Header + // - Array of slots, interpreted as follows: + // - 0 = Points to nothing (empty) + // - SIZE_T_MAX = Pending (some thread is starting a chain from this + // slot and will overwrite it soon) + // - Anything else = The number of the first bucket in the chain for + // this slot PLUS ONE (i.e. subtract one to get the actual bucket + // number). + // - Array of buckets, each of which is: + // - atomic size_t "next" pointer, interpreted the same as slots above. + // - size_t hash value + // - possibly some unused bytes as needed so that ValueT's alignment + // requirement is met + // - ValueT value slot + // - fixed-length key storage (which may include pointers to external + // memory or offsets of variable length keys stored within this hash + // table) + // - possibly some additional unused bytes so that bucket size is a + // multiple of both alignof(std::atomic) and + // alignof(ValueT) + // - Variable-length key storage region (referenced by offsets stored in + // fixed-length keys). + Header *header_; + + std::atomic *slots_; + void *buckets_; + const std::size_t bucket_size_; + + DISALLOW_COPY_AND_ASSIGN(PackedPayloadHashTable); +}; + +/** @} */ + +// ---------------------------------------------------------------------------- +// Implementations of template class methods follow. + +class HashTableMerger { + public: + /** + * @brief Constructor + * + * @param handle The Aggregation handle being used. + * @param destination_hash_table The destination hash table to which other + * hash tables will be merged. + **/ + explicit HashTableMerger(PackedPayloadHashTable *destination_hash_table) + : destination_hash_table_(destination_hash_table) {} + + /** + * @brief The operator for the functor. + * + * @param group_by_key The group by key being merged. + * @param source_state The aggregation state for the given key in the source + * aggregation hash table. 
+ **/ + inline void operator()(const std::vector &group_by_key, + const std::uint8_t *source_state) { + destination_hash_table_->upsertCompositeKey(group_by_key, source_state); + } + + private: + PackedPayloadHashTable *destination_hash_table_; + + DISALLOW_COPY_AND_ASSIGN(HashTableMerger); +}; + +inline std::size_t PackedPayloadHashTable::hashCompositeKey( + const std::vector &key) const { + DEBUG_ASSERT(!key.empty()); + DEBUG_ASSERT(key.size() == key_types_.size()); + std::size_t hash = key.front().getHash(); + for (std::vector::const_iterator key_it = key.begin() + 1; + key_it != key.end(); + ++key_it) { + hash = CombineHashes(hash, key_it->getHash()); + } + return hash; +} + +inline bool PackedPayloadHashTable::getNextEntry(TypedValue *key, + const std::uint8_t **value, + std::size_t *entry_num) const { + if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) { + const char *bucket = + static_cast(buckets_) + (*entry_num) * bucket_size_; + *key = key_manager_.getKeyComponentTyped(bucket, 0); + *value = reinterpret_cast(bucket + kValueOffset); + ++(*entry_num); + return true; + } else { + return false; + } +} + +inline bool PackedPayloadHashTable::getNextEntryCompositeKey( + std::vector *key, + const std::uint8_t **value, + std::size_t *entry_num) const { + if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) { + const char *bucket = + static_cast(buckets_) + (*entry_num) * bucket_size_; + for (std::vector::size_type key_idx = 0; + key_idx < this->key_types_.size(); + ++key_idx) { + key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx)); + } + *value = reinterpret_cast(bucket + kValueOffset); + ++(*entry_num); + return true; + } else { + return false; + } +} + +inline bool PackedPayloadHashTable::locateBucketForInsertion( + const std::size_t hash_code, + const std::size_t variable_key_allocation_required, + void **bucket, + std::atomic **pending_chain_ptr, + std::size_t *pending_chain_ptr_finish_value) { + if (*bucket == nullptr) { + *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]); + } else { + *pending_chain_ptr = static_cast *>(*bucket); + } + for (;;) { + std::size_t existing_chain_ptr = 0; + if ((*pending_chain_ptr) + ->compare_exchange_strong(existing_chain_ptr, + std::numeric_limits::max(), + std::memory_order_acq_rel)) { + // Got to the end of the chain. Allocate a new bucket. + + // First, allocate variable-length key storage, if needed (i.e. if this + // is an upsert and we didn't allocate up-front). + if (!key_manager_.allocateVariableLengthKeyStorage( + variable_key_allocation_required)) { + // Ran out of variable-length storage. + (*pending_chain_ptr)->store(0, std::memory_order_release); + *bucket = nullptr; + return false; + } + + const std::size_t allocated_bucket_num = + header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed); + if (allocated_bucket_num >= header_->num_buckets) { + // Ran out of buckets. + header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed); + (*pending_chain_ptr)->store(0, std::memory_order_release); + *bucket = nullptr; + return false; + } else { + *bucket = + static_cast(buckets_) + allocated_bucket_num * bucket_size_; + *pending_chain_ptr_finish_value = allocated_bucket_num + 1; + return true; + } + } + // Spin until the real "next" pointer is available. 
+ while (existing_chain_ptr == std::numeric_limits::max()) { + existing_chain_ptr = + (*pending_chain_ptr)->load(std::memory_order_acquire); + } + if (existing_chain_ptr == 0) { + // Other thread had to roll back, so try again. + continue; + } + // Chase the next pointer. + *bucket = + static_cast(buckets_) + (existing_chain_ptr - 1) * bucket_size_; + *pending_chain_ptr = static_cast *>(*bucket); + const std::size_t hash_in_bucket = *reinterpret_cast( + static_cast(*bucket) + + sizeof(std::atomic)); + if (hash_in_bucket == hash_code) { + return false; + } + } +} + +inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey( + const std::vector &key) const { + DEBUG_ASSERT(this->key_types_.size() == key.size()); + + const std::size_t hash_code = this->hashCompositeKey(key); + std::size_t bucket_ref = + slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed); + while (bucket_ref != 0) { + DEBUG_ASSERT(bucket_ref != std::numeric_limits::max()); + const char *bucket = + static_cast(buckets_) + (bucket_ref - 1) * bucket_size_; + const std::size_t bucket_hash = *reinterpret_cast( + bucket + sizeof(std::atomic)); + if ((bucket_hash == hash_code) && + key_manager_.compositeKeyCollisionCheck(key, bucket)) { + // Match located. + return reinterpret_cast(bucket + kValueOffset); + } + bucket_ref = + reinterpret_cast *>(bucket)->load( + std::memory_order_relaxed); + } + + // Reached the end of the chain and didn't find a match. + return nullptr; +} + +inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey( + const std::vector &key, + const std::size_t index) const { + DEBUG_ASSERT(this->key_types_.size() == key.size()); + + const std::size_t hash_code = this->hashCompositeKey(key); + std::size_t bucket_ref = + slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed); + while (bucket_ref != 0) { + DEBUG_ASSERT(bucket_ref != std::numeric_limits::max()); + const char *bucket = + static_cast(buckets_) + (bucket_ref - 1) * bucket_size_; + const std::size_t bucket_hash = *reinterpret_cast( + bucket + sizeof(std::atomic)); + if ((bucket_hash == hash_code) && + key_manager_.compositeKeyCollisionCheck(key, bucket)) { + // Match located. + return reinterpret_cast(bucket + kValueOffset) + + this->payload_offsets_[index]; + } + bucket_ref = + reinterpret_cast *>(bucket)->load( + std::memory_order_relaxed); + } + + // Reached the end of the chain and didn't find a match. 
+ return nullptr; +} + +inline bool PackedPayloadHashTable::upsertCompositeKey( + const std::vector &key, + const std::uint8_t *source_state) { + const std::size_t variable_size = + calculateVariableLengthCompositeKeyCopySize(key); + for (;;) { + { + SpinSharedMutexSharedLock resize_lock(resize_shared_mutex_); + std::uint8_t *value = + upsertCompositeKeyInternal(key, variable_size); + if (value != nullptr) { + SpinMutexLock lock(*(reinterpret_cast(value))); + for (unsigned int k = 0; k < num_handles_; ++k) { + handles_[k]->mergeStates(source_state + payload_offsets_[k], + value + payload_offsets_[k]); + } + return true; + } + } + resize(0, variable_size); + } +} + +template +inline bool PackedPayloadHashTable::upsertCompositeKey( + const std::vector &key, + FunctorT *functor, + const std::size_t index) { + const std::size_t variable_size = + calculateVariableLengthCompositeKeyCopySize(key); + for (;;) { + { + SpinSharedMutexSharedLock resize_lock(resize_shared_mutex_); + std::uint8_t *value = + upsertCompositeKeyInternal(key, variable_size); + if (value != nullptr) { + (*functor)(value + payload_offsets_[index]); + return true; + } + } + resize(0, variable_size); + } +} + + +inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal( + const std::vector &key, + const std::size_t variable_key_size) { + if (variable_key_size > 0) { + // Don't allocate yet, since the key may already be present. However, we + // do check if either the allocated variable storage space OR the free + // space is big enough to hold the key (at least one must be true: either + // the key is already present and allocated, or we need to be able to + // allocate enough space for it). + std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load( + std::memory_order_relaxed); + if ((allocated_bytes < variable_key_size) && + (allocated_bytes + variable_key_size > + key_manager_.getVariableLengthKeyStorageSize())) { + return nullptr; + } + } + + const std::size_t hash_code = this->hashCompositeKey(key); + void *bucket = nullptr; + std::atomic *pending_chain_ptr; + std::size_t pending_chain_ptr_finish_value; + for (;;) { + if (locateBucketForInsertion(hash_code, + variable_key_size, + &bucket, + &pending_chain_ptr, + &pending_chain_ptr_finish_value)) { + // Found an empty bucket. + break; + } else if (bucket == nullptr) { + // Ran out of buckets or variable-key space. + return nullptr; + } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) { + // Found an already-existing entry for this key. + return reinterpret_cast(static_cast(bucket) + + kValueOffset); + } + } + + // We are now writing to an empty bucket. + // Write the key and hash. + writeCompositeKeyToBucket(key, hash_code, bucket); + + std::uint8_t *value = static_cast(bucket) + kValueOffset; + std::memcpy(value, init_payload_, this->total_payload_size_); + + // Update the previous chaing pointer to point to the new bucket. + pending_chain_ptr->store(pending_chain_ptr_finish_value, + std::memory_order_release); + + // Return the value. 
+ return value; +} + +template +inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal( + const std::vector> &argument_ids, + const std::vector &key_ids, + ValueAccessor *base_accessor, + ColumnVectorsValueAccessor *derived_accessor) { + std::size_t variable_size; + std::vector key_vector; + key_vector.resize(key_ids.size()); + + return InvokeOnAnyValueAccessor( + base_accessor, + [&](auto *accessor) -> bool { // NOLINT(build/c++11) + bool continuing = true; + while (continuing) { + { + continuing = false; + SpinSharedMutexSharedLock lock(resize_shared_mutex_); + while (accessor->next()) { + if (use_two_accessors) { + derived_accessor->next(); + } + if (this->GetCompositeKeyFromValueAccessor( + key_ids, + accessor, + derived_accessor, + &key_vector)) { + continue; + } + variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector); + std::uint8_t *value = this->upsertCompositeKeyInternal( + key_vector, variable_size); + if (value == nullptr) { + continuing = true; + break; + } else { + SpinMutexLock lock(*(reinterpret_cast(value))); + for (unsigned int k = 0; k < num_handles_; ++k) { + const auto &ids = argument_ids[k]; + if (ids.empty()) { + handles_[k]->updateStateNullary(value + payload_offsets_[k]); + } else { + const MultiSourceAttributeId &arg_id = ids.front(); + if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) { + DCHECK_NE(arg_id.attr_id, kInvalidAttributeID); + handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id), + value + payload_offsets_[k]); + } else { + handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id), + value + payload_offsets_[k]); + } + } + } + } + } + } + if (continuing) { + this->resize(0, variable_size); + accessor->previous(); + if (use_two_accessors) { + derived_accessor->previous(); + } + } + } + return true; + }); +} + +inline void PackedPayloadHashTable::writeScalarKeyToBucket( + const TypedValue &key, + const std::size_t hash_code, + void *bucket) { + *reinterpret_cast(static_cast(bucket) + + sizeof(std::atomic)) = + hash_code; + key_manager_.writeKeyComponentToBucket(key, 0, bucket, nullptr); +} + +inline void PackedPayloadHashTable::writeCompositeKeyToBucket( + const std::vector &key, + const std::size_t hash_code, + void *bucket) { + DEBUG_ASSERT(key.size() == this->key_types_.size()); + *reinterpret_cast(static_cast(bucket) + + sizeof(std::atomic)) = + hash_code; + for (std::size_t idx = 0; idx < this->key_types_.size(); ++idx) { + key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, nullptr); + } +} + +inline bool PackedPayloadHashTable::isFull( + const std::size_t extra_variable_storage) const { + if (header_->buckets_allocated.load(std::memory_order_relaxed) >= + header_->num_buckets) { + // All buckets are allocated. + return true; + } + + if (extra_variable_storage > 0) { + if (extra_variable_storage + + header_->variable_length_bytes_allocated.load( + std::memory_order_relaxed) > + key_manager_.getVariableLengthKeyStorageSize()) { + // Not enough variable-length key storage space. 
+ return true; + } + } + + return false; +} + +template +inline std::size_t PackedPayloadHashTable::forEach(FunctorT *functor) const { + std::size_t entries_visited = 0; + std::size_t entry_num = 0; + TypedValue key; + const std::uint8_t *value_ptr; + while (getNextEntry(&key, &value_ptr, &entry_num)) { + ++entries_visited; + (*functor)(key, value_ptr); + } + return entries_visited; +} + +template +inline std::size_t PackedPayloadHashTable::forEach( + FunctorT *functor, const int index) const { + std::size_t entries_visited = 0; + std::size_t entry_num = 0; + TypedValue key; + const std::uint8_t *value_ptr; + while (getNextEntry(&key, &value_ptr, &entry_num)) { + ++entries_visited; + (*functor)(key, value_ptr + payload_offsets_[index]); + key.clear(); + } + return entries_visited; +} + +template +inline std::size_t PackedPayloadHashTable::forEachCompositeKey( + FunctorT *functor) const { + std::size_t entries_visited = 0; + std::size_t entry_num = 0; + std::vector key; + const std::uint8_t *value_ptr; + while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) { + ++entries_visited; + (*functor)(key, value_ptr); + key.clear(); + } + return entries_visited; +} + +template +inline std::size_t PackedPayloadHashTable::forEachCompositeKey( + FunctorT *functor, + const std::size_t index) const { + std::size_t entries_visited = 0; + std::size_t entry_num = 0; + std::vector key; + const std::uint8_t *value_ptr; + while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) { + ++entries_visited; + (*functor)(key, value_ptr + payload_offsets_[index]); + key.clear(); + } + return entries_visited; +} + +} // namespace quickstep + +#endif // QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/PartitionedHashTablePool.hpp ---------------------------------------------------------------------- diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp index 95d1810..0e62511 100644 --- a/storage/PartitionedHashTablePool.hpp +++ b/storage/PartitionedHashTablePool.hpp @@ -21,22 +21,19 @@ #define QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_ #include -#include +#include #include -#include #include -#include "expressions/aggregation/AggregationHandle.hpp" #include "storage/HashTableBase.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/FastHashTableFactory.hpp" +#include "storage/HashTableFactory.hpp" #include "utility/Macros.hpp" -#include "utility/StringUtil.hpp" #include "glog/logging.h" namespace quickstep { +class AggregationHandle; class StorageManager; class Type; @@ -54,33 +51,6 @@ class PartitionedHashTablePool { /** * @brief Constructor. * - * @param estimated_num_entries The maximum number of entries in a hash table. - * @param num_partitions The number of partitions (i.e. number of HashTables) - * @param hash_table_impl_type The type of hash table implementation. - * @param group_by_types A vector of pointer of types which form the group by - * key. - * @param agg_handle The aggregation handle. - * @param storage_manager A pointer to the storage manager. 
-   **/
-  PartitionedHashTablePool(const std::size_t estimated_num_entries,
-                           const std::size_t num_partitions,
-                           const HashTableImplType hash_table_impl_type,
-                           const std::vector<const Type *> &group_by_types,
-                           AggregationHandle *agg_handle,
-                           StorageManager *storage_manager)
-      : estimated_num_entries_(
-            setHashTableSize(estimated_num_entries, num_partitions)),
-        num_partitions_(num_partitions),
-        hash_table_impl_type_(hash_table_impl_type),
-        group_by_types_(group_by_types),
-        agg_handle_(DCHECK_NOTNULL(agg_handle)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
-    initializeAllHashTables();
-  }
-
-  /**
-   * @brief Constructor.
-   *
   * @note This constructor is relevant for the HashTable specialized for
   *       aggregation.
   *
@@ -89,8 +59,6 @@ class PartitionedHashTablePool {
   * @param hash_table_impl_type The type of hash table implementation.
   * @param group_by_types A vector of pointer of types which form the group by
   *        key.
-  * @param payload_sizes The sizes of the payload elements (i.e.
-  *        AggregationStates).
   * @param handles The aggregation handles.
   * @param storage_manager A pointer to the storage manager.
   **/
@@ -98,7 +66,6 @@ class PartitionedHashTablePool {
                            const std::size_t num_partitions,
                            const HashTableImplType hash_table_impl_type,
                            const std::vector<const Type *> &group_by_types,
-                           const std::vector<std::size_t> &payload_sizes,
                            const std::vector<AggregationHandle *> &handles,
                            StorageManager *storage_manager)
       : estimated_num_entries_(
@@ -106,7 +73,6 @@ class PartitionedHashTablePool {
         num_partitions_(num_partitions),
         hash_table_impl_type_(hash_table_impl_type),
         group_by_types_(group_by_types),
-        payload_sizes_(payload_sizes),
         handles_(handles),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {
     initializeAllHashTables();
@@ -150,25 +116,17 @@ class PartitionedHashTablePool {
  private:
  void initializeAllHashTables() {
     for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
-      AggregationStateHashTableBase *part_hash_table = createNewHashTableFast();
+      AggregationStateHashTableBase *part_hash_table = createNewHashTable();
       hash_tables_.push_back(
           std::unique_ptr<AggregationStateHashTableBase>(part_hash_table));
     }
   }
 
   AggregationStateHashTableBase* createNewHashTable() {
-    return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
-                                               group_by_types_,
-                                               estimated_num_entries_,
-                                               storage_manager_);
-  }
-
-  AggregationStateHashTableBase* createNewHashTableFast() {
-    return AggregationStateFastHashTableFactory::CreateResizable(
+    return AggregationStateHashTableFactory::CreateResizable(
         hash_table_impl_type_,
         group_by_types_,
         estimated_num_entries_,
-        payload_sizes_,
         handles_,
         storage_manager_);
   }
@@ -189,10 +147,6 @@ class PartitionedHashTablePool {
   const HashTableImplType hash_table_impl_type_;
   const std::vector<const Type *> group_by_types_;
-
-  std::vector<std::size_t> payload_sizes_;
-
-  AggregationHandle *agg_handle_;
   const std::vector<AggregationHandle *> handles_;
   StorageManager *storage_manager_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index de2d25b..0cc7735 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -19,8 +19,8 @@
 #include "storage/StorageBlock.hpp"
 
-#include
 #include
+#include
 #include
 #include
 #include
@@ -28,7 +28,6 @@
 #include "catalog/CatalogRelationSchema.hpp"
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/BasicColumnStoreTupleStorageSubBlock.hpp"
@@ -37,7 +36,6 @@
 #include "storage/CompressedColumnStoreTupleStorageSubBlock.hpp"
 #include "storage/CompressedPackedRowStoreTupleStorageSubBlock.hpp"
 #include "storage/CountedReference.hpp"
-#include "storage/HashTableBase.hpp"
 #include "storage/IndexSubBlock.hpp"
 #include "storage/InsertDestinationInterface.hpp"
 #include "storage/SMAIndexSubBlock.hpp"
@@ -396,166 +394,6 @@ void StorageBlock::selectSimple(const std::vector &selection,
                                 accessor.get());
 }
 
-AggregationState* StorageBlock::aggregate(
-    const AggregationHandle &handle,
-    const std::vector<std::unique_ptr<const Scalar>> &arguments,
-    const std::vector<attribute_id> *arguments_as_attributes,
-    const TupleIdSequence *filter) const {
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // If all the arguments to this aggregate are plain relation attributes,
-  // aggregate directly on a ValueAccessor from this block to avoid a copy.
-  if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
-    DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
-        << "Mismatch between number of arguments and number of attribute_ids";
-    return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
-  }
-  // TODO(shoban): We may want to optimize for ScalarLiteral here.
-#endif
-
-  // Call aggregateHelperColumnVector() to materialize each argument as a
-  // ColumnVector, then aggregate over those.
-  return aggregateHelperColumnVector(handle, arguments, filter);
-}
-
-void StorageBlock::aggregateGroupBy(
-    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const TupleIdSequence *filter,
-    AggregationStateHashTableBase *hash_table,
-    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
-  DCHECK_GT(group_by.size(), 0u)
-      << "Called aggregateGroupBy() with zero GROUP BY expressions";
-
-  SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                    indices_,
-                                    indices_consistent_);
-
-  // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-  std::vector<attribute_id> argument_ids;
-
-  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
-  std::vector<attribute_id> key_ids;
-
-  // An intermediate ValueAccessor that stores the materialized 'arguments' for
-  // this aggregate, as well as the GROUP BY expression values.
-  ColumnVectorsValueAccessor temp_result;
-  {
-    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
-    attribute_id attr_id = 0;
-
-    // First, put GROUP BY keys into 'temp_result'.
-    if (reuse_group_by_vectors->empty()) {
-      // Compute GROUP BY values from group_by Scalars, and store them in
-      // reuse_group_by_vectors for reuse by other aggregates on this same
-      // block.
-      reuse_group_by_vectors->reserve(group_by.size());
-      for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
-        reuse_group_by_vectors->emplace_back(
-            group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-        temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
-        key_ids.push_back(attr_id++);
-      }
-    } else {
-      // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-      DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-          << "Wrong number of reuse_group_by_vectors";
-      for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
-        temp_result.addColumn(reuse_cv.get(), false);
-        key_ids.push_back(attr_id++);
-      }
-    }
-
-    // Compute argument vectors and add them to 'temp_result'.
-    for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
-      for (const std::unique_ptr<const Scalar> &args : argument) {
-        temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
-        argument_ids.push_back(attr_id++);
-      }
-      if (argument.empty()) {
-        argument_ids.push_back(kInvalidAttributeID);
-      }
-    }
-  }
-
-  hash_table->upsertValueAccessorCompositeKeyFast(argument_ids,
-                                                  &temp_result,
-                                                  key_ids,
-                                                  true);
-}
-
-
-void StorageBlock::aggregateDistinct(
-    const AggregationHandle &handle,
-    const std::vector<std::unique_ptr<const Scalar>> &arguments,
-    const std::vector<attribute_id> *arguments_as_attributes,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const TupleIdSequence *filter,
-    AggregationStateHashTableBase *distinctify_hash_table,
-    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
-  DCHECK_GT(arguments.size(), 0u)
-      << "Called aggregateDistinct() with zero argument expressions";
-  DCHECK((group_by.size() == 0 || reuse_group_by_vectors != nullptr));
-
-  std::vector<attribute_id> key_ids;
-
-  // An intermediate ValueAccessor that stores the materialized 'arguments' for
-  // this aggregate, as well as the GROUP BY expression values.
-  ColumnVectorsValueAccessor temp_result;
-  {
-    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-    // If all the arguments to this aggregate are plain relation attributes,
-    // aggregate directly on a ValueAccessor from this block to avoid a copy.
-    if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
-      DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
-          << "Mismatch between number of arguments and number of attribute_ids";
-      DCHECK_EQ(group_by.size(), 0u);
-      handle.insertValueAccessorIntoDistinctifyHashTable(
-          accessor.get(), *arguments_as_attributes, distinctify_hash_table);
-      return;
-    }
-#endif
-
-    SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                      indices_,
-                                      indices_consistent_);
-    attribute_id attr_id = 0;
-
-    if (!group_by.empty()) {
-      // Put GROUP BY keys into 'temp_result'.
-      if (reuse_group_by_vectors->empty()) {
-        // Compute GROUP BY values from group_by Scalars, and store them in
-        // reuse_group_by_vectors for reuse by other aggregates on this same
-        // block.
-        reuse_group_by_vectors->reserve(group_by.size());
-        for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
-          reuse_group_by_vectors->emplace_back(
-              group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-          temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
-          key_ids.push_back(attr_id++);
-        }
-      } else {
-        // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-        DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-            << "Wrong number of reuse_group_by_vectors";
-        for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
-          temp_result.addColumn(reuse_cv.get(), false);
-          key_ids.push_back(attr_id++);
-        }
-      }
-    }
-    // Compute argument vectors and add them to 'temp_result'.
-    for (const std::unique_ptr<const Scalar> &argument : arguments) {
-      temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref));
-      key_ids.push_back(attr_id++);
-    }
-  }
-
-  handle.insertValueAccessorIntoDistinctifyHashTable(
-      &temp_result, key_ids, distinctify_hash_table);
-}
-
 // TODO(chasseur): Vectorization for updates.
 StorageBlock::UpdateResult StorageBlock::update(
     const unordered_map<attribute_id, std::unique_ptr<const Scalar>> &assignments,
@@ -1262,61 +1100,6 @@ std::unordered_map* StorageBlock::generateUpdatedValue
   return update_map;
 }
 
-AggregationState* StorageBlock::aggregateHelperColumnVector(
-    const AggregationHandle &handle,
-    const std::vector<std::unique_ptr<const Scalar>> &arguments,
-    const TupleIdSequence *matches) const {
-  if (arguments.empty()) {
-    // Special case. This is a nullary aggregate (i.e. COUNT(*)).
-    return handle.accumulateNullary(matches == nullptr ? tuple_store_->numTuples()
-                                                       : matches->size());
-  } else {
-    // Set up a ValueAccessor that will be used when materializing argument
-    // values below (possibly filtered based on the '*matches' to a filter
-    // predicate).
-    std::unique_ptr<ValueAccessor> accessor;
-    if (matches == nullptr) {
-      accessor.reset(tuple_store_->createValueAccessor());
-    } else {
-      accessor.reset(tuple_store_->createValueAccessor(matches));
-    }
-
-    SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                      indices_,
-                                      indices_consistent_);
-
-    // Materialize each argument's values for this block as a ColumnVector.
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    for (const std::unique_ptr<const Scalar> &argument : arguments) {
-      column_vectors.emplace_back(argument->getAllValues(accessor.get(), &sub_blocks_ref));
-    }
-
-    // Have the AggregationHandle actually do the aggregation.
-    return handle.accumulateColumnVectors(column_vectors);
-  }
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* StorageBlock::aggregateHelperValueAccessor(
-    const AggregationHandle &handle,
-    const std::vector<attribute_id> &argument_ids,
-    const TupleIdSequence *matches) const {
-  // Set up a ValueAccessor to aggregate over (possibly filtered based on the
-  // '*matches' to a filter predicate).
-  std::unique_ptr<ValueAccessor> accessor;
-  if (matches == nullptr) {
-    accessor.reset(tuple_store_->createValueAccessor());
-  } else {
-    accessor.reset(tuple_store_->createValueAccessor(matches));
-  }
-
-  // Have the AggregationHandle actually do the aggregation.
-  return handle.accumulateValueAccessor(
-      accessor.get(),
-      argument_ids);
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
 void StorageBlock::updateHeader() {
   DEBUG_ASSERT(*static_cast<const int *>(block_memory_) ==
                block_header_.ByteSize());
@@ -1346,59 +1129,4 @@ const std::size_t StorageBlock::getNumTuples() const {
   return tuple_store_->numTuples();
 }
 
-void StorageBlock::aggregateGroupByPartitioned(
-    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const TupleIdSequence *filter,
-    const std::size_t num_partitions,
-    ColumnVectorsValueAccessor *temp_result,
-    std::vector<attribute_id> *argument_ids,
-    std::vector<attribute_id> *key_ids,
-    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
-  DCHECK(!group_by.empty())
-      << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
-
-  SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                    indices_,
-                                    indices_consistent_);
-
-  std::unique_ptr<ValueAccessor> accessor(
-      tuple_store_->createValueAccessor(filter));
-
-  attribute_id attr_id = 0;
-
-  // First, put GROUP BY keys into 'temp_result'.
-  if (reuse_group_by_vectors->empty()) {
-    // Compute GROUP BY values from group_by Scalars, and store them in
-    // reuse_group_by_vectors for reuse by other aggregates on this same
-    // block.
-    reuse_group_by_vectors->reserve(group_by.size());
-    for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
-      reuse_group_by_vectors->emplace_back(
-          group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-      temp_result->addColumn(reuse_group_by_vectors->back().get(), false);
-      key_ids->push_back(attr_id++);
-    }
-  } else {
-    // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-    DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-        << "Wrong number of reuse_group_by_vectors";
-    for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
-      temp_result->addColumn(reuse_cv.get(), false);
-      key_ids->push_back(attr_id++);
-    }
-  }
-
-  // Compute argument vectors and add them to 'temp_result'.
-  for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
-    for (const std::unique_ptr<const Scalar> &args : argument) {
-      temp_result->addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
-      argument_ids->push_back(attr_id++);
-    }
-    if (argument.empty()) {
-      argument_ids->push_back(kInvalidAttributeID);
-    }
-  }
-}
-
 }  // namespace quickstep
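[Not part of the commit; for orientation only.] The StorageBlock aggregation paths removed above are taken over by aggregation hash tables such as PackedPayloadHashTable, whose upsert/forEach templates appear earlier in this patch. Below is a minimal, self-contained C++ sketch of that packed-payload pattern, assuming nothing from the Quickstep API (every name in it is hypothetical): one find-or-insert per input row locates the group's payload, each aggregate updates its own slice of that payload, and a functor-driven forEachCompositeKey() pass reads the results back out. Concurrency, resizing, and variable-length keys are deliberately omitted.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

// This struct stands in for the packed payload buffer: each member plays the
// role of one aggregation handle's slice at a fixed offset.
struct SumCountPayload {
  std::int64_t sum = 0;
  std::int64_t count = 0;
};

class PackedPayloadSketch {
 public:
  // Upsert: find-or-create the payload for 'key', then update every
  // aggregate's slice in place (here: SUM and COUNT of 'argument').
  void upsert(const std::vector<int> &key, const std::int64_t argument) {
    SumCountPayload &payload = table_[key];  // value-initialized on first use
    payload.sum += argument;
    payload.count += 1;
  }

  // forEachCompositeKey analogue: apply a functor to every (key, payload)
  // pair and report how many entries were visited.
  template <typename FunctorT>
  std::size_t forEachCompositeKey(FunctorT *functor) const {
    std::size_t entries_visited = 0;
    for (const auto &entry : table_) {
      (*functor)(entry.first, entry.second);
      ++entries_visited;
    }
    return entries_visited;
  }

 private:
  // std::map stands in for the resizable hash table; no concurrency and no
  // variable-length key storage in this sketch.
  std::map<std::vector<int>, SumCountPayload> table_;
};

int main() {
  PackedPayloadSketch table;
  table.upsert({1, 10}, 5);
  table.upsert({1, 10}, 7);
  table.upsert({2, 20}, 3);

  auto print = [](const std::vector<int> &key, const SumCountPayload &payload) {
    std::cout << "key=(" << key[0] << "," << key[1] << ") sum=" << payload.sum
              << " count=" << payload.count << "\n";
  };
  const std::size_t num_groups = table.forEachCompositeKey(&print);
  std::cout << num_groups << " groups\n";
  return 0;
}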