quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zu...@apache.org
Subject incubator-quickstep git commit: Added the distributed execution engine and tests. [Forced Update!]
Date Fri, 29 Jul 2016 23:00:15 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/foreman-dist 1a5692b45 -> 220fa06ff (forced update)


Added the distributed execution engine and tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/220fa06f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/220fa06f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/220fa06f

Branch: refs/heads/foreman-dist
Commit: 220fa06fff04ffffe14bf24ee090c5bea0b5f55d
Parents: aaecc76
Author: Zuyu Zhang <zuyuz@twitter.com>
Authored: Fri Jul 22 11:31:33 2016 -0700
Committer: Zuyu Zhang <zuyuz@twitter.com>
Committed: Fri Jul 29 15:59:10 2016 -0700

----------------------------------------------------------------------
 query_execution/AdmitRequestMessage.hpp         |   7 +
 query_execution/CMakeLists.txt                  |  50 ++-
 query_execution/ForemanDistributed.cpp          | 333 +++++++++++++++++++
 query_execution/ForemanDistributed.hpp          | 130 ++++++++
 query_execution/PolicyEnforcerBase.cpp          |   3 +
 query_execution/PolicyEnforcerBase.hpp          |   7 +
 query_execution/PolicyEnforcerDistributed.cpp   | 253 ++++++++++++++
 query_execution/PolicyEnforcerDistributed.hpp   | 112 +++++++
 query_execution/QueryExecutionMessages.proto    |  16 +-
 query_execution/QueryExecutionTypedefs.hpp      |   4 +
 query_execution/QueryManagerBase.cpp            |   3 +-
 query_execution/QueryManagerBase.hpp            |  26 +-
 query_execution/QueryManagerDistributed.cpp     |  41 ++-
 query_execution/QueryManagerDistributed.hpp     |  10 +-
 query_execution/Shiftboss.cpp                   |  89 +++--
 query_optimizer/CMakeLists.txt                  |   4 +
 query_optimizer/QueryHandle.hpp                 |  26 ++
 query_optimizer/tests/CMakeLists.txt            |  41 +++
 .../tests/DistributedExecutionGeneratorTest.cpp |  57 ++++
 .../DistributedExecutionGeneratorTestRunner.cpp | 122 +++++++
 .../DistributedExecutionGeneratorTestRunner.hpp | 146 ++++++++
 .../tests/execution_generator/CMakeLists.txt    |  68 +++-
 third_party/tmb/include/tmb/tagged_message.h    |   9 +
 23 files changed, 1511 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
index e33b354..75c5ff6 100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -60,6 +60,13 @@ class AdmitRequestMessage {
     return query_handles_;
   }
 
