quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [16/42] incubator-quickstep git commit: Added an util to get block domain from BlockLocator.
Date Sun, 18 Dec 2016 01:34:44 GMT
Added an util to get block domain from BlockLocator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/56555117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/56555117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/56555117

Branch: refs/heads/exact-filter
Commit: 565551173e797c65651eef0408c74b55e27a5796
Parents: f7d1543
Author: Zuyu Zhang <zuyuz@apache.org>
Authored: Sun Nov 20 15:17:46 2016 -0800
Committer: Zuyu Zhang <zuyuz@apache.org>
Committed: Sun Nov 20 19:58:14 2016 -0800

----------------------------------------------------------------------
 query_execution/BlockLocatorUtil.cpp            | 93 ++++++++++++++++++++
 query_execution/BlockLocatorUtil.hpp            | 59 +++++++++++++
 query_execution/CMakeLists.txt                  |  9 ++
 query_execution/tests/BlockLocator_unittest.cpp | 42 +--------
 query_optimizer/tests/CMakeLists.txt            |  4 +-
 .../DistributedExecutionGeneratorTestRunner.cpp | 54 +++---------
 .../DistributedExecutionGeneratorTestRunner.hpp |  4 +-
 storage/CMakeLists.txt                          |  4 +-
 storage/tests/DataExchange_unittest.cpp         | 50 ++---------
 9 files changed, 187 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/BlockLocatorUtil.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.cpp b/query_execution/BlockLocatorUtil.cpp
new file mode 100644
index 0000000..d2d1e96
--- /dev/null
+++ b/query_execution/BlockLocatorUtil.cpp
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_execution/BlockLocatorUtil.hpp"
+
+#include <cstdlib>
+#include <string>
+#include <utility>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using tmb::TaggedMessage;
+using tmb::MessageBus;
+using tmb::client_id;
+
+namespace quickstep {
+namespace block_locator {
+
+namespace S = ::quickstep::serialization;
+
+block_id_domain getBlockDomain(const std::string &network_address,
+                               const client_id cli_id,
+                               client_id *locator_client_id,
+                               MessageBus *bus) {
+  tmb::Address address;
+  address.All(true);
+  // NOTE(zuyu): The singleton BlockLocator would need only one copy of the message.
+  tmb::MessageStyle style;
+
+  S::BlockDomainRegistrationMessage proto;
+  proto.set_domain_network_address(network_address);
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kBlockDomainRegistrationMessage);
+  std::free(proto_bytes);
+
+  DLOG(INFO) << "Client (id '" << cli_id
+             << "') broadcasts BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+             << "') to BlockLocator.";
+
+  CHECK(MessageBus::SendStatus::kOK ==
+      bus->Send(cli_id, address, style, std::move(message)));
+
+  const tmb::AnnotatedMessage annotated_message(bus->Receive(cli_id, 0, true));
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+
+  *locator_client_id = annotated_message.sender;
+
+  DLOG(INFO) << "Client (id '" << cli_id
+             << "') received BlockDomainRegistrationResponseMessage (typed '"
+             << kBlockDomainRegistrationResponseMessage
+             << "') from BlockLocator (id '" << *locator_client_id << "').";
+
+  S::BlockDomainMessage response_proto;
+  CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  return static_cast<block_id_domain>(response_proto.block_domain());
+}
+
+}  // namespace block_locator
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/BlockLocatorUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.hpp b/query_execution/BlockLocatorUtil.hpp
new file mode 100644
index 0000000..74f65e4
--- /dev/null
+++ b/query_execution/BlockLocatorUtil.hpp
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_
+
+#include <string>
+
+#include "storage/StorageBlockInfo.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+namespace block_locator {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief Broadcast to find BlockLocator to get a block domain for
+ * StorageManager with the given network address.
+ *
+ * @param network_address The network address of the StorageManager.
+ * @param cli_id The client ID of the block domain requester.
+ * @param locator_client_id The client ID of BlockLocator to set.
+ * @param bus A pointer to the TMB.
+ *
+ * @return The requested block domain.
+ **/
+block_id_domain getBlockDomain(const std::string &network_address,
+                               const tmb::client_id cli_id,
+                               tmb::client_id *locator_client_id,
+                               tmb::MessageBus *bus);
+
+/** @} */
+
+}  // namespace block_locator
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index eec0029..719d9f3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -31,6 +31,7 @@ endif()
 add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
+  add_library(quickstep_queryexecution_BlockLocatorUtil BlockLocatorUtil.cpp BlockLocatorUtil.hpp)
 endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 if (ENABLE_DISTRIBUTED)
@@ -83,6 +84,12 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_threading_ThreadUtil
                         quickstep_utility_Macros
                         tmb)
+  target_link_libraries(quickstep_queryexecution_BlockLocatorUtil
+                        glog
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_storage_StorageBlockInfo
+                        tmb)
 endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanBase
                       glog
