Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7E9CC200C16 for ; Thu, 26 Jan 2017 01:44:06 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 7D170160B5A; Thu, 26 Jan 2017 00:44:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 08FE0160B4E for ; Thu, 26 Jan 2017 01:44:04 +0100 (CET) Received: (qmail 40094 invoked by uid 500); 26 Jan 2017 00:44:04 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 40085 invoked by uid 99); 26 Jan 2017 00:44:04 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jan 2017 00:44:04 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id BEC59184907 for ; Thu, 26 Jan 2017 00:44:03 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.218 X-Spam-Level: X-Spam-Status: No, score=-6.218 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 5SS9r79v9FZi for ; Thu, 26 Jan 2017 00:43:58 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 7322C5FB47 for ; Thu, 26 Jan 2017 00:43:56 +0000 (UTC) Received: (qmail 39212 invoked by uid 99); 26 Jan 2017 00:43:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jan 2017 00:43:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 966FFDFD73; Thu, 26 Jan 2017 00:43:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zuyuz@apache.org To: commits@quickstep.incubator.apache.org Date: Thu, 26 Jan 2017 00:43:56 -0000 Message-Id: <6ed5cc2f0b5748218d782e00d8c29bb8@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-quickstep git commit: Added tests for Partitioned Hash Join. archived-at: Thu, 26 Jan 2017 00:44:06 -0000 Added tests for Partitioned Hash Join. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/caa556d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/caa556d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/caa556d1 Branch: refs/heads/partitioned-hash-join-test Commit: caa556d1539150006e0b631de6c32b03f9efde6f Parents: 968ce3f Author: Zuyu Zhang Authored: Mon Jan 23 17:19:13 2017 -0800 Committer: Zuyu Zhang Committed: Wed Jan 25 16:43:37 2017 -0800 ---------------------------------------------------------------------- relational_operators/CMakeLists.txt | 2 + .../tests/HashJoinOperator_unittest.cpp | 693 +++++++++++++++++-- 2 files changed, 645 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/caa556d1/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index 78da7b8..4e56396 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -610,6 +610,8 @@ target_link_libraries(HashJoinOperator_unittest quickstep_catalog_CatalogDatabase quickstep_catalog_CatalogRelation quickstep_catalog_CatalogTypedefs + quickstep_catalog_PartitionScheme + quickstep_catalog_PartitionSchemeHeader quickstep_expressions_Expressions_proto quickstep_expressions_predicate_ComparisonPredicate quickstep_expressions_predicate_Predicate http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/caa556d1/relational_operators/tests/HashJoinOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp index 60f05ea..03350d4 100644 --- a/relational_operators/tests/HashJoinOperator_unittest.cpp +++ b/relational_operators/tests/HashJoinOperator_unittest.cpp @@ -34,6 +34,8 @@ #include "catalog/CatalogDatabase.hpp" #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" +#include "catalog/PartitionScheme.hpp" +#include "catalog/PartitionSchemeHeader.hpp" #include "expressions/Expressions.pb.h" #include "expressions/predicate/ComparisonPredicate.hpp" #include "expressions/predicate/Predicate.hpp" @@ -83,6 +85,8 @@ using std::snprintf; #endif +using std::make_unique; +using std::size_t; using std::unique_ptr; namespace quickstep { @@ -98,6 +102,7 @@ constexpr std::size_t kQueryId = 0; constexpr int kOpIndex = 0; constexpr std::size_t kSinglePartition = 1; +constexpr std::size_t kMultiplePartitions = 4; } // namespace @@ -144,56 +149,6 @@ class HashJoinOperatorTest : public ::testing::TestWithParam fact_table_->addAttribute(new CatalogAttribute(fact_table_, "int", int_type)); fact_table_->addAttribute(new CatalogAttribute(fact_table_, "char", char_type)); fact_table_->addAttribute(new CatalogAttribute(fact_table_, "varchar", varchar_type)); - - // Create StorageLayout - std::unique_ptr dim_layout(createStorageLayout(*dim_table_)); - std::unique_ptr fact_layout(createStorageLayout(*fact_table_)); - - // Insert tuples to dim table. - std::unique_ptr tuple; - MutableBlockReference storage_block; - for (tuple_id i = 0; i < kNumDimTuples; i += kBlockSize) { - // Create block. - block_id block_id = storage_manager_->createBlock(*dim_table_, *dim_layout); - storage_block = storage_manager_->getBlockMutable(block_id, *dim_table_); - dim_table_->addBlock(block_id); - - // Insert tuples. - tuple_id block_bound = i + kBlockSize < kNumDimTuples ? i + kBlockSize : kNumDimTuples; - for (tuple_id tid = i; tid < block_bound; ++tid) { - // First attribute (long): a sequence id. - // Second attribute (int): a looped value to test duplicate keys. - // Third attribute (char): an identical value to test Cartesian product. - // Forth attribute (varchar): a value to test duplicate variable-length keys. - tuple.reset(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2)); - EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple)); - } - storage_block->rebuild(); - } - - // Insert tuples to fact table. - for (tuple_id i = 0; i < kNumFactTuples; i += kBlockSize) { - // Create block - block_id block_id = storage_manager_->createBlock(*fact_table_, *fact_layout); - storage_block = storage_manager_->getBlockMutable(block_id, *fact_table_); - fact_table_->addBlock(block_id); - - // Insert tuples - tuple_id block_bound = i + kBlockSize < kNumFactTuples ? i + kBlockSize : kNumFactTuples; - for (tuple_id tid = i; tid < block_bound; ++tid) { - // First attribute (long): a sequence id to join with dim_table.long. Each tuple has - // exact one match. - // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the - // first kBlockSize tuples has mutiple matches. Other tuples - // have no match. - // Third attribute (char): an identical value to test Cartesian product. - // Forth attribute (varchar): a sequence id to join with dim_table.var_char. Each tuple - // has two matches. - tuple.reset(createTuple(*fact_table_, tid, tid, 100, tid)); - EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple)); - } - storage_block->rebuild(); - } } virtual void TearDown() { @@ -259,6 +214,127 @@ class HashJoinOperatorTest : public ::testing::TestWithParam return new Tuple(std::move(attr_values)); } + void insertTuplesWithoutPartitions() { + // Create StorageLayout + std::unique_ptr dim_layout(createStorageLayout(*dim_table_)); + std::unique_ptr fact_layout(createStorageLayout(*fact_table_)); + + // Insert tuples to dim table. + std::unique_ptr tuple; + MutableBlockReference storage_block; + for (tuple_id i = 0; i < kNumDimTuples; i += kBlockSize) { + // Create block. + block_id block_id = storage_manager_->createBlock(*dim_table_, *dim_layout); + storage_block = storage_manager_->getBlockMutable(block_id, *dim_table_); + dim_table_->addBlock(block_id); + + // Insert tuples. + tuple_id block_bound = i + kBlockSize < kNumDimTuples ? i + kBlockSize : kNumDimTuples; + for (tuple_id tid = i; tid < block_bound; ++tid) { + // First attribute (long): a sequence id. + // Second attribute (int): a looped value to test duplicate keys. + // Third attribute (char): an identical value to test Cartesian product. + // Forth attribute (varchar): a value to test duplicate variable-length keys. + tuple.reset(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2)); + EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple)); + } + storage_block->rebuild(); + } + + // Insert tuples to fact table. + for (tuple_id i = 0; i < kNumFactTuples; i += kBlockSize) { + // Create block + block_id block_id = storage_manager_->createBlock(*fact_table_, *fact_layout); + storage_block = storage_manager_->getBlockMutable(block_id, *fact_table_); + fact_table_->addBlock(block_id); + + // Insert tuples + tuple_id block_bound = i + kBlockSize < kNumFactTuples ? i + kBlockSize : kNumFactTuples; + for (tuple_id tid = i; tid < block_bound; ++tid) { + // First attribute (long): a sequence id to join with dim_table.long. Each tuple has + // exact one match. + // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the + // first kBlockSize tuples has mutiple matches. Other tuples + // have no match. + // Third attribute (char): an identical value to test Cartesian product. + // Forth attribute (varchar): a sequence id to join with dim_table.var_char. Each tuple + // has two matches. + tuple.reset(createTuple(*fact_table_, tid, tid, 100, tid)); + EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple)); + } + storage_block->rebuild(); + } + } + + void insertTuplesWithSingleAttributePartitions() { + // Set PartitionScheme. + dim_part_scheme_ = new PartitionScheme( + new HashPartitionSchemeHeader(kMultiplePartitions, dim_table_->getAttributeByName("long")->getID())); + dim_table_->setPartitionScheme(dim_part_scheme_); + + fact_part_scheme_ = new PartitionScheme( + new HashPartitionSchemeHeader(kMultiplePartitions, fact_table_->getAttributeByName("long")->getID())); + fact_table_->setPartitionScheme(fact_part_scheme_); + + // Create StorageLayout + std::unique_ptr dim_layout(createStorageLayout(*dim_table_)); + std::unique_ptr fact_layout(createStorageLayout(*fact_table_)); + + // Create blocks per partition. The index is the partition id. + std::vector dim_partitioned_blocks; + for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) { + const block_id block = storage_manager_->createBlock(*dim_table_, *dim_layout); + dim_part_scheme_->addBlockToPartition(block, part_id); + // For a simpler teardown. + dim_table_->addBlock(block); + + dim_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *dim_table_)); + } + + // Insert tuples to dim table. + for (tuple_id tid = 0; tid < kNumDimTuples; ++tid) { + // First attribute (long): a sequence id. + // Second attribute (int): a looped value to test duplicate keys. + // Third attribute (char): an identical value to test Cartesian product. + // Forth attribute (varchar): a value to test duplicate variable-length keys. + unique_ptr tuple(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2)); + EXPECT_TRUE(dim_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(*tuple)); + } + + for (size_t i = 0; i < dim_partitioned_blocks.size(); ++i) { + dim_partitioned_blocks[i]->rebuild(); + } + + // Create blocks per partition. The index is the partition id. + std::vector fact_partitioned_blocks; + for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) { + const block_id block = storage_manager_->createBlock(*fact_table_, *fact_layout); + fact_part_scheme_->addBlockToPartition(block, part_id); + // For a simpler teardown. + fact_table_->addBlock(block); + + fact_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *fact_table_)); + } + + // Insert tuples to fact table. + for (tuple_id tid = 0; tid < kNumFactTuples; ++tid) { + // First attribute (long): a sequence id to join with dim_table.long. Each tuple has + // exact one match. + // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the + // first kBlockSize tuples has mutiple matches. Other tuples + // have no match. + // Third attribute (char): an identical value to test Cartesian product. + // Forth attribute (varchar): a sequence id to join with dim_table.var_char. Each tuple + // has two matches. + unique_ptr tuple(createTuple(*fact_table_, tid, tid, 100, tid)); + EXPECT_TRUE(fact_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(*tuple)); + } + + for (size_t i = 0; i < fact_partitioned_blocks.size(); ++i) { + fact_partitioned_blocks[i]->rebuild(); + } + } + void fetchAndExecuteWorkOrders(RelationalOperator *op) { // Note: We treat each operator as an individual query plan DAG. The // index for each operator should be set, so that the WorkOrdersContainer @@ -293,9 +369,13 @@ class HashJoinOperatorTest : public ::testing::TestWithParam unique_ptr db_; // The following CatalogRelations are owned by db_. CatalogRelation *dim_table_, *fact_table_; + // The following PartitionSchemes are owned by its own CatalogRelation, respectively. + PartitionScheme *dim_part_scheme_ = nullptr, *fact_part_scheme_ = nullptr; }; TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) { + insertTuplesWithoutPartitions(); + // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; query_context_proto.set_query_id(kQueryId); @@ -439,6 +519,8 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) { } TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) { + insertTuplesWithoutPartitions(); + // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; query_context_proto.set_query_id(kQueryId); @@ -612,6 +694,8 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) { } TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) { + insertTuplesWithoutPartitions(); + // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; query_context_proto.set_query_id(kQueryId); @@ -750,6 +834,8 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) { } TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) { + insertTuplesWithoutPartitions(); + // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; query_context_proto.set_query_id(kQueryId); @@ -920,6 +1006,8 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) { } TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) { + insertTuplesWithoutPartitions(); + // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; query_context_proto.set_query_id(kQueryId); @@ -1100,6 +1188,8 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) { // Same as above test, but add an additional residual filter predicate. TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) { + insertTuplesWithoutPartitions(); + // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; query_context_proto.set_query_id(kQueryId); @@ -1288,6 +1378,509 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) { db_->dropRelationById(output_relation_id); } +// Hash join tests with single attribute partitions. +TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedLongKeyHashJoinTest) { + insertTuplesWithSingleAttributePartitions(); + + // Setup the hash table proto in the query context proto. + serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(kQueryId); + + const QueryContext::join_hash_table_id join_hash_table_index = + query_context_proto.join_hash_tables_size(); + + serialization::QueryContext::HashTableContext *hash_table_context_proto = + query_context_proto.add_join_hash_tables(); + hash_table_context_proto->set_num_partitions(kMultiplePartitions); + + serialization::HashTable *hash_table_proto = + hash_table_context_proto->mutable_join_hash_table(); + switch (GetParam()) { + case HashTableImplType::kLinearOpenAddressing: + hash_table_proto->set_hash_table_impl_type( + serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING); + break; + case HashTableImplType::kSeparateChaining: + hash_table_proto->set_hash_table_impl_type( + serialization::HashTableImplType::SEPARATE_CHAINING); + break; + case HashTableImplType::kSimpleScalarSeparateChaining: + if (TypedValue::HashIsReversible(kLong)) { + hash_table_proto->set_hash_table_impl_type( + serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING); + break; + } else { + // Can't use SimpleScalarSeparateChainingHashTable for long keys on + // this platform. + return; + } + default: + FATAL_ERROR("Unknown HashTable type requested for join."); + } + + const Type &long_type = LongType::InstanceNonNullable(); + + hash_table_proto->add_key_types()->MergeFrom(long_type.getProto()); + hash_table_proto->set_estimated_num_entries(kNumDimTuples); + + const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long"); + const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long"); + + // Create the builder operator. + unique_ptr builder(new BuildHashOperator( + kQueryId, + *dim_table_, + true /* is_stored */, + { dim_col_long.getID() }, + dim_col_long.getType().isNullable(), + kMultiplePartitions, + join_hash_table_index)); + + // Create the prober operator with one selection attribute. + const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size(); + ScalarAttribute scalar_attr(dim_col_long); + query_context_proto.add_scalar_groups()->add_scalars()->MergeFrom(scalar_attr.getProto()); + + // Create result_table, owned by db_. + CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102); + result_table->addAttribute(new CatalogAttribute(result_table, "long", long_type)); + + const relation_id output_relation_id = db_->addRelation(result_table); + + // Setup the InsertDestination proto in the query context proto. + const QueryContext::insert_destination_id output_destination_index = + query_context_proto.insert_destinations_size(); + serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations(); + + insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL); + insert_destination_proto->set_relation_id(output_relation_id); + insert_destination_proto->set_relational_op_index(kOpIndex); + + unique_ptr prober(new HashJoinOperator( + kQueryId, + *dim_table_, + *fact_table_, + true /* is_stored */, + { fact_col_long.getID() }, + fact_col_long.getType().isNullable(), + kMultiplePartitions, + *result_table, + output_destination_index, + join_hash_table_index, + QueryContext::kInvalidPredicateId /* residual_predicate_index */, + selection_index)); + + // Set up the QueryContext. + query_context_ = + make_unique(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_); + + // Execute the operators. + fetchAndExecuteWorkOrders(builder.get()); + + prober->informAllBlockingDependenciesMet(); + fetchAndExecuteWorkOrders(prober.get()); + + // Check result values + // Note that the results might be in a different order. + std::size_t num_result_tuples = 0; + std::unique_ptr counts(new std::size_t[kNumDimTuples]); + std::memset(counts.get(), 0, sizeof(std::size_t) * kNumDimTuples); + + DCHECK(query_context_); + InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID()); + DCHECK(insert_destination); + + const std::vector &result_blocks = insert_destination->getTouchedBlocks(); + for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) { + BlockReference result_block = storage_manager_->getBlock(result_blocks[bid], + insert_destination->getRelation()); + const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock(); + num_result_tuples += result_tuple_sub_block.numTuples(); + for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) { + if (result_tuple_sub_block.hasTupleWithID(i)) { + TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped( + i, result_table->getAttributeByName("long")->getID()); + std::int64_t value = typed_value.getLiteral(); + ASSERT_GE(value, 0); + ASSERT_LT(value, static_cast(kNumDimTuples)); + ++counts[value]; + } + } + + // Drop the block. + result_block.release(); + storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]); + } + EXPECT_EQ(static_cast(kNumDimTuples), num_result_tuples); + + for (tuple_id i = 0; i < kNumDimTuples; ++i) { + EXPECT_EQ(1u, counts[i]); + } + + // Create cleaner operator. + auto cleaner = make_unique(kQueryId, kMultiplePartitions, join_hash_table_index); + cleaner->informAllBlockingDependenciesMet(); + fetchAndExecuteWorkOrders(cleaner.get()); + + db_->dropRelationById(output_relation_id); +} + +TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedCompositeKeyHashJoinTest) { + insertTuplesWithSingleAttributePartitions(); + + // Setup the hash table proto in the query context proto. + serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(kQueryId); + + const QueryContext::join_hash_table_id join_hash_table_index = + query_context_proto.join_hash_tables_size(); + + serialization::QueryContext::HashTableContext *hash_table_context_proto = + query_context_proto.add_join_hash_tables(); + hash_table_context_proto->set_num_partitions(kMultiplePartitions); + + serialization::HashTable *hash_table_proto = + hash_table_context_proto->mutable_join_hash_table(); + switch (GetParam()) { + case HashTableImplType::kLinearOpenAddressing: + hash_table_proto->set_hash_table_impl_type( + serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING); + break; + case HashTableImplType::kSeparateChaining: + hash_table_proto->set_hash_table_impl_type( + serialization::HashTableImplType::SEPARATE_CHAINING); + break; + case HashTableImplType::kSimpleScalarSeparateChaining: + // Can't use SimpleScalarSeparateChainingHashTable with composite keys. + return; + default: + FATAL_ERROR("Unknown HashTable type requested for join."); + } + + const Type &long_type = LongType::InstanceNonNullable(); + const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength); + + hash_table_proto->add_key_types()->MergeFrom(long_type.getProto()); + hash_table_proto->add_key_types()->MergeFrom(varchar_type.getProto()); + hash_table_proto->set_estimated_num_entries(kNumDimTuples); + + const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long"); + const CatalogAttribute &dim_col_varchar = *dim_table_->getAttributeByName("varchar"); + const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long"); + const CatalogAttribute &fact_col_varchar = *fact_table_->getAttributeByName("varchar"); + + // Create the builder operator. + unique_ptr builder(new BuildHashOperator( + kQueryId, + *dim_table_, + true /* is_stored */, + { dim_col_long.getID(), dim_col_varchar.getID() }, + dim_col_long.getType().isNullable() || dim_col_varchar.getType().isNullable(), + kMultiplePartitions, + join_hash_table_index)); + + // Create the prober operator with two selection attributes. + const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size(); + serialization::QueryContext::ScalarGroup *scalar_group_proto = query_context_proto.add_scalar_groups(); + + ScalarAttribute scalar_attr_dim(dim_col_long); + scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_dim.getProto()); + ScalarAttribute scalar_attr_fact(fact_col_long); + scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_fact.getProto()); + + // Create result_table, owned by db_. + CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102); + result_table->addAttribute(new CatalogAttribute(result_table, "dim_long", long_type)); + result_table->addAttribute(new CatalogAttribute(result_table, "fact_long", long_type)); + + const relation_id output_relation_id = db_->addRelation(result_table); + + // Setup the InsertDestination proto in the query context proto. + const QueryContext::insert_destination_id output_destination_index = + query_context_proto.insert_destinations_size(); + serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations(); + + insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL); + insert_destination_proto->set_relation_id(output_relation_id); + insert_destination_proto->set_relational_op_index(kOpIndex); + + unique_ptr prober(new HashJoinOperator( + kQueryId, + *dim_table_, + *fact_table_, + true /* is_stored */, + { fact_col_long.getID(), fact_col_varchar.getID() }, + fact_col_long.getType().isNullable() || fact_col_varchar.getType().isNullable(), + kMultiplePartitions, + *result_table, + output_destination_index, + join_hash_table_index, + QueryContext::kInvalidPredicateId /* residual_predicate_index */, + selection_index)); + + // Set up the QueryContext. + query_context_ = + make_unique(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_); + + // Execute the operators. + fetchAndExecuteWorkOrders(builder.get()); + + prober->informAllBlockingDependenciesMet(); + fetchAndExecuteWorkOrders(prober.get()); + + // Check result values + // Note that the results might be in a different order. + std::size_t num_result_tuples = 0; + + std::unique_ptr dim_counts(new std::size_t[kNumDimTuples]); + std::memset(dim_counts.get(), 0, sizeof(std::size_t) * kNumDimTuples); + + std::unique_ptr fact_counts(new std::size_t[kNumFactTuples]); + std::memset(fact_counts.get(), 0, sizeof(std::size_t) * kNumFactTuples); + + InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID()); + DCHECK(insert_destination); + + const std::vector &result_blocks = insert_destination->getTouchedBlocks(); + for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) { + BlockReference result_block = storage_manager_->getBlock(result_blocks[bid], + insert_destination->getRelation()); + const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock(); + num_result_tuples += result_tuple_sub_block.numTuples(); + for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) { + if (result_tuple_sub_block.hasTupleWithID(i)) { + TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped( + i, result_table->getAttributeByName("dim_long")->getID()); + std::int64_t value = typed_value.getLiteral(); + ASSERT_GE(value, 0); + ASSERT_LT(value, static_cast(kNumDimTuples)); + ++dim_counts[value]; + + typed_value = result_tuple_sub_block.getAttributeValueTyped( + i, result_table->getAttributeByName("fact_long")->getID()); + value = typed_value.getLiteral(); + ASSERT_GE(value, 0); + ASSERT_LT(value, static_cast(kNumFactTuples)); + ++fact_counts[value]; + } + } + + // Drop the block. + result_block.release(); + storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]); + } + EXPECT_EQ(static_cast(kNumDimTuples) / 2, num_result_tuples); + + for (tuple_id i = 0; i < kNumDimTuples; ++i) { + if (i & 0x1) { + EXPECT_EQ(0u, dim_counts[i]); + } else { + EXPECT_EQ(1u, dim_counts[i]); + } + } + for (tuple_id i = 0; i < kNumFactTuples; ++i) { + if (i >= kNumDimTuples) { + EXPECT_EQ(0u, fact_counts[i]); + } else { + if (i & 0x1) { + EXPECT_EQ(0u, fact_counts[i]); + } else { + EXPECT_EQ(1u, fact_counts[i]); + } + } + } + + // Create cleaner operator. + auto cleaner = make_unique(kQueryId, kMultiplePartitions, join_hash_table_index); + cleaner->informAllBlockingDependenciesMet(); + fetchAndExecuteWorkOrders(cleaner.get()); + + db_->dropRelationById(output_relation_id); +} + +TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedCompositeKeyHashJoinWithResidualPredicateTest) { + insertTuplesWithSingleAttributePartitions(); + + // Setup the hash table proto in the query context proto. + serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(kQueryId); + + const QueryContext::join_hash_table_id join_hash_table_index = + query_context_proto.join_hash_tables_size(); + + serialization::QueryContext::HashTableContext *hash_table_context_proto = + query_context_proto.add_join_hash_tables(); + hash_table_context_proto->set_num_partitions(kMultiplePartitions); + + serialization::HashTable *hash_table_proto = + hash_table_context_proto->mutable_join_hash_table(); + switch (GetParam()) { + case HashTableImplType::kLinearOpenAddressing: + hash_table_proto->set_hash_table_impl_type( + serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING); + break; + case HashTableImplType::kSeparateChaining: + hash_table_proto->set_hash_table_impl_type( + serialization::HashTableImplType::SEPARATE_CHAINING); + break; + case HashTableImplType::kSimpleScalarSeparateChaining: + // Can't use SimpleScalarSeparateChainingHashTable with composite keys. + return; + default: + FATAL_ERROR("Unknown HashTable type requested for join."); + } + + const Type &long_type = LongType::InstanceNonNullable(); + const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength); + + hash_table_proto->add_key_types()->MergeFrom(long_type.getProto()); + hash_table_proto->add_key_types()->MergeFrom(varchar_type.getProto()); + hash_table_proto->set_estimated_num_entries(kNumDimTuples); + + const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long"); + const CatalogAttribute &dim_col_varchar = *dim_table_->getAttributeByName("varchar"); + const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long"); + const CatalogAttribute &fact_col_varchar = *fact_table_->getAttributeByName("varchar"); + + // Create the builder operator. + unique_ptr builder(new BuildHashOperator( + kQueryId, + *dim_table_, + true /* is_stored */, + { dim_col_long.getID(), dim_col_varchar.getID() }, + dim_col_long.getType().isNullable() || dim_col_varchar.getType().isNullable(), + kMultiplePartitions, + join_hash_table_index)); + + // Create prober operator with two selection attributes. + const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size(); + serialization::QueryContext::ScalarGroup *scalar_group_proto = query_context_proto.add_scalar_groups(); + + ScalarAttribute scalar_attr_dim(dim_col_long); + scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_dim.getProto()); + ScalarAttribute scalar_attr_fact(fact_col_long); + scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_fact.getProto()); + + // Create result_table, owned by db_. + CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102); + result_table->addAttribute(new CatalogAttribute(result_table, "dim_long", long_type)); + result_table->addAttribute(new CatalogAttribute(result_table, "fact_long", long_type)); + + const relation_id output_relation_id = db_->addRelation(result_table); + + // Setup the InsertDestination proto in the query context proto. + const QueryContext::insert_destination_id output_destination_index = + query_context_proto.insert_destinations_size(); + serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations(); + + insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL); + insert_destination_proto->set_relation_id(output_relation_id); + insert_destination_proto->set_relational_op_index(kOpIndex); + + // Include a residual predicate that selects a subset of the joined tuples. + unique_ptr residual_pred(new ComparisonPredicate( + ComparisonFactory::GetComparison( + ComparisonID::kLess), + new ScalarAttribute(dim_col_long), + new ScalarLiteral(TypedValue(static_cast(15)), LongType::InstanceNonNullable()))); + + const QueryContext::predicate_id residual_pred_index = query_context_proto.predicates_size(); + query_context_proto.add_predicates()->MergeFrom(residual_pred->getProto()); + + unique_ptr prober(new HashJoinOperator( + kQueryId, + *dim_table_, + *fact_table_, + true /* is_stored */, + { fact_col_long.getID(), fact_col_varchar.getID() }, + fact_col_long.getType().isNullable() || fact_col_varchar.getType().isNullable(), + kMultiplePartitions, + *result_table, + output_destination_index, + join_hash_table_index, + residual_pred_index, + selection_index)); + + // Set up the QueryContext. + query_context_ = + make_unique(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_); + + // Execute the operators. + fetchAndExecuteWorkOrders(builder.get()); + + prober->informAllBlockingDependenciesMet(); + fetchAndExecuteWorkOrders(prober.get()); + + // Check result values + // Note that the results might be in a different order. + std::size_t num_result_tuples = 0; + + std::unique_ptr dim_counts(new std::size_t[kNumDimTuples]); + std::memset(dim_counts.get(), 0, sizeof(std::size_t) * kNumDimTuples); + + std::unique_ptr fact_counts(new std::size_t[kNumFactTuples]); + std::memset(fact_counts.get(), 0, sizeof(std::size_t) * kNumFactTuples); + + InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID()); + DCHECK(insert_destination); + + const std::vector &result_blocks = insert_destination->getTouchedBlocks(); + for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) { + BlockReference result_block = storage_manager_->getBlock(result_blocks[bid], + insert_destination->getRelation()); + const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock(); + num_result_tuples += result_tuple_sub_block.numTuples(); + for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) { + if (result_tuple_sub_block.hasTupleWithID(i)) { + TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped( + i, result_table->getAttributeByName("dim_long")->getID()); + std::int64_t value = typed_value.getLiteral(); + ASSERT_GE(value, 0); + ASSERT_LT(value, static_cast(kNumDimTuples)); + ++dim_counts[value]; + + typed_value = result_tuple_sub_block.getAttributeValueTyped( + i, result_table->getAttributeByName("fact_long")->getID()); + value = typed_value.getLiteral(); + ASSERT_GE(value, 0); + ASSERT_LT(value, static_cast(kNumFactTuples)); + ++fact_counts[value]; + } + } + + // Drop the block. + result_block.release(); + storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]); + } + EXPECT_EQ(8u, num_result_tuples); + + for (tuple_id i = 0; i < kNumDimTuples; ++i) { + if ((i & 0x1) || (i >= 15)) { + EXPECT_EQ(0u, dim_counts[i]); + } else { + EXPECT_EQ(1u, dim_counts[i]); + } + } + for (tuple_id i = 0; i < kNumFactTuples; ++i) { + if (i >= 15) { + EXPECT_EQ(0u, fact_counts[i]); + } else { + if (i & 0x1) { + EXPECT_EQ(0u, fact_counts[i]); + } else { + EXPECT_EQ(1u, fact_counts[i]); + } + } + } + + // Create cleaner operator. + auto cleaner = make_unique(kQueryId, kMultiplePartitions, join_hash_table_index); + cleaner->informAllBlockingDependenciesMet(); + fetchAndExecuteWorkOrders(cleaner.get()); + + db_->dropRelationById(output_relation_id); +} + // Note: INSTANTIATE_TEST_CASE_P has variadic arguments part. If the variable argument part // is empty, C++11 standard says it should produce a warning. A warning is converted // to an error since we use -Werror as a compiler parameter. It causes Travis to build.