+  /**
+   * @brief Get the mutable query handles from this message.
+   **/
+  std::vector<QueryHandle*>* getQueryHandlesMutable() {
+    return &query_handles_;
+  }
+
  private:
   std::vector<QueryHandle*> query_handles_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8bf1ab1..cfb72d7 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -33,8 +33,14 @@ if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif()
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
@@ -83,6 +89,26 @@ target_link_libraries(quickstep_queryexecution_ForemanBase
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_ForemanDistributed
+                        glog
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_AdmitRequestMessage
+                        quickstep_queryexecution_ForemanBase
+                        quickstep_queryexecution_PolicyEnforcerDistributed
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_EqualsAnyConstant
+                        quickstep_utility_Macros
+                        tmb
+                        ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
@@ -110,6 +136,26 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
+                        glog
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_Catalog_proto
+                        quickstep_queryexecution_PolicyEnforcerBase
+                        quickstep_queryexecution_QueryContext_proto
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionState
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_QueryManagerBase
+                        quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_utility_Macros
+                        tmb
+                        ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
                       glog
                       quickstep_catalog_CatalogTypedefs
@@ -294,10 +340,12 @@ target_link_libraries(quickstep_queryexecution
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_Shiftboss
                         quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 # Tests:
 if (ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
new file mode 100644
index 0000000..1c0fba8
--- /dev/null
+++ b/query_execution/ForemanDistributed.cpp
@@ -0,0 +1,333 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/ForemanDistributed.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+ForemanDistributed::ForemanDistributed(
+    tmb::MessageBus *bus,
+    CatalogDatabaseLite *catalog_database,
+    const int cpu_id,
+    const bool profile_individual_workorders)
+    : ForemanBase(bus, cpu_id),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
+  const std::vector<QueryExecutionMessageType> sender_message_types{
+      kShiftbossRegistrationResponseMessage,
+      kQueryInitiateMessage,
+      kWorkOrderMessage,
+      kInitiateRebuildMessage,
+      kSaveQueryResultMessage,
+      kQueryExecutionSuccessMessage,
+      kPoisonMessage};
+
+  for (const auto message_type : sender_message_types) {
+    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
+  }
+
+  const std::vector<QueryExecutionMessageType> receiver_message_types{
+      kShiftbossRegistrationMessage,
+      kAdmitRequestMessage,
+      kQueryInitiateResponseMessage,
+      kCatalogRelationNewBlockMessage,
+      kDataPipelineMessage,
+      kInitiateRebuildResponseMessage,
+      kWorkOrderCompleteMessage,
+      kRebuildWorkOrderCompleteMessage,
+      kWorkOrderFeedbackMessage,
+      kWorkOrdersAvailableMessage,
+      kSaveQueryResultResponseMessage,
+      kPoisonMessage};
+
+  for (const auto message_type : receiver_message_types) {
+    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
+  }
+
+  policy_enforcer_.reset(new PolicyEnforcerDistributed(
+      foreman_client_id_,
+      catalog_database_,
+      &shiftboss_directory_,
+      bus_,
+      profile_individual_workorders));
+}
+
+void ForemanDistributed::run() {
+  if (cpu_id_ >= 0) {
+    // We can pin the foreman thread to a CPU if specified.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  // Wait for at least one Shiftboss to register before entering the event loop.
+  if (shiftboss_directory_.empty()) {
+    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
+    LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
+              << "' message from client " << annotated_message.sender;
+
+    S::ShiftbossRegistrationMessage proto;
+    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
+    DCHECK_EQ(1u, shiftboss_directory_.size());
+  }
+
+  // Event loop
+  for (;;) {
+    // Receive() causes this thread to sleep until next message is received.
+    const AnnotatedMessage annotated_message =
+        bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const tmb::message_type_id message_type = tagged_message.message_type();
+    LOG(INFO) << "ForemanDistributed received typed '" << message_type
+              << "' message from client " << annotated_message.sender;
+    switch (message_type) {
+      case kShiftbossRegistrationMessage: {
+        S::ShiftbossRegistrationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
+        break;
+      }
+      case kAdmitRequestMessage: {
+        AdmitRequestMessage *request_message =
+            const_cast<AdmitRequestMessage*>(
+                static_cast<const AdmitRequestMessage*>(tagged_message.message()));
+
+        vector<QueryHandle *> *query_handles = request_message->getQueryHandlesMutable();
+        DCHECK(!query_handles->empty());
+
+        for (QueryHandle *handle : *query_handles) {
+          handle->setClientId(annotated_message.sender);
+        }
+
+        bool all_queries_admitted = true;
+        if (query_handles->size() == 1u) {
+          all_queries_admitted =
+              policy_enforcer_->admitQuery(query_handles->front());
+        } else {
+          all_queries_admitted = policy_enforcer_->admitQueries(*query_handles);
+        }
+        if (!all_queries_admitted) {
+          LOG(WARNING) << "The scheduler could not admit all the queries";
+          // TODO(harshad) - Inform the main thread about the failure.
+        }
+        break;
+      }
+      case kQueryInitiateResponseMessage: {
+        // TODO(zuyu): check the query id.
+        break;
+      }
+      case kCatalogRelationNewBlockMessage:  // Fall through
+      case kDataPipelineMessage:
+      case kRebuildWorkOrderCompleteMessage:
+      case kWorkOrderCompleteMessage:
+      case kWorkOrderFeedbackMessage:
+      case kWorkOrdersAvailableMessage: {
+        policy_enforcer_->processMessage(tagged_message);
+        break;
+      }
+      case kInitiateRebuildResponseMessage: {
+        // A unique case in the distributed version.
+        policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+        break;
+      }
+      case kSaveQueryResultResponseMessage: {
+        S::SaveQueryResultResponseMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+        break;
+      }
+      case kPoisonMessage: {
+        if (policy_enforcer_->hasQueries()) {
+          LOG(WARNING) << "Foreman thread exiting while some queries are "
+                          "under execution or waiting to be admitted";
+        }
+
+        // Shutdown all Shiftbosses.
+        tmb::Address shiftboss_addresses;
+        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
+          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
+        }
+
+        tmb::MessageStyle broadcast_style;
+        broadcast_style.Broadcast(true);
+
+        TaggedMessage poison_message(kPoisonMessage);
+
+        const tmb::MessageBus::SendStatus send_status =
+            bus_->Send(foreman_client_id_,
+                       shiftboss_addresses,
+                       broadcast_style,
+                       move(poison_message));
+        DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+        return;
+      }
+      default:
+        LOG(FATAL) << "Unknown message type to Foreman";
+    }
+
+    if (canCollectNewMessages(message_type)) {
+      vector<unique_ptr<S::WorkOrderMessage>> new_messages;
+      policy_enforcer_->getWorkOrderMessages(&new_messages);
+      dispatchWorkOrderMessages(new_messages);
+    }
+  }
+}
+
+bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
+  return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+                                        kCatalogRelationNewBlockMessage,
+                                        kWorkOrderFeedbackMessage) &&
+         // TODO(zuyu): Multiple Shiftbosses support.
+         !shiftboss_directory_.hasReachedCapacity(0);
+}
+
+void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+  for (const auto &message : messages) {
+    DCHECK(message != nullptr);
+    // TODO(zuyu): Multiple Shiftbosses support.
+    sendWorkOrderMessage(0, *message);
+    shiftboss_directory_.incrementNumQueuedWorkOrders(0);
+  }
+}
+
+void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
+                                              const S::WorkOrderMessage &proto) {
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kWorkOrderMessage);
+  free(proto_bytes);
+
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_directory_.getClientId(shiftboss_index),
+                                         move(message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_directory_.getClientId(shiftboss_index);
+}
+
+// Writes the profiled work orders of 'query_id' to 'out' as CSV (header row first).
+void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
+                                                        std::FILE *out) const {
+  const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>
+      &recorded_times = policy_enforcer_->getProfilingResults(query_id);
+  fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);
+  for (const auto &workorder_entry : recorded_times) {
+    // Every field is a std::size_t, so use the 'z' length modifier: "%lu" is
+    // only correct where unsigned long and std::size_t have the same width.
+    fprintf(out,
+            "%zu,%zu,%zu,%zu\n",
+            query_id,
+            std::get<0>(workorder_entry),  // Worker thread index.
+            std::get<1>(workorder_entry),  // Operator ID.
+            std::get<2>(workorder_entry));  // Time.
+  }
+}
+
+void ForemanDistributed::processShiftbossRegisterationMessage(const client_id shiftboss_client_id,
+                                                              const std::size_t work_order_capacity) {
+  shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
+
+  S::ShiftbossRegistrationResponseMessage proto;
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kShiftbossRegistrationResponseMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
+            << kShiftbossRegistrationResponseMessage
+            << "') to Shiftboss with TMB client id " << shiftboss_client_id;
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     foreman_client_id_,
+                                     shiftboss_client_id,
+                                     move(message));
+}
+
+void ForemanDistributed::processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+                                                               const relation_id result_relation_id) {
+  S::QueryExecutionSuccessMessage proto;
+  proto.mutable_result_relation()->MergeFrom(
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kQueryExecutionSuccessMessage);
+  free(proto_bytes);
+
+  // Notify the CLI regarding the query result.
+  LOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+            << "') to CLI with TMB client id " << cli_id;
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     foreman_client_id_,
+                                     cli_id,
+                                     move(message));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
new file mode 100644
index 0000000..8a4a97c
--- /dev/null
+++ b/query_execution/ForemanDistributed.hpp
@@ -0,0 +1,130 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ *        policy enforcer and dispatches the work to Shiftbosses. It also
+ *        receives work completion messages from Shiftbosses.
+ **/
+class ForemanDistributed final : public ForemanBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param catalog_database The catalog database where this query is executed.
+   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+   * @param profile_individual_workorders Whether every workorder's execution
+   *        should be profiled or not.
+   *
+   * @note If cpu_id is not specified, Foreman thread can be possibly moved
+   *       around on different CPUs by the OS.
+   **/
+  ForemanDistributed(tmb::MessageBus *bus,
+                     CatalogDatabaseLite *catalog_database,
+                     const int cpu_id = -1,
+                     const bool profile_individual_workorders = false);
+
+  ~ForemanDistributed() override {}
+
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
+ protected:
+  void run() override;
+
+ private:
+  /**
+   * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages, to a
+   *        Shiftboss.
+   *
+   * @param messages The messages to be dispatched.
+   **/
+  void dispatchWorkOrderMessages(
+      const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
+
+  /**
+   * @brief Send the given message to the specified Shiftboss.
+   *
+   * @param worker_index The logical index of the recipient Shiftboss in
+   *        ShiftbossDirectory.
+   * @param proto The WorkOrderMessage to be sent.
+   **/
+  void sendWorkOrderMessage(const std::size_t worker_index,
+                            const serialization::WorkOrderMessage &proto);
+
+  void processShiftbossRegisterationMessage(const tmb::client_id shiftboss_client_id,
+                                            const std::size_t work_order_capacity);
+
+  void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+                                             const relation_id result_relation_id);
+
+  /**
+   * @brief Check if we can collect new messages from the PolicyEnforcer.
+   *
+   * @param message_type The type of the last received message.
+   **/
+  bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+  ShiftbossDirectory shiftboss_directory_;
+
+  CatalogDatabaseLite *catalog_database_;
+
+  std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
+
+  DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index d16a502..a28fa3b 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -131,8 +131,11 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
     default:
       LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
   }