@@ -345,6 +352,7 @@ target_link_libraries(quickstep_queryexecution
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_BlockLocatorUtil
                         quickstep_queryexecution_ForemanDistributed
                         quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryManagerDistributed
@@ -363,6 +371,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_CatalogAttribute
                         quickstep_catalog_CatalogRelation
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_BlockLocatorUtil
                         quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionTypedefs
                         quickstep_queryexecution_QueryExecutionUtil

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index 465f2a3..32437c3 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -26,6 +26,7 @@
 #include "catalog/CatalogAttribute.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
@@ -71,7 +72,6 @@ class BlockLocatorTest : public ::testing::Test {
     bus_.Initialize();
 
     locator_.reset(new BlockLocator(&bus_));
-    locator_client_id_ = locator_->getBusClientID();
     locator_->start();
 
     worker_client_id_ = bus_.Connect();
@@ -84,7 +84,9 @@ class BlockLocatorTest : public ::testing::Test {
 
     bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
 
-    block_domain_ = getBlockDomain(kDomainNetworkAddress);
+    block_domain_ =
+        block_locator::getBlockDomain(kDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_);
+    DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
 
     storage_manager_.reset(
         new StorageManager(kStoragePath, block_domain_, locator_client_id_, &bus_));
@@ -168,42 +170,6 @@ class BlockLocatorTest : public ::testing::Test {
   unique_ptr<StorageManager> storage_manager_;
 
  private:
-  block_id_domain getBlockDomain(const string &network_address) {
-    serialization::BlockDomainRegistrationMessage proto;
-    proto.set_domain_network_address(network_address);
-
-    const int proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kBlockDomainRegistrationMessage);
-    free(proto_bytes);
-
-    LOG(INFO) << "Worker (id '" << worker_client_id_
-              << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
-              << "') to BlockLocator (id '" << locator_client_id_ << "')";
-
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           worker_client_id_,
-                                           locator_client_id_,
-                                           move(message)));
-
-    const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
-    const TaggedMessage &tagged_message = annotated_message.tagged_message;
-    EXPECT_EQ(locator_client_id_, annotated_message.sender);
-    EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
-    LOG(INFO) << "Worker (id '" << worker_client_id_
-              << "') received BlockDomainRegistrationResponseMessage from BlockLocator";
-
-    serialization::BlockDomainMessage response_proto;
-    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-    return static_cast<block_id_domain>(response_proto.block_domain());
-  }
-
   MessageBusImpl bus_;
 
   unique_ptr<BlockLocator> locator_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 9c764e4..b987322 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -115,8 +115,8 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_parser_ParseStatement
                         quickstep_parser_SqlParserWrapper
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_BlockLocatorUtil
                         quickstep_queryexecution_ForemanDistributed
-                        quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionTypedefs
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_Shiftboss
@@ -127,7 +127,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryoptimizer_QueryHandle
                         quickstep_queryoptimizer_tests_TestDatabaseLoader
                         quickstep_storage_DataExchangerAsync
-                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
                         quickstep_utility_Macros
                         quickstep_utility_MemStream
                         quickstep_utility_SqlError

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 0403e77..2351dcd 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -32,14 +32,14 @@
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
 #include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
 #include "query_execution/ForemanDistributed.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "storage/DataExchangerAsync.hpp"
-#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
 #include "utility/MemStream.hpp"
 #include "utility/SqlError.hpp"
 
@@ -81,14 +81,15 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
   bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
 
   block_locator_ = make_unique<BlockLocator>(&bus_);
-  locator_client_id_ = block_locator_->getBusClientID();
   block_locator_->start();
 
   test_database_loader_ = make_unique<TestDatabaseLoader>(
       storage_path,
-      getBlockDomain(test_database_loader_data_exchanger_.network_address()),
+      block_locator::getBlockDomain(
+          test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
       locator_client_id_,
       &bus_);
+  DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
   test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
   test_database_loader_data_exchanger_.start();
 
@@ -111,7 +112,11 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
         make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes));
 
     auto storage_manager = make_unique<StorageManager>(
-        storage_path, getBlockDomain(data_exchangers_[i].network_address()), locator_client_id_, &bus_);
+        storage_path,
+        block_locator::getBlockDomain(
+            data_exchangers_[i].network_address(), cli_id_, &locator_client_id_, &bus_),
+        locator_client_id_, &bus_);
+    DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
 
     data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
@@ -193,44 +198,5 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
   }
 }
 
