Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 704E8200B58 for ; Wed, 13 Jul 2016 04:04:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6E909160A79; Wed, 13 Jul 2016 02:04:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5365A160A56 for ; Wed, 13 Jul 2016 04:04:42 +0200 (CEST) Received: (qmail 42421 invoked by uid 500); 13 Jul 2016 02:04:41 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 42407 invoked by uid 99); 13 Jul 2016 02:04:41 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jul 2016 02:04:41 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 03D7E180647 for ; Wed, 13 Jul 2016 02:04:41 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id ZHWH4sCicoRo for ; Wed, 13 Jul 2016 02:04:33 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id E4C3660D12 for ; Wed, 13 Jul 2016 02:04:30 +0000 (UTC) Received: (qmail 42393 invoked by uid 99); 13 Jul 2016 02:04:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jul 2016 02:04:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0EFA7E1021; Wed, 13 Jul 2016 02:04:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hbdeshmukh@apache.org To: commits@quickstep.incubator.apache.org Date: Wed, 13 Jul 2016 02:04:30 -0000 Message-Id: <153824390a5e496ab2ed1fa3122f5693@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-quickstep git commit: Introduced QueryManagerBase, and renamed QueryManagerSingleNode. archived-at: Wed, 13 Jul 2016 02:04:44 -0000 Repository: incubator-quickstep Updated Branches: refs/heads/master 7671a5893 -> b4e25edce http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/tests/QueryManagerSingleNode_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp new file mode 100644 index 0000000..52cee20 --- /dev/null +++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp @@ -0,0 +1,942 @@ +/** + * Copyright 2011-2015 Quickstep Technologies LLC. + * Copyright 2015-2016 Pivotal Software, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include +#include +#include +#include +#include + +#include "catalog/CatalogDatabase.hpp" +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/QueryContext.hpp" +#include "query_execution/QueryContext.pb.h" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionState.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryManagerSingleNode.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "query_execution/WorkerDirectory.hpp" +#include "query_execution/WorkerMessage.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "query_optimizer/QueryPlan.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "storage/InsertDestination.hpp" +#include "storage/InsertDestination.pb.h" +#include "storage/StorageBlock.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" +#include "utility/DAG.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" +#include "gtest/gtest.h" + +#include "tmb/id_typedefs.h" +#include "tmb/tagged_message.h" + +namespace tmb { class MessageBus; } + +using std::move; +using std::unique_ptr; +using std::vector; + +using tmb::client_id; + +namespace quickstep { + +class WorkOrderProtosContainer; + +class MockWorkOrder : public WorkOrder { + public: + explicit MockWorkOrder(const int op_index) + : WorkOrder(0), op_index_(op_index) {} + + void execute() override { + VLOG(3) << "WorkOrder[" << op_index_ << "] executing."; + } + + inline QueryPlan::DAGNodeIndex getOpIndex() const { + return op_index_; + } + + private: + const QueryPlan::DAGNodeIndex op_index_; + + DISALLOW_COPY_AND_ASSIGN(MockWorkOrder); +}; + +class MockOperator: public RelationalOperator { + public: + enum function_name { + kFeedInputBlock = 0, + kFeedInputBlocks, + kDoneFeedingInputBlocks, + kGetAllWorkOrders + }; + + MockOperator(const bool produce_workorders, + const bool has_streaming_input, + const int max_getworkorder_iters = 1, + const int max_workorders = INT_MAX) + : RelationalOperator(0 /* Query Id */), + produce_workorders_(produce_workorders), + has_streaming_input_(has_streaming_input), + max_workorders_(max_workorders), + max_getworkorder_iters_(max_getworkorder_iters), + num_calls_get_workorders_(0), + num_workorders_generated_(0), + num_calls_feedblock_(0), + num_calls_feedblocks_(0), + num_calls_donefeedingblocks_(0) { + } + +#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": " + + // The methods below are used to check whether QueryManager calls the Relational + // operator, how many times it calls a particular method etc. + inline int getNumWorkOrders() const { + return num_workorders_generated_; + } + + inline int getNumCalls(const function_name fname) const { + switch (fname) { + case kFeedInputBlock: + return num_calls_feedblock_; + case kFeedInputBlocks: + return num_calls_feedblocks_; + case kDoneFeedingInputBlocks: + return num_calls_donefeedingblocks_; + case kGetAllWorkOrders: + return num_calls_get_workorders_; + default: + return -1; + } + } + + inline bool getBlockingDependenciesMet() const { + MOCK_OP_LOG(3) << "met."; + return blocking_dependencies_met_; + } + + void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) { + insert_destination_index_ = insert_destination_index; + } + + // Mock to trigger doneFeedingInputBlocks for the dependent operators + // in QueryManager::markOperatorFinished. + void setOutputRelationID(const relation_id rel_id) { + output_relation_id_ = rel_id; + } + + // Override methods from the base class. + bool getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id foreman_client_id, + tmb::MessageBus *bus) override { + ++num_calls_get_workorders_; + if (produce_workorders_) { + if (has_streaming_input_) { + if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) { + MOCK_OP_LOG(3) << "[stream] generate WorkOrder"; + container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_); + ++num_workorders_generated_; + } + } else { + if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) { + MOCK_OP_LOG(3) << "[static] generate WorkOrder"; + container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_); + ++num_workorders_generated_; + } + } + } + MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") " + << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")"; + return num_calls_get_workorders_ == max_getworkorder_iters_; + } + + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override { + return true; + } + + void feedInputBlock(const block_id input_block_id, + const relation_id input_relation_id) override { + ++num_calls_feedblock_; + MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")"; + } + + void feedInputBlocks(const relation_id rel_id, + std::vector *partially_filled_blocks) override { + ++num_calls_feedblocks_; + MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")"; + } + + void doneFeedingInputBlocks(const relation_id rel_id) override { + ++num_calls_donefeedingblocks_; + MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")"; + } + + QueryContext::insert_destination_id getInsertDestinationID() const override { + return insert_destination_index_; + } + + const relation_id getOutputRelationID() const override { + return output_relation_id_; + } + + private: + const bool produce_workorders_; + const bool has_streaming_input_; + const int max_workorders_; + const int max_getworkorder_iters_; + + int num_calls_get_workorders_; + int num_workorders_generated_; + int num_calls_feedblock_; + int num_calls_feedblocks_; + int num_calls_donefeedingblocks_; + + QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId; + + relation_id output_relation_id_ = -1; + +#undef MOCK_OP_LOG + + DISALLOW_COPY_AND_ASSIGN(MockOperator); +}; + + +class QueryManagerTest : public ::testing::Test { + protected: + virtual void SetUp() { + db_.reset(new CatalogDatabase(nullptr /* catalog */, "database")); + storage_manager_.reset(new StorageManager("./")); + bus_.Initialize(); + query_handle_.reset(new QueryHandle(0)); // dummy query ID. + query_plan_ = query_handle_->getQueryPlanMutable(); + query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id()); + } + + inline void constructQueryManager() { + query_manager_.reset(new QueryManagerSingleNode( + 0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_)); + } + + inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const { + return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index); + } + + inline const int getNumOperatorsFinished() const { + return query_manager_->getQueryExecutionState().getNumOperatorsFinished(); + } + + inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const { + return query_manager_->getQueryExecutionState().hasExecutionFinished(index); + } + + inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) { + VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]"; + serialization::DataPipelineMessage proto; + proto.set_operator_index(source_operator_index); + + proto.set_block_id(0); // dummy block ID + proto.set_relation_id(0); // dummy relation ID. + proto.set_query_id(0); // dummy query ID. + + // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. + const std::size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + tmb::TaggedMessage tagged_message(static_cast(proto_bytes), + proto_length, + kDataPipelineMessage); + std::free(proto_bytes); + query_manager_->processMessage(tagged_message); + return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); + } + + inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { + VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]"; + TaggedMessage tagged_message; + serialization::NormalWorkOrderCompletionMessage proto; + proto.set_operator_index(index); + proto.set_worker_thread_index(1); // dummy worker ID. + proto.set_query_id(0); // dummy query ID. + + // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast(proto_bytes), + proto_length, + kWorkOrderCompleteMessage); + std::free(proto_bytes); + query_manager_->processMessage(message); + + return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); + } + + inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { + VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]"; + serialization::RebuildWorkOrderCompletionMessage proto; + proto.set_operator_index(index); + proto.set_worker_thread_index(1); // dummy worker thread ID. + proto.set_query_id(0); // dummy query ID. + + // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast(proto_bytes), + proto_length, + kRebuildWorkOrderCompleteMessage); + + std::free(proto_bytes); + query_manager_->processMessage(message); + + return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); + } + + inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) { + VLOG(3) << "Place OutputBlock message for Op[" << index << "]"; + serialization::DataPipelineMessage proto; + proto.set_operator_index(index); + + proto.set_block_id(0); // dummy block ID + proto.set_relation_id(0); // dummy relation ID. + proto.set_query_id(0); // dummy query ID. + + // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. + const std::size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + tmb::TaggedMessage tagged_message(static_cast(proto_bytes), + proto_length, + kDataPipelineMessage); + std::free(proto_bytes); + query_manager_->processMessage(tagged_message); + return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); + } + + unique_ptr db_; + unique_ptr storage_manager_; + + QueryPlan *query_plan_; + unique_ptr query_handle_; + unique_ptr query_manager_; + + MessageBusImpl bus_; + + client_id worker_client_id_; + + unique_ptr workers_; +}; + +TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) { + // This test creates a DAG of a single node. No workorders are generated. + query_plan_->addRelationalOperator(new MockOperator(false, false)); + + const MockOperator &op = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(0)); + + constructQueryManager(); + + // op doesn't have any dependencies. + EXPECT_TRUE(op.getBlockingDependenciesMet()); + + // We expect one call for op's getAllWorkOrders(). + EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); +} + +TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) { + // This test creates a DAG of a single node. Static workorders are generated. + const QueryPlan::DAGNodeIndex id = + query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); + + const MockOperator &op = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(id)); + + constructQueryManager(); + + // op doesn't have any dependencies. + EXPECT_TRUE(op.getBlockingDependenciesMet()); + + // We expect one call for op's getAllWorkOrders(). + EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); + + // One workorder is generated. + EXPECT_EQ(1, op.getNumWorkOrders()); + + unique_ptr worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(0, -1)); + EXPECT_TRUE(worker_message != nullptr); + + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(0u, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id)); + EXPECT_EQ(0, getNumOperatorsFinished()); + + // Send a message to QueryManager upon workorder completion. + // Last event processed by QueryManager. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id)); + EXPECT_EQ(1, getNumOperatorsFinished()); + EXPECT_TRUE(getOperatorFinishedStatus(id)); +} + +TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) { + // This test creates a DAG of a single node. WorkOrders are generated + // dynamically as pending work orders complete execution, i.e., + // getAllWorkOrders() is called multiple times. getAllWorkOrders() will be + // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to + // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will + // insert no WorkOrder and return true. + + // TODO(shoban): This test can not be more robust than this because of fixed + // scaffolding of mocking. If we use gMock, we can do much better. + const QueryPlan::DAGNodeIndex id = + query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3)); + + const MockOperator &op = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(id)); + + constructQueryManager(); + + // op doesn't have any dependencies. + EXPECT_TRUE(op.getBlockingDependenciesMet()); + + for (int i = 0; i < 3; ++i) { + // We expect one call for op's getAllWorkOrders(). + EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); + + // One workorder is generated. + // EXPECT_EQ(1, getWorkerInputQueueSize()); + EXPECT_EQ(i + 1, op.getNumWorkOrders()); + + unique_ptr worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(id, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id)); + EXPECT_EQ(0, getNumOperatorsFinished()); + + if (i < 2) { + // Send a message to QueryManager upon workorder completion. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id)); + } else { + // Send a message to QueryManager upon workorder completion. + // Last event. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id)); + } + } + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id)); + + EXPECT_EQ(1, getNumOperatorsFinished()); + EXPECT_TRUE(getOperatorFinishedStatus(id)); + + // We place this check in the end, since it's true throughout the test. + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); +} + +TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) { + // We use two nodes in the DAG with a blocking link between them. + // There is no streaming of data involved in this test. + const QueryPlan::DAGNodeIndex id1 = + query_plan_->addRelationalOperator(new MockOperator(true, false)); + const QueryPlan::DAGNodeIndex id2 = + query_plan_->addRelationalOperator(new MockOperator(true, false)); + + // Create a blocking link. + query_plan_->addDirectDependency(id2, id1, true); + + static_cast( + query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)) + ->setOutputRelationID(0xdead); + + const MockOperator &op1 = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(id1)); + const MockOperator &op2 = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(id2)); + + constructQueryManager(); + + // op1 doesn't have any dependencies + EXPECT_TRUE(op1.getBlockingDependenciesMet()); + + // Only op1 should receive a call to getAllWorkOrders initially. + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); + + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); + + // Only op1 should produce a workorder. + EXPECT_EQ(1, op1.getNumWorkOrders()); + EXPECT_EQ(0, op2.getNumWorkOrders()); + + // Foreman hasn't yet got workorder completion response for the workorder. + unique_ptr worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id1)); + EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); + EXPECT_EQ(0, getNumOperatorsFinished()); + + // Send a message to Foreman upon workorder (generated by op1) completion. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); + // op1 is over now, op2 still to go. + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + EXPECT_EQ(1, getNumOperatorsFinished()); + + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + EXPECT_FALSE(getOperatorFinishedStatus(id2)); + + worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); + + // op1 is op2's blocking dependency. + EXPECT_TRUE(op2.getBlockingDependenciesMet()); + + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + // op2 should get first call of getAllWorkOrders() when op1 is over. + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + + EXPECT_EQ(1, op2.getNumWorkOrders()); + + // Send a message to QueryManager upon workorder (generated by op2) completion. + // Note that the worker hasn't yet popped the workorder. Usually this won't + // happen as workers pop workorders first, execute and then send the response. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); + + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); + EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); + + EXPECT_EQ(2, getNumOperatorsFinished()); + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_TRUE(getOperatorFinishedStatus(id2)); + + // Expect no additional calls to getAllWorkOrders. + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); +} + +TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) { + // We use two nodes in the DAG with a non-blocking link between them. + // We stream output of op1 to op2. Sequeuce of events is as follows: + // 1. op1 creates a workorder. + // 2. We send a "block full" (from op1) to QueryManager. + // 3. op2 creates a workorder because of step 2. + const QueryPlan::DAGNodeIndex id1 = + query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); + const QueryPlan::DAGNodeIndex id2 = + query_plan_->addRelationalOperator(new MockOperator(true, true, 3)); + + // Create a non-blocking link. + query_plan_->addDirectDependency(id2, id1, false); + + static_cast( + query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)) + ->setOutputRelationID(0xdead); + + const MockOperator &op1 = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(id1)); + const MockOperator &op2 = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(id2)); + + constructQueryManager(); + + // As none of the operators have a blocking link, blocking dependencies should + // be met. + EXPECT_TRUE(op1.getBlockingDependenciesMet()); + EXPECT_TRUE(op2.getBlockingDependenciesMet()); + + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op1.getNumWorkOrders()); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); + + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + // op2 will generate workorder only after receiving a streaming input. + EXPECT_EQ(0, op2.getNumWorkOrders()); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); + + unique_ptr worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + // Send a message to QueryManager upon block getting full (output of op1). + EXPECT_FALSE(placeOutputBlockMessage(id1)); + + // op1 is not finished yet because the response of workorder completion hasn't + // been received yet by the QueryManager. + EXPECT_FALSE(getOperatorFinishedStatus(id1)); + EXPECT_FALSE(getOperatorFinishedStatus(id2)); + + // No additional call to op1's getAllWorkOrders. + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); + + // Output from op1 should be fed to op2. + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); + + // A call to op2's getAllWorkOrders because of the streamed input. + EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op2.getNumWorkOrders()); + + // Place a message of a workorder completion of op1 on Foreman's input queue. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + + // An additional call to op2's getAllWorkOrders because of completion of op1. + EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(2, op2.getNumWorkOrders()); + + worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + // Place a message of a workorder completion of op2 on Foreman's input queue. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id2)); + + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + + worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); + EXPECT_FALSE(getOperatorFinishedStatus(id2)); + + // Send a message to Foreman upon workorder (generated by op2) completion. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); + + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); + EXPECT_TRUE(getOperatorFinishedStatus(id2)); +} + +TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) { + // In this test, we create a 2-node DAG with a non-blocking link between them. + // There is no streaming of data from op1 to op2 during the execution of op1. + // op1 produces a partially filled block at the end of its execution which is + // rebuilt and then fed to op2. + const QueryPlan::DAGNodeIndex id1 = + query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); + const QueryPlan::DAGNodeIndex id2 = + query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1)); + + // Create a non-blocking link. + query_plan_->addDirectDependency(id2, id1, false); + + // Create a relation, owned by db_.*/ + CatalogRelation *relation = + new CatalogRelation(nullptr /* catalog_database */, "test_relation"); + const relation_id output_relation_id = db_->addRelation(relation); + + // Setup the InsertDestination proto in the query context proto. + serialization::QueryContext *query_context_proto = + query_handle_->getQueryContextProtoMutable(); + + const QueryContext::insert_destination_id insert_destination_index = + query_context_proto->insert_destinations_size(); + serialization::InsertDestination *insert_destination_proto = + query_context_proto->add_insert_destinations(); + + insert_destination_proto->set_insert_destination_type( + serialization::InsertDestinationType::BLOCK_POOL); + insert_destination_proto->set_relation_id(output_relation_id); + insert_destination_proto->set_relational_op_index(id1); + + MockOperator *op1_mutable = static_cast( + query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)); + op1_mutable->setInsertDestinationID(insert_destination_index); + op1_mutable->setOutputRelationID(output_relation_id); + + const MockOperator &op1 = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(id1)); + const MockOperator &op2 = static_cast( + query_plan_->getQueryPlanDAG().getNodePayload(id2)); + + constructQueryManager(); + + // NOTE(zuyu): An operator generally has no ideas about partially filled + // blocks, but InsertDestination in QueryContext does. + // Mock to add partially filled blocks in the InsertDestination. + InsertDestination *insert_destination = + query_manager_->getQueryContextMutable()->getInsertDestination( + insert_destination_index); + DCHECK(insert_destination != nullptr); + MutableBlockReference block_ref; + static_cast(insert_destination) + ->available_block_refs_.push_back(move(block_ref)); + + // There's no blocking dependency in the DAG. + EXPECT_TRUE(op1.getBlockingDependenciesMet()); + EXPECT_TRUE(op2.getBlockingDependenciesMet()); + + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op1.getNumWorkOrders()); + + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op2.getNumWorkOrders()); + + unique_ptr worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + // Send a message to QueryManager upon workorder (generated by op1) completion. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); + + worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder, + worker_message->getType()); + + EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + // op1 generates a rebuild workorder. The block is rebuilt and streamed + // to Foreman. + EXPECT_FALSE(placeDataPipelineMessage(id1)); + + EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1)); + // Based on the streamed input, op2's getAllWorkOrders should produce a + // workorder. + EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op2.getNumWorkOrders()); + + worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + + EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + EXPECT_FALSE(getOperatorFinishedStatus(id2)); + EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); + + // Send a message to QueryManager upon workorder (generated by op2) completion. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); + + EXPECT_TRUE(getOperatorFinishedStatus(id2)); +} + +TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) { + // When an operator produces workorders but no output, the QueryManager should + // check the dependents of this operator to make progress. + const QueryPlan::DAGNodeIndex kNumNodes = 5; + std::vector ids; + ids.reserve(kNumNodes); + + for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { + if (i == 0) { + ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false)); + } else { + ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true)); + } + VLOG(3) << ids[i]; + } + + /** + * The DAG looks like this: + * + * op1 -> op2 -> op3 -> op4 -> op5 + * + **/ + for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) { + query_plan_->addDirectDependency(ids[i + 1], ids[i], false); + static_cast(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i])) + ->setOutputRelationID(0xdead); + } + + std::vector operators; + for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { + operators.push_back(static_cast(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i]))); + } + + constructQueryManager(); + + // operators[0] should have produced a workorder by now. + EXPECT_EQ(1, operators[0]->getNumWorkOrders()); + + unique_ptr worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + + EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0])); + EXPECT_FALSE(getOperatorFinishedStatus(ids[0])); + + for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { + EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders)); + } + + // Send a message to QueryManager upon workorder (generated by operators[0]) + // completion. + EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0])); + + for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { + EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i])); + EXPECT_TRUE(getOperatorFinishedStatus(ids[i])); + if (i < kNumNodes - 1) { + EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + } + } +} + +TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) { + // Consider two operators, both generate one workorder each. The dependent's + // workorder finishes before dependency's workorder. + const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); + const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1)); + + // Create a non-blocking link. + query_plan_->addDirectDependency(id2, id1, false); + + constructQueryManager(); + + unique_ptr worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + + EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + // Send a message to QueryManager upon a block (output of op1) getting full. + EXPECT_FALSE(placeOutputBlockMessage(id1)); + + // op1 is not finished yet because the response of workorder completion hasn't + // been received yet. + EXPECT_FALSE(getOperatorFinishedStatus(id1)); + EXPECT_FALSE(getOperatorFinishedStatus(id2)); + + worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + + EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + // As mentioned earlier, op2 finishes before op1. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id2)); + + // op1's workorder execution is over. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id1)); + + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_TRUE(getOperatorFinishedStatus(id2)); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/tests/QueryManager_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp deleted file mode 100644 index 37e2cdd..0000000 --- a/query_execution/tests/QueryManager_unittest.cpp +++ /dev/null @@ -1,940 +0,0 @@ -/** - * Copyright 2011-2015 Quickstep Technologies LLC. - * Copyright 2015-2016 Pivotal Software, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#include -#include -#include -#include - -#include "catalog/CatalogDatabase.hpp" -#include "catalog/CatalogRelation.hpp" -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/QueryContext.hpp" -#include "query_execution/QueryContext.pb.h" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryManager.hpp" -#include "query_execution/WorkOrdersContainer.hpp" -#include "query_execution/WorkerDirectory.hpp" -#include "query_execution/WorkerMessage.hpp" -#include "query_optimizer/QueryHandle.hpp" -#include "query_optimizer/QueryPlan.hpp" -#include "relational_operators/RelationalOperator.hpp" -#include "relational_operators/WorkOrder.hpp" -#include "storage/InsertDestination.hpp" -#include "storage/InsertDestination.pb.h" -#include "storage/StorageBlock.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "storage/StorageManager.hpp" -#include "utility/DAG.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" -#include "gtest/gtest.h" - -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" - -using std::move; -using std::unique_ptr; -using std::vector; - -using tmb::client_id; - -namespace quickstep { - -class WorkOrderProtosContainer; - -class MockWorkOrder : public WorkOrder { - public: - explicit MockWorkOrder(const int op_index) - : WorkOrder(0), op_index_(op_index) {} - - void execute() override { - VLOG(3) << "WorkOrder[" << op_index_ << "] executing."; - } - - inline QueryPlan::DAGNodeIndex getOpIndex() const { - return op_index_; - } - - private: - const QueryPlan::DAGNodeIndex op_index_; - - DISALLOW_COPY_AND_ASSIGN(MockWorkOrder); -}; - -class MockOperator: public RelationalOperator { - public: - enum function_name { - kFeedInputBlock = 0, - kFeedInputBlocks, - kDoneFeedingInputBlocks, - kGetAllWorkOrders - }; - - MockOperator(const bool produce_workorders, - const bool has_streaming_input, - const int max_getworkorder_iters = 1, - const int max_workorders = INT_MAX) - : RelationalOperator(0 /* Query Id */), - produce_workorders_(produce_workorders), - has_streaming_input_(has_streaming_input), - max_workorders_(max_workorders), - max_getworkorder_iters_(max_getworkorder_iters), - num_calls_get_workorders_(0), - num_workorders_generated_(0), - num_calls_feedblock_(0), - num_calls_feedblocks_(0), - num_calls_donefeedingblocks_(0) { - } - -#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": " - - // The methods below are used to check whether QueryManager calls the Relational - // operator, how many times it calls a particular method etc. - inline int getNumWorkOrders() const { - return num_workorders_generated_; - } - - inline int getNumCalls(const function_name fname) const { - switch (fname) { - case kFeedInputBlock: - return num_calls_feedblock_; - case kFeedInputBlocks: - return num_calls_feedblocks_; - case kDoneFeedingInputBlocks: - return num_calls_donefeedingblocks_; - case kGetAllWorkOrders: - return num_calls_get_workorders_; - default: - return -1; - } - } - - inline bool getBlockingDependenciesMet() const { - MOCK_OP_LOG(3) << "met."; - return blocking_dependencies_met_; - } - - void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) { - insert_destination_index_ = insert_destination_index; - } - - // Mock to trigger doneFeedingInputBlocks for the dependent operators - // in QueryManager::markOperatorFinished. - void setOutputRelationID(const relation_id rel_id) { - output_relation_id_ = rel_id; - } - - // Override methods from the base class. - bool getAllWorkOrders( - WorkOrdersContainer *container, - QueryContext *query_context, - StorageManager *storage_manager, - const tmb::client_id foreman_client_id, - tmb::MessageBus *bus) override { - ++num_calls_get_workorders_; - if (produce_workorders_) { - if (has_streaming_input_) { - if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) { - MOCK_OP_LOG(3) << "[stream] generate WorkOrder"; - container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_); - ++num_workorders_generated_; - } - } else { - if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) { - MOCK_OP_LOG(3) << "[static] generate WorkOrder"; - container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_); - ++num_workorders_generated_; - } - } - } - MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") " - << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")"; - return num_calls_get_workorders_ == max_getworkorder_iters_; - } - - bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override { - return true; - } - - void feedInputBlock(const block_id input_block_id, - const relation_id input_relation_id) override { - ++num_calls_feedblock_; - MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")"; - } - - void feedInputBlocks(const relation_id rel_id, - std::vector *partially_filled_blocks) override { - ++num_calls_feedblocks_; - MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")"; - } - - void doneFeedingInputBlocks(const relation_id rel_id) override { - ++num_calls_donefeedingblocks_; - MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")"; - } - - QueryContext::insert_destination_id getInsertDestinationID() const override { - return insert_destination_index_; - } - - const relation_id getOutputRelationID() const override { - return output_relation_id_; - } - - private: - const bool produce_workorders_; - const bool has_streaming_input_; - const int max_workorders_; - const int max_getworkorder_iters_; - - int num_calls_get_workorders_; - int num_workorders_generated_; - int num_calls_feedblock_; - int num_calls_feedblocks_; - int num_calls_donefeedingblocks_; - - QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId; - - relation_id output_relation_id_ = -1; - -#undef MOCK_OP_LOG - - DISALLOW_COPY_AND_ASSIGN(MockOperator); -}; - - -class QueryManagerTest : public ::testing::Test { - protected: - virtual void SetUp() { - db_.reset(new CatalogDatabase(nullptr /* catalog */, "database")); - storage_manager_.reset(new StorageManager("./")); - bus_.Initialize(); - query_handle_.reset(new QueryHandle(0)); // dummy query ID. - query_plan_ = query_handle_->getQueryPlanMutable(); - query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id()); - } - - inline void constructQueryManager() { - query_manager_.reset(new QueryManager( - 0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_)); - } - - inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const { - return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index); - } - - inline const int getNumOperatorsFinished() const { - return query_manager_->getQueryExecutionState().getNumOperatorsFinished(); - } - - inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const { - return query_manager_->getQueryExecutionState().hasExecutionFinished(index); - } - - inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) { - VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]"; - serialization::DataPipelineMessage proto; - proto.set_operator_index(source_operator_index); - - proto.set_block_id(0); // dummy block ID - proto.set_relation_id(0); // dummy relation ID. - proto.set_query_id(0); // dummy query ID. - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. - const std::size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - tmb::TaggedMessage tagged_message(static_cast(proto_bytes), - proto_length, - kDataPipelineMessage); - std::free(proto_bytes); - query_manager_->processMessage(tagged_message); - return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); - } - - inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { - VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]"; - TaggedMessage tagged_message; - serialization::NormalWorkOrderCompletionMessage proto; - proto.set_operator_index(index); - proto.set_worker_thread_index(1); // dummy worker ID. - proto.set_query_id(0); // dummy query ID. - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast(proto_bytes), - proto_length, - kWorkOrderCompleteMessage); - std::free(proto_bytes); - query_manager_->processMessage(message); - - return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); - } - - inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { - VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]"; - serialization::RebuildWorkOrderCompletionMessage proto; - proto.set_operator_index(index); - proto.set_worker_thread_index(1); // dummy worker thread ID. - proto.set_query_id(0); // dummy query ID. - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast(proto_bytes), - proto_length, - kRebuildWorkOrderCompleteMessage); - - std::free(proto_bytes); - query_manager_->processMessage(message); - - return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); - } - - inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) { - VLOG(3) << "Place OutputBlock message for Op[" << index << "]"; - serialization::DataPipelineMessage proto; - proto.set_operator_index(index); - - proto.set_block_id(0); // dummy block ID - proto.set_relation_id(0); // dummy relation ID. - proto.set_query_id(0); // dummy query ID. - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. - const std::size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - tmb::TaggedMessage tagged_message(static_cast(proto_bytes), - proto_length, - kDataPipelineMessage); - std::free(proto_bytes); - query_manager_->processMessage(tagged_message); - return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); - } - - unique_ptr db_; - unique_ptr storage_manager_; - - QueryPlan *query_plan_; - unique_ptr query_handle_; - unique_ptr query_manager_; - - MessageBusImpl bus_; - - client_id worker_client_id_; - - unique_ptr workers_; -}; - -TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) { - // This test creates a DAG of a single node. No workorders are generated. - query_plan_->addRelationalOperator(new MockOperator(false, false)); - - const MockOperator &op = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(0)); - - constructQueryManager(); - - // op doesn't have any dependencies. - EXPECT_TRUE(op.getBlockingDependenciesMet()); - - // We expect one call for op's getAllWorkOrders(). - EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); -} - -TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) { - // This test creates a DAG of a single node. Static workorders are generated. - const QueryPlan::DAGNodeIndex id = - query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); - - const MockOperator &op = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(id)); - - constructQueryManager(); - - // op doesn't have any dependencies. - EXPECT_TRUE(op.getBlockingDependenciesMet()); - - // We expect one call for op's getAllWorkOrders(). - EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); - - // One workorder is generated. - EXPECT_EQ(1, op.getNumWorkOrders()); - - unique_ptr worker_message; - worker_message.reset(query_manager_->getNextWorkerMessage(0, -1)); - EXPECT_TRUE(worker_message != nullptr); - - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - EXPECT_EQ(0u, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - EXPECT_EQ(1, getNumWorkOrdersInExecution(id)); - EXPECT_EQ(0, getNumOperatorsFinished()); - - // Send a message to QueryManager upon workorder completion. - // Last event processed by QueryManager. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id)); - EXPECT_EQ(1, getNumOperatorsFinished()); - EXPECT_TRUE(getOperatorFinishedStatus(id)); -} - -TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) { - // This test creates a DAG of a single node. WorkOrders are generated - // dynamically as pending work orders complete execution, i.e., - // getAllWorkOrders() is called multiple times. getAllWorkOrders() will be - // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to - // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will - // insert no WorkOrder and return true. - - // TODO(shoban): This test can not be more robust than this because of fixed - // scaffolding of mocking. If we use gMock, we can do much better. - const QueryPlan::DAGNodeIndex id = - query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3)); - - const MockOperator &op = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(id)); - - constructQueryManager(); - - // op doesn't have any dependencies. - EXPECT_TRUE(op.getBlockingDependenciesMet()); - - for (int i = 0; i < 3; ++i) { - // We expect one call for op's getAllWorkOrders(). - EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); - - // One workorder is generated. - // EXPECT_EQ(1, getWorkerInputQueueSize()); - EXPECT_EQ(i + 1, op.getNumWorkOrders()); - - unique_ptr worker_message; - worker_message.reset(query_manager_->getNextWorkerMessage(id, -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - EXPECT_EQ(id, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - EXPECT_EQ(1, getNumWorkOrdersInExecution(id)); - EXPECT_EQ(0, getNumOperatorsFinished()); - - if (i < 2) { - // Send a message to QueryManager upon workorder completion. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id)); - } else { - // Send a message to QueryManager upon workorder completion. - // Last event. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id)); - } - } - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id)); - - EXPECT_EQ(1, getNumOperatorsFinished()); - EXPECT_TRUE(getOperatorFinishedStatus(id)); - - // We place this check in the end, since it's true throughout the test. - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); -} - -TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) { - // We use two nodes in the DAG with a blocking link between them. - // There is no streaming of data involved in this test. - const QueryPlan::DAGNodeIndex id1 = - query_plan_->addRelationalOperator(new MockOperator(true, false)); - const QueryPlan::DAGNodeIndex id2 = - query_plan_->addRelationalOperator(new MockOperator(true, false)); - - // Create a blocking link. - query_plan_->addDirectDependency(id2, id1, true); - - static_cast( - query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)) - ->setOutputRelationID(0xdead); - - const MockOperator &op1 = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(id1)); - const MockOperator &op2 = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(id2)); - - constructQueryManager(); - - // op1 doesn't have any dependencies - EXPECT_TRUE(op1.getBlockingDependenciesMet()); - - // Only op1 should receive a call to getAllWorkOrders initially. - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); - - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); - - // Only op1 should produce a workorder. - EXPECT_EQ(1, op1.getNumWorkOrders()); - EXPECT_EQ(0, op2.getNumWorkOrders()); - - // Foreman hasn't yet got workorder completion response for the workorder. - unique_ptr worker_message; - worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - EXPECT_EQ(1, getNumWorkOrdersInExecution(id1)); - EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); - EXPECT_EQ(0, getNumOperatorsFinished()); - - // Send a message to Foreman upon workorder (generated by op1) completion. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); - // op1 is over now, op2 still to go. - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - EXPECT_EQ(1, getNumOperatorsFinished()); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - - worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); - - // op1 is op2's blocking dependency. - EXPECT_TRUE(op2.getBlockingDependenciesMet()); - - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - // op2 should get first call of getAllWorkOrders() when op1 is over. - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - - EXPECT_EQ(1, op2.getNumWorkOrders()); - - // Send a message to QueryManager upon workorder (generated by op2) completion. - // Note that the worker hasn't yet popped the workorder. Usually this won't - // happen as workers pop workorders first, execute and then send the response. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); - - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); - EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); - - EXPECT_EQ(2, getNumOperatorsFinished()); - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_TRUE(getOperatorFinishedStatus(id2)); - - // Expect no additional calls to getAllWorkOrders. - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); -} - -TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) { - // We use two nodes in the DAG with a non-blocking link between them. - // We stream output of op1 to op2. Sequeuce of events is as follows: - // 1. op1 creates a workorder. - // 2. We send a "block full" (from op1) to QueryManager. - // 3. op2 creates a workorder because of step 2. - const QueryPlan::DAGNodeIndex id1 = - query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); - const QueryPlan::DAGNodeIndex id2 = - query_plan_->addRelationalOperator(new MockOperator(true, true, 3)); - - // Create a non-blocking link. - query_plan_->addDirectDependency(id2, id1, false); - - static_cast( - query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)) - ->setOutputRelationID(0xdead); - - const MockOperator &op1 = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(id1)); - const MockOperator &op2 = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(id2)); - - constructQueryManager(); - - // As none of the operators have a blocking link, blocking dependencies should - // be met. - EXPECT_TRUE(op1.getBlockingDependenciesMet()); - EXPECT_TRUE(op2.getBlockingDependenciesMet()); - - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op1.getNumWorkOrders()); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); - - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - // op2 will generate workorder only after receiving a streaming input. - EXPECT_EQ(0, op2.getNumWorkOrders()); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); - - unique_ptr worker_message; - worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - // Send a message to QueryManager upon block getting full (output of op1). - EXPECT_FALSE(placeOutputBlockMessage(id1)); - - // op1 is not finished yet because the response of workorder completion hasn't - // been received yet by the QueryManager. - EXPECT_FALSE(getOperatorFinishedStatus(id1)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - - // No additional call to op1's getAllWorkOrders. - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); - - // Output from op1 should be fed to op2. - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); - - // A call to op2's getAllWorkOrders because of the streamed input. - EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op2.getNumWorkOrders()); - - // Place a message of a workorder completion of op1 on Foreman's input queue. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - - // An additional call to op2's getAllWorkOrders because of completion of op1. - EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(2, op2.getNumWorkOrders()); - - worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - // Place a message of a workorder completion of op2 on Foreman's input queue. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id2)); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - - worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - - // Send a message to Foreman upon workorder (generated by op2) completion. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); - EXPECT_TRUE(getOperatorFinishedStatus(id2)); -} - -TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) { - // In this test, we create a 2-node DAG with a non-blocking link between them. - // There is no streaming of data from op1 to op2 during the execution of op1. - // op1 produces a partially filled block at the end of its execution which is - // rebuilt and then fed to op2. - const QueryPlan::DAGNodeIndex id1 = - query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); - const QueryPlan::DAGNodeIndex id2 = - query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1)); - - // Create a non-blocking link. - query_plan_->addDirectDependency(id2, id1, false); - - // Create a relation, owned by db_.*/ - CatalogRelation *relation = - new CatalogRelation(nullptr /* catalog_database */, "test_relation"); - const relation_id output_relation_id = db_->addRelation(relation); - - // Setup the InsertDestination proto in the query context proto. - serialization::QueryContext *query_context_proto = - query_handle_->getQueryContextProtoMutable(); - - const QueryContext::insert_destination_id insert_destination_index = - query_context_proto->insert_destinations_size(); - serialization::InsertDestination *insert_destination_proto = - query_context_proto->add_insert_destinations(); - - insert_destination_proto->set_insert_destination_type( - serialization::InsertDestinationType::BLOCK_POOL); - insert_destination_proto->set_relation_id(output_relation_id); - insert_destination_proto->set_relational_op_index(id1); - - MockOperator *op1_mutable = static_cast( - query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)); - op1_mutable->setInsertDestinationID(insert_destination_index); - op1_mutable->setOutputRelationID(output_relation_id); - - const MockOperator &op1 = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(id1)); - const MockOperator &op2 = static_cast( - query_plan_->getQueryPlanDAG().getNodePayload(id2)); - - constructQueryManager(); - - // NOTE(zuyu): An operator generally has no ideas about partially filled - // blocks, but InsertDestination in QueryContext does. - // Mock to add partially filled blocks in the InsertDestination. - InsertDestination *insert_destination = - query_manager_->getQueryContextMutable()->getInsertDestination( - insert_destination_index); - DCHECK(insert_destination != nullptr); - MutableBlockReference block_ref; - static_cast(insert_destination) - ->available_block_refs_.push_back(move(block_ref)); - - // There's no blocking dependency in the DAG. - EXPECT_TRUE(op1.getBlockingDependenciesMet()); - EXPECT_TRUE(op2.getBlockingDependenciesMet()); - - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op1.getNumWorkOrders()); - - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op2.getNumWorkOrders()); - - unique_ptr worker_message; - worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - // Send a message to QueryManager upon workorder (generated by op1) completion. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); - - worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder, - worker_message->getType()); - - EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - // op1 generates a rebuild workorder. The block is rebuilt and streamed - // to Foreman. - EXPECT_FALSE(placeDataPipelineMessage(id1)); - - EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1)); - // Based on the streamed input, op2's getAllWorkOrders should produce a - // workorder. - EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op2.getNumWorkOrders()); - - worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - - EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); - - // Send a message to QueryManager upon workorder (generated by op2) completion. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); - - EXPECT_TRUE(getOperatorFinishedStatus(id2)); -} - -TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) { - // When an operator produces workorders but no output, the QueryManager should - // check the dependents of this operator to make progress. - const QueryPlan::DAGNodeIndex kNumNodes = 5; - std::vector ids; - ids.reserve(kNumNodes); - - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - if (i == 0) { - ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false)); - } else { - ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true)); - } - VLOG(3) << ids[i]; - } - - /** - * The DAG looks like this: - * - * op1 -> op2 -> op3 -> op4 -> op5 - * - **/ - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) { - query_plan_->addDirectDependency(ids[i + 1], ids[i], false); - static_cast(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i])) - ->setOutputRelationID(0xdead); - } - - std::vector operators; - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - operators.push_back(static_cast(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i]))); - } - - constructQueryManager(); - - // operators[0] should have produced a workorder by now. - EXPECT_EQ(1, operators[0]->getNumWorkOrders()); - - unique_ptr worker_message; - worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - - EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0])); - EXPECT_FALSE(getOperatorFinishedStatus(ids[0])); - - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders)); - } - - // Send a message to QueryManager upon workorder (generated by operators[0]) - // completion. - EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0])); - - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i])); - EXPECT_TRUE(getOperatorFinishedStatus(ids[i])); - if (i < kNumNodes - 1) { - EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - } - } -} - -TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) { - // Consider two operators, both generate one workorder each. The dependent's - // workorder finishes before dependency's workorder. - const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); - const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1)); - - // Create a non-blocking link. - query_plan_->addDirectDependency(id2, id1, false); - - constructQueryManager(); - - unique_ptr worker_message; - worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); - - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - - EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - // Send a message to QueryManager upon a block (output of op1) getting full. - EXPECT_FALSE(placeOutputBlockMessage(id1)); - - // op1 is not finished yet because the response of workorder completion hasn't - // been received yet. - EXPECT_FALSE(getOperatorFinishedStatus(id1)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - - worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); - EXPECT_TRUE(worker_message != nullptr); - EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, - worker_message->getType()); - - EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); - - delete worker_message->getWorkOrder(); - - // As mentioned earlier, op2 finishes before op1. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id2)); - - // op1's workorder execution is over. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id1)); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_TRUE(getOperatorFinishedStatus(id2)); -} - -} // namespace quickstep