+
   if (admitted_queries_[query_id]->queryStatus(op_index) ==
           QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+    onQueryCompletion(admitted_queries_[query_id]->query_handle());
+
     removeQuery(query_id);
     if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 0482ebc..1de0677 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -148,6 +148,13 @@ class PolicyEnforcerBase {
   void recordTimeForWorkOrder(
       const serialization::NormalWorkOrderCompletionMessage &proto);
 
+  /**
+   * @brief Add custom actions upon the completion of a query.
+   *
+   * @param query_handle The query handle.
+   **/
+  virtual void onQueryCompletion(QueryHandle *query_handle) {}
+
   CatalogDatabaseLite *catalog_database_;
 
   const bool profile_individual_workorders_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
new file mode 100644
index 0000000..59df3de
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -0,0 +1,253 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+              " can be allocated in a single round of dispatch of messages to"
+              " the workers.");
+
+void PolicyEnforcerDistributed::getWorkOrderMessages(
+    vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+  // Iterate over admitted queries until either there are no more
+  // messages available, or the maximum number of messages have
+  // been collected.
+  DCHECK(work_order_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  const std::size_t per_query_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  DCHECK_GT(per_query_share, 0u);
+
+  vector<std::size_t> finished_queries_ids;
+
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      S::WorkOrderMessage *next_work_order_message =
+          static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
+      if (next_work_order_message != nullptr) {
+        ++messages_collected_curr_query;
+        work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+      } else {
+        // No more work orders from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          // If the query has been executed, remove it.
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+    }
+  }
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    onQueryCompletion(admitted_queries_[finished_qid]->query_handle());
+    removeQuery(finished_qid);
+  }
+}
+
+bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
+    // Ok to admit the query.
+    const std::size_t query_id = query_handle->query_id();
+    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+      // NOTE(zuyu): Should be called before constructing a 'QueryManager'.
+      // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
+      // initializes.
+      initiateQueryInShiftboss(query_handle);
+
+      // Query with the same ID not present, ok to admit.
+      admitted_queries_[query_id].reset(
+          new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+      return true;
+    } else {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+  } else {
+    // This query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+}
+
+void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) {
+  S::InitiateRebuildResponseMessage proto;
+  CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  const std::size_t query_id = proto.query_id();
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+  QueryManagerBase *query_manager = admitted_queries_[query_id].get();
+
+  const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders();
+  query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders);
+  shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders);
+
+  if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+    onQueryCompletion(query_manager->query_handle());
+
+    removeQuery(query_id);
+    if (!waiting_queries_.empty()) {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
+  }
+}
+
+void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
+  S::QueryInitiateMessage proto;
+  proto.set_query_id(query_handle->query_id());
+  proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto());
+  proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kQueryInitiateMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
+            << "') to Shiftboss 0";
+
+  // TODO(zuyu): Multiple Shiftbosses support.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_directory_->getClientId(0),
+                                         move(message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+
+  // Wait for the QueryInitiateResponseMessage from Shiftboss.
+  const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
+  LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
+            << "' message from client " << annotated_message.sender;
+
+  S::QueryInitiateResponseMessage proto_response;
+  CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+}
+
+void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
+  const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+  if (query_result == nullptr) {
+    // TODO(zuyu): notify Shiftboss to remove QueryContext.
+    TaggedMessage message(kQueryExecutionSuccessMessage);
+
+    const tmb::client_id cli_id = query_handle->getClientId();
+
+    // Notify the CLI regarding the query execution result.
+    LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+              << "') to CLI with TMB client id " << cli_id;
+    const tmb::MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           foreman_client_id_,
+                                           cli_id,
+                                           move(message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+        << " to CLI with TMB client ID " << cli_id;
+    return;
+  }
+
+  // SaveQueryResultMessage implies QueryContext clean up in Shiftboss.
+  S::SaveQueryResultMessage proto;
+  proto.set_query_id(query_handle->query_id());
+  proto.set_relation_id(query_result->getID());
+
+  const vector<block_id> blocks(query_result->getBlocksSnapshot());
+  for (const block_id block : blocks) {
+    proto.add_blocks(block);
+  }
+
+  proto.set_cli_id(query_handle->getClientId());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kSaveQueryResultMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+            << "') to Shiftboss 0";
+  // TODO(zuyu): Support multiple shiftbosses.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_directory_->getClientId(0),
+                                         move(message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
new file mode 100644
index 0000000..8b07748
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -0,0 +1,112 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb {
+class MessageBus;
+class TaggedMessage;
+}
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param catalog_database The CatalogDatabase used.
+   * @param bus The TMB.
+   **/
+  PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
+                            CatalogDatabaseLite *catalog_database,
+                            ShiftbossDirectory *shiftboss_directory,
+                            tmb::MessageBus *bus,
+                            const bool profile_individual_workorders = false)
+      : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+        foreman_client_id_(foreman_client_id),
+        shiftboss_directory_(shiftboss_directory),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PolicyEnforcerDistributed() override {}
+
+  bool admitQuery(QueryHandle *query_handle) override;
+
+  /**
+   * @brief Get work order messages to be dispatched. These messages come from
+   *        the active queries.
+   *
+   * @param work_order_messages The work order messages to be dispatched.
+   **/
+  void getWorkOrderMessages(
+      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+
+  /**
+   * @brief Process the initiate rebuild work order response message.
+   *
+   * @param tagged_message The message.
+   **/
+  void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
+
+ private:
+  void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override {
+    shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index);
+  }
+
+  void onQueryCompletion(QueryHandle *query_handle) override;
+
+  void initiateQueryInShiftboss(QueryHandle *query_handle);
+
+  const tmb::client_id foreman_client_id_;
+
+  ShiftbossDirectory *shiftboss_directory_;
+
+  tmb::MessageBus *bus_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 308d736..99de75c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -111,15 +111,27 @@ message InitiateRebuildResponseMessage {
   required uint64 query_id = 1;
   required uint64 operator_index = 2;
   required uint64 num_rebuild_work_orders = 3;
+  required uint64 shiftboss_index = 4;
 }
 
 message SaveQueryResultMessage {
-  required int32 relation_id = 1;
-  repeated fixed64 blocks = 2 [packed=true];
+  required uint64 query_id = 1;
+  required int32 relation_id = 2;
+  repeated fixed64 blocks = 3 [packed=true];
+
+  // Defined in "tmb/id_typedefs.h".
+  required uint32 cli_id = 4;
 }
 
 message SaveQueryResultResponseMessage {
   required int32 relation_id = 1;
+
+  // Defined in "tmb/id_typedefs.h".
+  required uint32 cli_id = 2;
+}
+
+message QueryExecutionSuccessMessage {
+  optional CatalogRelationSchema result_relation = 1;
 }
 
 // BlockLocator related messages.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index b67209f..0d43237 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -84,6 +84,10 @@ enum QueryExecutionMessageType : message_type_id {
   kSaveQueryResultMessage,  // From Foreman to Shiftboss.
   kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
 
+  // From Foreman to CLI.
+  kQueryExecutionSuccessMessage,
+  kQueryExecutionErrorMessage,
+
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index d2a3341..4ee51c3 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -35,7 +35,8 @@ using std::pair;
 namespace quickstep {
 
 QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
-    : query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
+    : query_handle_(query_handle),
+      query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
       query_dag_(DCHECK_NOTNULL(
           DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
       num_operators_in_dag_(query_dag_->size()),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 6edfd5c..3338478 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -24,6 +24,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionState.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -79,6 +80,13 @@ class QueryManagerBase {
   }
 
   /**
+   * @brief Get the query handle.
+   **/
+  inline QueryHandle* query_handle() const {
+    return query_handle_;
+  }
+
+  /**
    * @brief Process the received WorkOrder complete message.
    *
    * @param op_index The index of the specified operator node in the query DAG
@@ -128,6 +136,20 @@ class QueryManagerBase {
   void processFeedbackMessage(const dag_node_index op_index,
                               const WorkOrder::FeedbackMessage &message);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Process the initiate rebuild work order response message.
+   *
+   * @note The Shiftboss index is not a parameter; the caller handles it.
+   * @param op_index The index of the specified operator node in the query DAG
+   *        for initiating the rebuild work order.
+   * @param num_rebuild_work_orders The number of the rebuild work orders
+   *        generated for the operator indexed by 'op_index'.
+   **/
+  virtual void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                                     const std::size_t num_rebuild_work_orders) {}
+#endif  // QUICKSTEP_DISTRIBUTED
+
   /**
    * @brief Get the query status after processing an incoming message.
    *
@@ -250,9 +272,11 @@ class QueryManagerBase {
     return query_exec_state_->hasRebuildInitiated(index);
   }
 
+  QueryHandle *query_handle_;  // Owned by the optimizer.
+
   const std::size_t query_id_;
 
-  DAG<RelationalOperator, bool> *query_dag_;
+  DAG<RelationalOperator, bool> *query_dag_;  // Owned by 'query_handle_'.
   const dag_node_index num_operators_in_dag_;
 
   // For all nodes, store their receiving dependents.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index e906fa5..bed3e45 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -32,6 +32,7 @@
 #include "glog/logging.h"
 
 #include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
 
 using std::free;
 using std::malloc;
@@ -42,11 +43,11 @@ using std::unique_ptr;
 namespace quickstep {
 
 QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
-                                                 ShiftbossDirectory *shiftbosses,
+                                                 ShiftbossDirectory *shiftboss_directory,
                                                  const tmb::client_id foreman_client_id,
                                                  tmb::MessageBus *bus)
     : QueryManagerBase(query_handle),
-      shiftbosses_(shiftbosses),
+      shiftboss_directory_(shiftboss_directory),
       foreman_client_id_(foreman_client_id),
       bus_(bus),
       normal_workorder_protos_container_(
@@ -119,6 +120,27 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
   return generated_new_workorder_protos;
 }
 
+void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                                                    const std::size_t num_rebuild_work_orders) {
+  // TODO(zuyu): Multiple workers support.
+  query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
+
+  if (num_rebuild_work_orders != 0u) {
+    // Wait for the rebuild work orders to finish.
+    return;
+  }
+
+  markOperatorFinished(op_index);
+
+  for (const std::pair<dag_node_index, bool> &dependent_link :
+       query_dag_->getDependents(op_index)) {
+    const dag_node_index dependent_op_index = dependent_link.first;
+    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+      processOperator(dependent_op_index, true);
+    }
+  }
+}
+
 bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
   DCHECK(checkRebuildRequired(index));
   DCHECK(!checkRebuildInitiated(index));
@@ -127,6 +149,7 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
   DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
 
   serialization::InitiateRebuildMessage proto;
+  proto.set_query_id(query_id_);
   proto.set_operator_index(index);
   proto.set_insert_destination_index(op.getInsertDestinationID());
   proto.set_relation_id(op.getOutputRelationID());
@@ -140,13 +163,17 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
                            kInitiateRebuildMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+  LOG(INFO) << "QueryManagerDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
             << "') to Shiftboss";
   // TODO(zuyu): Multiple workers support.
-  QueryExecutionUtil::SendTMBMessage(bus_,
-                                     foreman_client_id_,
-                                     shiftbosses_->getClientId(0),
-                                     move(tagged_msg));
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_directory_->getClientId(0),
+                                         move(tagged_msg));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
 
   // The negative value indicates that the number of rebuild work orders is to be
   // determined.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 8641c22..9a3f44b 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -15,6 +15,7 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 
+#include <cstddef>
 #include <memory>
 
 #include "query_execution/QueryExecutionState.hpp"
@@ -47,12 +48,12 @@ class QueryManagerDistributed final : public QueryManagerBase {
    * @brief Constructor.
    *
    * @param query_handle The QueryHandle object for this query.
-   * @param shiftbosses The ShiftbossDirectory to use.
+   * @param shiftboss_directory The ShiftbossDirectory to use.
    * @param foreman_client_id The TMB client ID of the foreman thread.
    * @param bus The TMB used for communication.
    **/
   QueryManagerDistributed(QueryHandle *query_handle,
-                          ShiftbossDirectory *shiftbosses,
+                          ShiftbossDirectory *shiftboss_directory,
                           const tmb::client_id foreman_client_id,
                           tmb::MessageBus *bus);
 
@@ -60,6 +61,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   bool fetchNormalWorkOrders(const dag_node_index index) override;
 
+  void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                             const std::size_t num_rebuild_work_orders) override;
+
  /**
    * @brief Get the next normal workorder to be excuted, wrapped in a
    *        WorkOrderMessage proto.
@@ -88,7 +92,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
            (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
   }
 
-  ShiftbossDirectory *shiftbosses_;
+  ShiftbossDirectory *shiftboss_directory_;
 
   const tmb::client_id foreman_client_id_;
   tmb::MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 7f655c6..925dc1f 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -113,10 +113,14 @@ void Shiftboss::run() {
                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
                   << "') from Foreman to worker " << worker_index;
 
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
-                                           workers_->getClientID(worker_index),
-                                           move(worker_tagged_message));
+        const tmb::MessageBus::SendStatus send_status =
+            QueryExecutionUtil::SendTMBMessage(bus_,
+                                               shiftboss_client_id_,
+                                               workers_->getClientID(worker_index),
+                                               move(worker_tagged_message));
+        DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+            << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
         break;
       }
       case kInitiateRebuildMessage: {
@@ -143,10 +147,14 @@ void Shiftboss::run() {
                   << "' message from worker (client " << annotated_message.sender << ") to Foreman";
 
         DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
-                                           foreman_client_id_,
-                                           move(annotated_message.tagged_message));
+        const tmb::MessageBus::SendStatus send_status =
+            QueryExecutionUtil::SendTMBMessage(bus_,
+                                               shiftboss_client_id_,
+                                               foreman_client_id_,
+                                               move(annotated_message.tagged_message));
+        DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+            << " to Foreman with TMB client ID " << foreman_client_id_;
         break;
       }
       case kSaveQueryResultMessage: {
@@ -167,8 +175,11 @@ void Shiftboss::run() {
           }
         }
 
+        query_contexts_.erase(proto.query_id());
+
         serialization::SaveQueryResultResponseMessage proto_response;
         proto_response.set_relation_id(proto.relation_id());
+        proto_response.set_cli_id(proto.cli_id());
 
         const size_t proto_response_length = proto_response.ByteSize();
         char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
@@ -182,10 +193,14 @@ void Shiftboss::run() {
         LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
                   << "') to Foreman";
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
-                                           foreman_client_id_,
-                                           move(message_response));
+        const tmb::MessageBus::SendStatus send_status =
+            QueryExecutionUtil::SendTMBMessage(bus_,
+                                               shiftboss_client_id_,
+                                               foreman_client_id_,
+                                               move(message_response));
+        DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+            << " to Foreman with TMB client ID " << foreman_client_id_;
         break;
       }
       case kPoisonMessage: {
@@ -196,7 +211,7 @@ void Shiftboss::run() {
         tmb::MessageStyle broadcast_style;
         broadcast_style.Broadcast(true);
 
-        tmb::MessageBus::SendStatus send_status =
+        const tmb::MessageBus::SendStatus send_status =
             bus_->Send(shiftboss_client_id_,
                        worker_addresses_,
                        broadcast_style,
@@ -249,7 +264,7 @@ void Shiftboss::registerWithForeman() {
                         kShiftbossRegistrationMessage);
   free(proto_bytes);
 
-  tmb::MessageBus::SendStatus send_status =
+  const tmb::MessageBus::SendStatus send_status =
       bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
   DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
@@ -268,10 +283,6 @@ void Shiftboss::processQueryInitiateMessage(
                        bus_));
   query_contexts_.emplace(query_id, move(query_context));
 
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
-            << "') to Foreman";
-
   serialization::QueryInitiateResponseMessage proto;
   proto.set_query_id(query_id);
 
@@ -284,10 +295,18 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
-  QueryExecutionUtil::SendTMBMessage(bus_,
-                                     shiftboss_client_id_,
-                                     foreman_client_id_,
-                                     move(message_response));
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+            << "') to Foreman";
+
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         shiftboss_client_id_,
+                                         foreman_client_id_,
+                                         move(message_response));
+  DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+      << " to Foreman with TMB client ID " << foreman_client_id_;
 }
 
 void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -311,6 +330,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
   proto.set_query_id(query_id);
   proto.set_operator_index(op_index);
   proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
+  // TODO(zuyu): Multiple Shiftboss support.
+  proto.set_shiftboss_index(0);
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -321,10 +342,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
-  QueryExecutionUtil::SendTMBMessage(bus_,
-                                     shiftboss_client_id_,
-                                     foreman_client_id_,
-                                     move(message_response));
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         shiftboss_client_id_,
+                                         foreman_client_id_,
+                                         move(message_response));
+  DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+      << " to Foreman with TMB client ID " << foreman_client_id_;
 
   for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
     // NOTE(zuyu): Worker releases the memory after the execution of
@@ -349,10 +374,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
               << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
               << "') to worker " << worker_index;
 
-    QueryExecutionUtil::SendTMBMessage(bus_,
-                                       shiftboss_client_id_,
-                                       workers_->getClientID(worker_index),
-                                       move(worker_tagged_message));
+    const tmb::MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           workers_->getClientID(worker_index),
+                                           move(worker_tagged_message));
+    DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+        << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+        << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index a56b714..b6b97a0 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -212,6 +212,10 @@ target_link_libraries(quickstep_queryoptimizer_QueryHandle
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_QueryPlan
                       quickstep_utility_Macros)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryoptimizer_QueryHandle
+                        tmb)
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryoptimizer_QueryPlan
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_utility_DAG

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 5f3649a..bbf1918 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -24,9 +24,14 @@
 
 #include "catalog/Catalog.pb.h"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "query_optimizer/QueryPlan.hpp"
 #include "utility/Macros.hpp"
 
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "tmb/id_typedefs.h"
+#endif  // QUICKSTEP_DISTRIBUTED
+
 namespace quickstep {
 
 class CatalogRelation;
@@ -119,6 +124,22 @@ class QueryHandle {
     query_result_relation_ = relation;
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Get the client id.
+   */
+  tmb::client_id getClientId() const {
+    return cli_id_;
+  }
+
+  /**
+   * @brief Set the client id.
+   */
+  void setClientId(const tmb::client_id cli_id) {
+    cli_id_ = cli_id;
+  }
+#endif  // QUICKSTEP_DISTRIBUTED
+
  private:
   const std::size_t query_id_;
   const std::uint64_t query_priority_;
@@ -134,6 +155,11 @@ class QueryHandle {
   //             and deleted by the Cli shell.
   const CatalogRelation *query_result_relation_;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  // The client id of the CLI which sends the query; kClientIdNone until set.
+  tmb::client_id cli_id_ = tmb::kClientIdNone;
+#endif  // QUICKSTEP_DISTRIBUTED
+
   DISALLOW_COPY_AND_ASSIGN(QueryHandle);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 9cad47f..6522117 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -78,6 +78,14 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
                       quickstep_utility_Macros
                       tmb)
 
+if (ENABLE_DISTRIBUTED)
+  add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
+                 DistributedExecutionGeneratorTest.cpp
+                 DistributedExecutionGeneratorTestRunner.cpp
+                 DistributedExecutionGeneratorTestRunner.hpp
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+endif(ENABLE_DISTRIBUTED)
 add_executable(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                ExecutionGeneratorTest.cpp
                ExecutionGeneratorTestRunner.cpp
@@ -107,6 +115,41 @@ add_executable(quickstep_queryoptimizer_tests_OptimizerTextTest
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
 
+if (ENABLE_DISTRIBUTED)
+  # The test's main() calls gflags directly, so link it explicitly.
+  target_link_libraries(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
+                        ${GFLAGS_LIB_NAME}
+                        glog
+                        gtest
+                        gtest_main
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_cli_DropRelation
+                        quickstep_cli_PrintToScreen
+                        quickstep_parser_ParseStatement
+                        quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_QueryContext
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_Shiftboss
+                        quickstep_queryexecution_Worker
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_queryexecution_WorkerMessage
+                        quickstep_queryoptimizer_ExecutionGenerator
+                        quickstep_queryoptimizer_LogicalGenerator
+                        quickstep_queryoptimizer_OptimizerContext
+                        quickstep_queryoptimizer_PhysicalGenerator
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_queryoptimizer_physical_Physical
+                        quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_utility_Macros
+                        quickstep_utility_MemStream
+                        quickstep_utility_SqlError
+                        quickstep_utility_TextBasedTestDriver
+                        tmb
+                        ${LIBS})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                       ${GFLAGS_LIB_NAME}
                       glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
new file mode 100644
index 0000000..fc0c67d
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -0,0 +1,67 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <fstream>
+#include <memory>
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTest.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+using quickstep::TextBasedTest;
+
+QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
+
+/**
+ * @brief Entry point of the distributed execution generator text-based test.
+ *
+ * After gflags has consumed its own flags, three positional arguments are
+ * expected: argv[1] is the input test file, argv[2] is the file the actual
+ * output is written to when a test fails, and argv[3] is passed to the test
+ * runner as the storage path.
+ **/
+int main(int argc, char** argv) {
+  google::InitGoogleLogging(argv[0]);
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  if (argc < 4) {
+    // The positional arguments are dereferenced unconditionally below, so a
+    // short command line must abort here instead of merely logging.
+    LOG(FATAL) << "Must have at least 3 arguments, but " << argc - 1
+               << " are provided";
+  }
+
+  std::ifstream input_file(argv[1]);
+  CHECK(input_file.is_open()) << argv[1];
+  std::unique_ptr<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>
+      test_runner(
+          new quickstep::optimizer::DistributedExecutionGeneratorTestRunner(argv[3]));
+  test_driver.reset(
+      new quickstep::TextBasedTestDriver(&input_file, test_runner.get()));
+  test_driver->registerOption(
+      quickstep::optimizer::DistributedExecutionGeneratorTestRunner::kResetOption);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  const int success = RUN_ALL_TESTS();
+  if (success != 0) {
+    // Keep the actual output around so a failing run can be inspected.
+    test_driver->writeActualOutputToFile(argv[2]);
+  }
+
+  return success;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
new file mode 100644
index 0000000..ffed4f0
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -0,0 +1,122 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+
+#include <cstdio>
+#include <set>
+#include <string>
+
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/ExecutionGenerator.hpp"
+#include "query_optimizer/LogicalGenerator.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/PhysicalGenerator.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace optimizer {
+
+const char *DistributedExecutionGeneratorTestRunner::kResetOption =
+    "reset_before_execution";
+
+void DistributedExecutionGeneratorTestRunner::runTestCase(
+    const std::string &input, const std::set<std::string> &options,
+    std::string *output) {
+  // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+
+  VLOG(4) << "Test SQL(s): " << input;
+
+  // Rebuild and reload the test relation from scratch when the test asks for it.
+  if (options.find(kResetOption) != options.end()) {
+    test_database_loader_.clear();
+    test_database_loader_.createTestRelation(false /* allow_vchar */);
+    test_database_loader_.loadTestRelation();
+  }
+
+  MemStream output_stream;
+  sql_parser_.feedNextBuffer(new std::string(input));
+
+  // Process statements one at a time; the loop ends on a non-success parse or a SqlError.
+  while (true) {
+    ParseResult result = sql_parser_.getNextStatement();
+
+    OptimizerContext optimizer_context(query_id_++,
+                                       test_database_loader_.catalog_database(),
+                                       test_database_loader_.storage_manager());
+
+    if (result.condition != ParseResult::kSuccess) {
+      if (result.condition == ParseResult::kError) {
+        *output = result.error_message;
+      }
+      break;
+    }
+
+    std::printf("%s\n", result.parsed_statement->toString().c_str());
+    try {
+      QueryHandle query_handle(optimizer_context.query_id());
+      LogicalGenerator logical_generator(&optimizer_context);
+      PhysicalGenerator physical_generator;
+      ExecutionGenerator execution_generator(&optimizer_context, &query_handle);
+
+      const physical::PhysicalPtr physical_plan = physical_generator.generatePlan(
+          logical_generator.generatePlan(*result.parsed_statement));
+      execution_generator.generatePlan(physical_plan);
+
+      // Submit the query to Foreman as the CLI client, then receive the reply.
+      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+          cli_id_, foreman_->getBusClientID(), &query_handle, &bus_);
+
+      const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+      DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+      // Print the result relation, if any, into the captured output, then drop it.
+      const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
+      if (query_result_relation) {
+        PrintToScreen::PrintRelation(*query_result_relation,
+                                     test_database_loader_.storage_manager(),
+                                     output_stream.file());
+        DropRelation::Drop(*query_result_relation,
+                           test_database_loader_.catalog_database(),
+                           test_database_loader_.storage_manager());
+      }
+    } catch (const SqlError &error) {
+      *output = error.formatMessage(input);
+      break;
+    }
+  }
+
+  // Use the captured output only when 'output' is still empty (no error was recorded above).
+  if (output->empty()) {
+    *output = output_stream.str();
+  }
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
new file mode 100644
index 0000000..cd59596
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -0,0 +1,146 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+namespace optimizer {
+
+/**
+ * @brief TextBasedTestRunner for testing the ExecutionGenerator in the
+ *        distributed version.
+ */
+class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
+ public:
+  /**
+   * @brief If this option is enabled, recreate the entire database and
+   *        repopulate the data before every test.
+   */
+  static const char *kResetOption;
+
+  /**
+   * @brief Constructor.
+   */
+  explicit DistributedExecutionGeneratorTestRunner(const std::string &storage_path)
+      : query_id_(0),
+        test_database_loader_(storage_path) {
+    test_database_loader_.createTestRelation(false /* allow_vchar */);
+    test_database_loader_.loadTestRelation();
+
+    bus_.Initialize();
+
+    // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+    // could receive a registration message from the latter.
+    foreman_.reset(new ForemanDistributed(&bus_, test_database_loader_.catalog_database()));
+
+    worker_.reset(new Worker(0 /* worker_thread_index */, &bus_));
+
+    std::vector<tmb::client_id> worker_client_ids;
+    worker_client_ids.push_back(worker_->getBusClientID());
+
+    // We don't use the NUMA aware version of worker code.
+    const std::vector<numa_node_id> numa_nodes(worker_client_ids.size(), kAnyNUMANodeID);
+
+    workers_.reset(
+        new WorkerDirectory(worker_client_ids.size(), worker_client_ids, numa_nodes));
+
+    shiftboss_.reset(new Shiftboss(&bus_, test_database_loader_.storage_manager(), workers_.get()));
+
+    cli_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+    bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+    bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+    foreman_->start();
+
+    shiftboss_->start();
+    worker_->start();
+  }
+
+  ~DistributedExecutionGeneratorTestRunner() {
+    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
+    tmb::TaggedMessage poison_tagged_message(poison_message.get(),
+                                             sizeof(*poison_message),
+                                             quickstep::kPoisonMessage);
+
+    const tmb::MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(
+            &bus_,
+            cli_id_,
+            foreman_->getBusClientID(),
+            std::move(poison_tagged_message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+    worker_->join();
+    shiftboss_->join();
+
+    foreman_->join();
+  }
+
+  void runTestCase(const std::string &input,
+                   const std::set<std::string> &options,
+                   std::string *output) override;
+
+ private:
+  std::size_t query_id_;
+
+  SqlParserWrapper sql_parser_;
+  TestDatabaseLoader test_database_loader_;
+
+  MessageBusImpl bus_;
+
+  tmb::client_id cli_id_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<WorkerDirectory> workers_;
+
+  std::unique_ptr<Shiftboss> shiftboss_;
+
+  DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 56bae16..cd0e626 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -13,6 +13,66 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
+# The distributed variants reuse the same *.test inputs, but run the distributed
+# test binary and keep separate output files and storage directories so they
+# cannot clobber the single-node runs. They exist only if ENABLE_DISTRIBUTED.
+if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_create
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_delete
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_distinct
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_drop
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_index
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_insert
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_join
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_select
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_stringpatternmatching
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_tablegenerator
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_update
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate/")
+endif(ENABLE_DISTRIBUTED)
 add_test(quickstep_queryoptimizer_tests_executiongenerator_create
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -74,6 +134,17 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_update
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
@@ -81,4 +152,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/third_party/tmb/include/tmb/tagged_message.h
----------------------------------------------------------------------
diff --git a/third_party/tmb/include/tmb/tagged_message.h b/third_party/tmb/include/tmb/tagged_message.h
index 49dcee7..75b980e 100644
--- a/third_party/tmb/include/tmb/tagged_message.h
+++ b/third_party/tmb/include/tmb/tagged_message.h
@@ -63,6 +63,15 @@ class TaggedMessage {
   }
 
   /**
+   * @brief Constructor which creates an empty, typed message.
+   **/
+  explicit TaggedMessage(const message_type_id message_type)
+      : payload_inline_(true),
+        message_type_(message_type) {
+    payload_.in_line.size = 0;
+  }
+
+  /**
    * @brief Constructor.
    *
    * @param msg A pointer to the message contents in memory, which will be



Mime
View raw message