-block_id_domain DistributedExecutionGeneratorTestRunner::getBlockDomain(
-    const string &network_address) {
-  serialization::BlockDomainRegistrationMessage proto;
-  proto.set_domain_network_address(network_address);
-
-  const int proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kBlockDomainRegistrationMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "Client (id '" << cli_id_
-             << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
-             << "') to BlockLocator (id '" << locator_client_id_ << "')";
-
-  CHECK(MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(&bus_,
-                                         cli_id_,
-                                         locator_client_id_,
-                                         move(message)));
-
-  const tmb::AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
-  const TaggedMessage &tagged_message = annotated_message.tagged_message;
-  CHECK_EQ(locator_client_id_, annotated_message.sender);
-  CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
-  DLOG(INFO) << "Client (id '" << cli_id_
-             << "') received BlockDomainRegistrationResponseMessage (typed '"
-             << kBlockDomainRegistrationResponseMessage
-             << "') from BlockLocator";
-
-  serialization::BlockDomainMessage response_proto;
-  CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-  return static_cast<block_id_domain>(response_proto.block_domain());
-}
-
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index d2b13e4..63e320d 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -38,7 +38,7 @@
 #include "query_optimizer/Optimizer.hpp"
 #include "query_optimizer/tests/TestDatabaseLoader.hpp"
 #include "storage/DataExchangerAsync.hpp"
-#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
 #include "utility/Macros.hpp"
 #include "utility/textbased_test/TextBasedTestRunner.hpp"
 
@@ -115,8 +115,6 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
                    std::string *output) override;
 
  private:
-  block_id_domain getBlockDomain(const std::string &network_address);
-
   std::size_t query_id_;
 
   SqlParserWrapper sql_parser_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index be60662..559d86d 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -862,7 +862,7 @@ target_link_libraries(quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
                       quickstep_utility_Macros
-                      quickstep_utility_StringUtil)                    
+                      quickstep_utility_StringUtil)
 target_link_libraries(quickstep_storage_PreloaderThread
                       glog
                       quickstep_catalog_CatalogDatabase
@@ -1502,7 +1502,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_CatalogRelation
                         quickstep_catalog_CatalogTypedefs
                         quickstep_queryexecution_BlockLocator
-                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_BlockLocatorUtil
                         quickstep_queryexecution_QueryExecutionTypedefs
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_storage_CountedReference

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
index 9c75150..ac39728 100644
--- a/storage/tests/DataExchange_unittest.cpp
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -27,7 +27,7 @@
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/BlockLocator.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/BlockLocatorUtil.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "storage/CountedReference.hpp"
@@ -77,7 +77,6 @@ class DataExchangeTest : public ::testing::Test {
     bus_.Initialize();
 
     locator_.reset(new BlockLocator(&bus_));
-    locator_client_id_ = locator_->getBusClientID();
     locator_->start();
 
     worker_client_id_ = bus_.Connect();
@@ -88,18 +87,22 @@ class DataExchangeTest : public ::testing::Test {
 
     storage_manager_expected_.reset(new StorageManager(
         kStoragePath,
-        getBlockDomain(data_exchanger_expected_.network_address()),
+        block_locator::getBlockDomain(
+            data_exchanger_expected_.network_address(), worker_client_id_, &locator_client_id_, &bus_),
         locator_client_id_,
         &bus_));
+    DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
 
     data_exchanger_expected_.set_storage_manager(storage_manager_expected_.get());
     data_exchanger_expected_.start();
 
     storage_manager_checked_.reset(new StorageManager(
         kStoragePath,
-        getBlockDomain(kCheckedDomainNetworkAddress),
+        block_locator::getBlockDomain(
+            kCheckedDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_),
         locator_client_id_,
         &bus_));
+    DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
   }
 
   virtual void TearDown() {
@@ -123,45 +126,6 @@ class DataExchangeTest : public ::testing::Test {
   unique_ptr<StorageManager> storage_manager_expected_, storage_manager_checked_;
 
  private:
-  block_id_domain getBlockDomain(const string &network_address) {
-    serialization::BlockDomainRegistrationMessage proto;
-    proto.set_domain_network_address(network_address);
-
-    const int proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kBlockDomainRegistrationMessage);
-    free(proto_bytes);
-
-    LOG(INFO) << "Worker (id '" << worker_client_id_
-              << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
-              << "') to BlockLocator (id '" << locator_client_id_ << "')";
-
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           worker_client_id_,
-                                           locator_client_id_,
-                                           move(message)));
-
-    const tmb::AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
-    const TaggedMessage &tagged_message = annotated_message.tagged_message;
-    EXPECT_EQ(locator_client_id_, annotated_message.sender);
-    EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
-
-    LOG(INFO) << "Worker (id '" << worker_client_id_
-              << "') received BlockDomainRegistrationResponseMessage (typed '"
-              << kBlockDomainRegistrationResponseMessage
-              << "') from BlockLocator";
-
-    serialization::BlockDomainMessage response_proto;
-    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-    return static_cast<block_id_domain>(response_proto.block_domain());
-  }
-
   MessageBusImpl bus_;
 
   unique_ptr<BlockLocator> locator_;


Mime
View raw message