quickstep-dev mailing list archives

From: zuyu <...@git.apache.org>
Subject: [GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...
Date: Mon, 06 Feb 2017 05:12:48 GMT
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99517280
  
    --- Diff: storage/PackedPayloadHashTable.cpp ---
    @@ -0,0 +1,463 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "storage/PackedPayloadHashTable.hpp"
    +
    +#include <algorithm>
    +#include <atomic>
    +#include <cstddef>
    +#include <cstdint>
    +#include <cstdlib>
    +#include <cstring>
    +#include <vector>
    +
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "storage/HashTableKeyManager.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageConstants.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/ValueAccessor.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "threading/SpinMutex.hpp"
    +#include "threading/SpinSharedMutex.hpp"
    +#include "types/Type.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/Alignment.hpp"
    +#include "utility/Macros.hpp"
    +#include "utility/PrimeNumber.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +PackedPayloadHashTable::PackedPayloadHashTable(
    +    const std::vector<const Type *> &key_types,
    +    const std::size_t num_entries,
    +    const std::vector<AggregationHandle *> &handles,
    +    StorageManager *storage_manager)
    +    : key_types_(key_types),
    +      num_handles_(handles.size()),
    +      handles_(handles),
    +      total_payload_size_(ComputeTotalPayloadSize(handles)),
    +      storage_manager_(storage_manager),
    +      kBucketAlignment(alignof(std::atomic<std::size_t>)),
    +      kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
    +      key_manager_(key_types_, kValueOffset + total_payload_size_),
    +      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
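    +  // Each bucket is laid out as: an atomic "next" pointer for the chain,
    +  // the key's hash code, the payload region (a SpinMutex followed by each
    +  // handle's aggregation state), and finally the key storage managed by
    +  // 'key_manager_'.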
    +  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
    +  for (const auto *handle : handles) {
    +    payload_offsets_.emplace_back(payload_offset_running_sum);
    +    payload_offset_running_sum += handle->getPayloadSize();
    +  }
    +
    +  // NOTE(jianqiao): Copying from init_payload into a bucket can cause a
    +  // memory leak or a double free if the payload contains out-of-line data.
    +  init_payload_ =
    +      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
    +  DCHECK(init_payload_ != nullptr);
    +
    +  for (std::size_t i = 0; i < num_handles_; ++i) {
    +    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
    +  }
    +
    +  // Bucket size always rounds up to the alignment requirement of the
    +  // atomic size_t "next" pointer at the front.
    +  //
    +  // Give the base HashTable information about which key components are
    +  // stored inline, from 'key_manager_'.
    +  setKeyInline(key_manager_.getKeyInline());
    +
    +  // Pick out a prime number of slots and calculate storage requirements.
    +  std::size_t num_slots_tmp =
    +      get_next_prime_number(num_entries * kHashTableLoadFactor);
    +  std::size_t required_memory =
    +      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
    +      (num_slots_tmp / kHashTableLoadFactor) *
    +          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
    +  std::size_t num_storage_slots =
    +      this->storage_manager_->SlotsNeededForBytes(required_memory);
    +  if (num_storage_slots == 0) {
    +    FATAL_ERROR(
    +        "Storage requirement for SeparateChainingHashTable "
    +        "exceeds maximum allocation size.");
    +  }
    +
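    +  // The blob is carved up in address order: the Header, the slot array,
    +  // the bucket array, and then the variable-length key storage region,
    +  // which receives all remaining bytes.
    +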
    +  // Get a StorageBlob to hold the hash table.
    +  const block_id blob_id =
    +      this->storage_manager_->createBlob(num_storage_slots);
    +  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
    +
    +  void *aligned_memory_start = this->blob_->getMemoryMutable();
    +  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
    +  if (align(alignof(Header),
    +            sizeof(Header),
    +            aligned_memory_start,
    +            available_memory) == nullptr) {
    +    // With current values from StorageConstants.hpp, this should be
    +    // impossible. A blob is at least 1 MB, while a Header has alignment
    +    // requirement of just kCacheLineBytes (64 bytes).
    +    FATAL_ERROR(
    +        "StorageBlob used to hold resizable "
    +        "SeparateChainingHashTable is too small to meet alignment "
    +        "requirements of SeparateChainingHashTable::Header.");
    +  } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
    +    // This should also be impossible, since the StorageManager allocates slots
    +    // aligned to kCacheLineBytes.
    +    DEV_WARNING("StorageBlob memory adjusted by "
    +                << (num_storage_slots * kSlotSizeBytes - available_memory)
    +                << " bytes to meet alignment requirement for "
    +                << "SeparateChainingHashTable::Header.");
    +  }
    +
    +  // Locate the header.
    +  header_ = static_cast<Header *>(aligned_memory_start);
    +  aligned_memory_start =
    +      static_cast<char *>(aligned_memory_start) + sizeof(Header);
    +  available_memory -= sizeof(Header);
    +
    +  // Recompute the number of slots & buckets using the actual available memory.
    +  // Most likely, we got some extra free bucket space due to "rounding up" to
    +  // the storage blob's size. It's also possible (though very unlikely) that we
    +  // will wind up with fewer buckets than we initially wanted because of
    +  // screwy alignment requirements for the bucket array.
    +  std::size_t num_buckets_tmp =
    +      available_memory /
    +      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
    +       key_manager_.getEstimatedVariableKeySize());
    +  num_slots_tmp =
    +      get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
    +  num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
    +  DEBUG_ASSERT(num_slots_tmp > 0);
    +  DEBUG_ASSERT(num_buckets_tmp > 0);
    +
    +  // Locate the slot array.
    +  slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
    +  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
    +                         sizeof(std::atomic<std::size_t>) * num_slots_tmp;
    +  available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
    +
    +  // Locate the buckets.
    +  buckets_ = aligned_memory_start;
    +  // Extra-paranoid: If the bucket array has an alignment requirement
    +  // greater than that of std::atomic<std::size_t>, we may need to adjust
    +  // the start of the bucket array.
    +  if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
    +      nullptr) {
    +    FATAL_ERROR(
    +        "StorageBlob used to hold resizable "
    +        "SeparateChainingHashTable is too small to meet "
    +        "alignment requirements of buckets.");
    +  } else if (buckets_ != aligned_memory_start) {
    +    DEV_WARNING(
    +        "Bucket array start position adjusted to meet alignment "
    +        "requirement for SeparateChainingHashTable's value type.");
    +    if (num_buckets_tmp * bucket_size_ > available_memory) {
    +      --num_buckets_tmp;
    +    }
    +  }
    +
    +  // Fill in the header.
    +  header_->num_slots = num_slots_tmp;
    +  header_->num_buckets = num_buckets_tmp;
    +  header_->buckets_allocated.store(0, std::memory_order_relaxed);
    +  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
    +  available_memory -= bucket_size_ * (header_->num_buckets);
    +
    +  // Locate variable-length key storage region, and give it all the remaining
    +  // bytes in the blob.
    +  key_manager_.setVariableLengthStorageInfo(
    +      static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
    +      available_memory,
    +      &(header_->variable_length_bytes_allocated));
    +}
    +
    +PackedPayloadHashTable::~PackedPayloadHashTable() {
    +  if (blob_.valid()) {
    +    const block_id blob_id = blob_->getID();
    +    blob_.release();
    +    storage_manager_->deleteBlockOrBlobFile(blob_id);
    +  }
    +  std::free(init_payload_);
    +}
    +
    +void PackedPayloadHashTable::clear() {
    +  const std::size_t used_buckets =
    +      header_->buckets_allocated.load(std::memory_order_relaxed);
    +  // Destroy existing values, if necessary.
    +  destroyPayload();
    +
    +  // Zero-out slot array.
    +  std::memset(
    +      slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
    +
    +  // Zero-out used buckets.
    +  std::memset(buckets_, 0x0, used_buckets * bucket_size_);
    +
    +  header_->buckets_allocated.store(0, std::memory_order_relaxed);
    +  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
    +  key_manager_.zeroNextVariableLengthKeyOffset();
    +}
    +
    +void PackedPayloadHashTable::destroyPayload() {
    +  const std::size_t num_buckets =
    +      header_->buckets_allocated.load(std::memory_order_relaxed);
    +  void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
    +  for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
    +    for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
    +      void *value_internal_ptr =
    +          static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
    +      handles_[handle_id]->destroyPayload(
    +          static_cast<std::uint8_t *>(value_internal_ptr));
    +    }
    +    bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
    +  }
    +}
    +
    +bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
    +    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
    +    const std::vector<MultiSourceAttributeId> &key_attr_ids,
    +    const ValueAccessorMultiplexer &accessor_mux) {
    +  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
    +  ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
    +
    +  base_accessor->beginIterationVirtual();
    +  if (derived_accessor == nullptr) {
    +    return upsertValueAccessorCompositeKeyInternal<false>(
    +        argument_ids,
    +        key_attr_ids,
    +        base_accessor,
    +        nullptr);
    +  } else {
    +    DCHECK(derived_accessor->getImplementationType()
    +               == ValueAccessor::Implementation::kColumnVectors);
    +    derived_accessor->beginIterationVirtual();
    +    return upsertValueAccessorCompositeKeyInternal<true>(
    +        argument_ids,
    +        key_attr_ids,
    +        base_accessor,
    +        static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
    +  }
    +}
    +
    +void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
    +                                    const std::size_t extra_variable_storage,
    +                                    const std::size_t retry_num) {
    +  // A retry should never be necessary with this implementation of HashTable.
    +  // Separate chaining ensures that any resized hash table with more buckets
    +  // than the original table will be able to hold more entries than the
    +  // original.
    +  DEBUG_ASSERT(retry_num == 0);
    --- End diff --
    
    Use `DCHECK_EQ` instead.
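
    For reference, a minimal sketch of what the suggested change would look
    like (`DCHECK_EQ` comes from glog, and "glog/logging.h" is already
    included by this file; on failure it logs both operand values, which
    `DEBUG_ASSERT` does not):

        // Sketch of the suggested replacement for the DEBUG_ASSERT above.
        DCHECK_EQ(0u, retry_num);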

