quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zu...@apache.org
Subject incubator-quickstep git commit: Added command support in the distributed version.
Date Wed, 01 Mar 2017 02:53:40 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-cmd [created] 2a6b5d375


Added command support in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2a6b5d37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2a6b5d37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2a6b5d37

Branch: refs/heads/dist-cmd
Commit: 2a6b5d375b5081673dd5b5648fd1a1601fa3b2b6
Parents: 7550f83
Author: Zuyu Zhang <zuyuz@apache.org>
Authored: Mon Feb 27 00:30:43 2017 -0800
Committer: Zuyu Zhang <zuyuz@apache.org>
Committed: Tue Feb 28 18:53:20 2017 -0800

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +
 cli/distributed/CMakeLists.txt                  |   3 +
 cli/distributed/Cli.cpp                         |  82 +++++--
 cli/distributed/Conductor.cpp                   |  81 +++++-
 cli/distributed/Conductor.hpp                   |   3 +
 cli/tests/CMakeLists.txt                        |  41 ++++
 cli/tests/DistributedCommandExecutorTest.cpp    |  62 +++++
 .../DistributedCommandExecutorTestRunner.cpp    | 246 +++++++++++++++++++
 .../DistributedCommandExecutorTestRunner.hpp    |  99 ++++++++
 cli/tests/command_executor/CMakeLists.txt       |  18 ++
 query_execution/QueryExecutionMessages.proto    |   8 +
 query_execution/QueryExecutionTypedefs.hpp      |  10 +-
 12 files changed, 631 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 918069c..9cd02be 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -817,6 +817,7 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_distributed_cli_shell
                         glog
                         quickstep_catalog_CatalogRelation
+                        quickstep_cli_Constants
                         quickstep_cli_Flags
                         quickstep_cli_LineReader
                         quickstep_cli_PrintToScreen
@@ -833,6 +834,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_storage_StorageBlockInfo
                         quickstep_storage_StorageManager
                         quickstep_utility_Macros
+                        quickstep_utility_SqlError
                         quickstep_utility_StringUtil
                         tmb
                         ${GFLAGS_LIB_NAME}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 5804321..1f7dee0 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -26,6 +26,8 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
 target_link_libraries(quickstep_cli_distributed_Conductor
                       glog
                       quickstep_catalog_CatalogDatabase
+                      quickstep_cli_CommandExecutorUtil
+                      quickstep_cli_Constants
                       quickstep_cli_DefaultsConfigurator
                       quickstep_cli_Flags
                       quickstep_cli_distributed_Role
@@ -41,6 +43,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor
                       quickstep_storage_StorageConstants
                       quickstep_utility_Macros
                       quickstep_utility_SqlError
+                      quickstep_utility_StringUtil
                       tmb)
 target_link_libraries(quickstep_cli_distributed_Executor
                       glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 6228898..89ea0be 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -30,6 +30,7 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "cli/CliConfig.h"  // For QUICKSTEP_USE_LINENOISE.
+#include "cli/Constants.hpp"
 #include "cli/Flags.hpp"
 
 #ifdef QUICKSTEP_USE_LINENOISE
@@ -49,6 +50,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "storage/DataExchangerAsync.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/SqlError.hpp"
 #include "utility/StringUtil.hpp"
 
 #include "tmb/address.h"
@@ -76,6 +78,7 @@ using tmb::client_id;
 
 namespace quickstep {
 
+namespace C = cli;
 namespace S = serialization;
 
 void Cli::init() {
@@ -127,6 +130,10 @@ void Cli::init() {
   bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage);
 
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
+
+  // Prepare for submitting a command.
+  bus_.RegisterClientAsSender(cli_id_, kCommandMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
 }
 
 void Cli::run() {
@@ -158,27 +165,55 @@ void Cli::run() {
           break;
         }
 
-        CHECK_NE(statement.getStatementType(), ParseStatement::kCommand)
-            << "TODO(quickstep-team)";
-
-        DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
-                   << "') to Conductor";
-        S::SqlQueryMessage proto;
-        proto.set_sql_query(*command_string);
-
-        const size_t proto_length = proto.ByteSize();
-        char *proto_bytes = static_cast<char*>(malloc(proto_length));
-        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
-                                        proto_length,
-                                        kSqlQueryMessage);
-        free(proto_bytes);
+        if (statement.getStatementType() == ParseStatement::kCommand) {
+          const ParseCommand &command = static_cast<const ParseCommand &>(statement);
+          const std::string &command_str = command.command()->value();
+          try {
+            if (command_str == C::kAnalyzeCommand) {
+              THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command";
+            } else if (command_str != C::kDescribeDatabaseCommand &&
+                       command_str != C::kDescribeTableCommand) {
+              THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
+            }
+          } catch (const SqlError &sql_error) {
+            fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+            reset_parser = true;
+            break;
+          }
 
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           cli_id_,
-                                           conductor_client_id_,
-                                           move(sql_query_message));
+          DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage
+                     << "') to Conductor";
+          S::CommandMessage proto;
+          proto.set_command(*command_string);
+
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+          TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length,
kCommandMessage);
+          free(proto_bytes);
+
+          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message));
+        } else {
+          DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                     << "') to Conductor";
+          S::SqlQueryMessage proto;
+          proto.set_sql_query(*command_string);
+
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+          TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
+                                          proto_length,
+                                          kSqlQueryMessage);
+          free(proto_bytes);
+
+          QueryExecutionUtil::SendTMBMessage(&bus_,
+                                             cli_id_,
+                                             conductor_client_id_,
+                                             move(sql_query_message));
+        }
 
         start = std::chrono::steady_clock::now();
 
@@ -187,6 +222,13 @@ void Cli::run() {
         DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
                    << "' message from client " << annotated_message.sender;
         switch (tagged_message.message_type()) {
+          case kCommandResponseMessage: {
+            S::CommandResponseMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            printf("%s", proto.command_response().c_str());
+            break;
+          }
           case kQueryExecutionSuccessMessage: {
             end = std::chrono::steady_clock::now();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 3c68bfb..b877b04 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -29,6 +29,8 @@
 #include <utility>
 
 #include "catalog/CatalogDatabase.hpp"
+#include "cli/CommandExecutorUtil.hpp"
+#include "cli/Constants.hpp"
 #include "cli/DefaultsConfigurator.hpp"
 #include "cli/Flags.hpp"
 #include "parser/ParseStatement.hpp"
@@ -42,6 +44,7 @@
 #include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageConstants.hpp"
 #include "utility/SqlError.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "tmb/id_typedefs.h"
 #include "tmb/native_net_client_message_bus.h"
@@ -63,6 +66,7 @@ using tmb::client_id;
 
 namespace quickstep {
 
+namespace C = cli;
 namespace S = serialization;
 
 void Conductor::init() {
@@ -91,6 +95,9 @@ void Conductor::init() {
   bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
 
+  bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
+
   bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
@@ -125,6 +132,14 @@ void Conductor::run() {
             QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
         break;
       }
+      case kCommandMessage: {
+        S::CommandMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        DLOG(INFO) << "Conductor received the following command: " << proto.command();
+
+        processCommandMessage(sender, new string(move(proto.command())));
+        break;
+      }
       case kSqlQueryMessage: {
         S::SqlQueryMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -146,6 +161,69 @@ void Conductor::run() {
   }
 }
 
+void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string)
{
+  parser_wrapper_.feedNextBuffer(command_string);
+  ParseResult parse_result = parser_wrapper_.getNextStatement();
+
+  CHECK(parse_result.condition == ParseResult::kSuccess)
+      << "Any syntax error should be addressed in the DistributedCli.";
+
+  const ParseStatement &statement = *parse_result.parsed_statement;
+  DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType());
+
+  const ParseCommand &command = static_cast<const ParseCommand &>(statement);
+  const PtrVector<ParseString> &arguments = *(command.arguments());
+  const string &command_str = command.command()->value();
+
+  string command_response;
+
+  try {
+    if (command_str == C::kDescribeDatabaseCommand) {
+      command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+    } else if (command_str == C::kDescribeTableCommand) {
+      if (arguments.empty()) {
+        command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+      } else {
+        command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+      }
+    }
+  } catch (const SqlError &command_error) {
+    // Set the query execution status along with the error message.
+    S::QueryExecutionErrorMessage proto;
+    proto.set_error_message(command_error.formatMessage(*command_string));
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kQueryExecutionErrorMessage);
+    free(proto_bytes);
+
+    DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '"
+               << kQueryExecutionErrorMessage
+               << "') to Distributed CLI " << sender;
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+  }
+
+  S::CommandResponseMessage proto;
+  proto.set_command_response(command_response);
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+  free(proto_bytes);
+
+  DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
+             << "') to Distributed CLI " << sender;
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+}
+
 void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string)
{
   parser_wrapper_.feedNextBuffer(command_string);
   ParseResult parse_result = parser_wrapper_.getNextStatement();
@@ -154,8 +232,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string
*comm
       << "Any SQL syntax error should be addressed in the DistributedCli.";
 
   const ParseStatement &statement = *parse_result.parsed_statement;
-  CHECK(statement.getStatementType() != ParseStatement::kCommand)
-     << "TODO(quickstep-team)";
+  DCHECK_NE(ParseStatement::kCommand, statement.getStatementType());
 
   try {
     auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 09bf2b9..e7e003f 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -35,6 +35,7 @@
 namespace quickstep {
 
 class CatalogDatabase;
+class ParseCommand;
 
 /** \addtogroup CliDistributed
  *  @{
@@ -60,6 +61,8 @@ class Conductor final : public Role {
   void run() override;
 
  private:
+  void processCommandMessage(const tmb::client_id sender, std::string *command_string);
+
   void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
 
   SqlParserWrapper parser_wrapper_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index 48f27bb..7f8150f 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -23,6 +23,14 @@ add_executable(quickstep_cli_tests_CommandExecutorTest
                CommandExecutorTestRunner.hpp
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+if (ENABLE_DISTRIBUTED)
+  add_executable(quickstep_cli_tests_DistributedCommandExecutorTest
+                 DistributedCommandExecutorTest.cpp
+                 DistributedCommandExecutorTestRunner.cpp
+                 DistributedCommandExecutorTestRunner.hpp
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+endif(ENABLE_DISTRIBUTED)
 
 target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       glog
@@ -49,3 +57,36 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       quickstep_utility_TextBasedTestDriver
                       tmb
                       ${LIBS})
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_cli_tests_DistributedCommandExecutorTest
+                        glog
+                        gtest
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_cli_CommandExecutorUtil
+                        quickstep_cli_Constants
+                        quickstep_cli_DropRelation
+                        quickstep_cli_PrintToScreen
+                        quickstep_parser_ParseStatement
+                        quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_BlockLocatorUtil
+                        quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_Shiftboss
+                        quickstep_queryexecution_Worker
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_queryoptimizer_Optimizer
+                        quickstep_queryoptimizer_OptimizerContext
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageManager
+                        quickstep_utility_Macros
+                        quickstep_utility_MemStream
+                        quickstep_utility_SqlError
+                        quickstep_utility_TextBasedTestDriver
+                        tmb
+                        ${GFLAGS_LIB_NAME}
+                        ${LIBS})
+endif(ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/tests/DistributedCommandExecutorTest.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTest.cpp b/cli/tests/DistributedCommandExecutorTest.cpp
new file mode 100644
index 0000000..b41a70f
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTest.cpp
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include <iostream>
+#include <fstream>
+#include <memory>
+
+#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+using quickstep::TextBasedTest;
+
+using std::make_unique;
+
+QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_COMMAND_EXECUTOR_TEST);
+
+int main(int argc, char** argv) {
+  google::InitGoogleLogging(argv[0]);
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  if (argc < 4) {
+    LOG(ERROR) << "Must have at least 3 arguments, but " << argc - 1
+               << " are provided";
+  }
+
+  std::ifstream input_file(argv[1]);
+  CHECK(input_file.is_open()) << argv[1];
+
+  auto test_runner = make_unique<quickstep::DistributedCommandExecutorTestRunner>(argv[3]);
+  test_driver = make_unique<quickstep::TextBasedTestDriver>(&input_file, test_runner.get());
+  test_driver->registerOption(
+      quickstep::DistributedCommandExecutorTestRunner::kResetOption);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  const int success = RUN_ALL_TESTS();
+  if (success != 0) {
+    test_driver->writeActualOutputToFile(argv[2]);
+  }
+
+  return success;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/tests/DistributedCommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.cpp b/cli/tests/DistributedCommandExecutorTestRunner.cpp
new file mode 100644
index 0000000..66d0767
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTestRunner.cpp
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
+
+#include <cstdio>
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/CommandExecutorUtil.hpp"
+#include "cli/Constants.hpp"
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/Optimizer.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::make_unique;
+using std::string;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace {
+
+void nop() {}
+
+}  // namespace
+
+namespace C = cli;
+
+const char *DistributedCommandExecutorTestRunner::kResetOption =
+    "reset_before_execution";
+
+DistributedCommandExecutorTestRunner::DistributedCommandExecutorTestRunner(const string &storage_path)
+    : query_id_(0) {
+  bus_.Initialize();
+
+  cli_id_ = bus_.Connect();
+  bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+  bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+  block_locator_ = make_unique<BlockLocator>(&bus_);
+  block_locator_->start();
+
+  test_database_loader_ = make_unique<optimizer::TestDatabaseLoader>(
+      storage_path,
+      block_locator::getBlockDomain(
+          test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_,
&bus_),
+      locator_client_id_,
+      &bus_);
+  DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
+  test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
+  test_database_loader_data_exchanger_.start();
+
+  test_database_loader_->createTestRelation(false /* allow_vchar */);
+  test_database_loader_->loadTestRelation();
+
+  // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+  // could receive a registration message from the latter.
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop),
&bus_,
+                                             test_database_loader_->catalog_database());
+
+  // We don't use the NUMA aware version of worker code.
+  const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance
*/,
+                                        kAnyNUMANodeID);
+
+  bus_local_.Initialize();
+
+  worker_ = make_unique<Worker>(0 /* worker_thread_index */, &bus_local_);
+
+  const vector<tmb::client_id> worker_client_ids(1, worker_->getBusClientID());
+  worker_directory_ = make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids,
numa_nodes);
+
+  storage_manager_ = make_unique<StorageManager>(
+      storage_path,
+      block_locator::getBlockDomain(
+          data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
+      locator_client_id_, &bus_);
+  DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  shiftboss_ =
+      make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(),
+                             storage_manager_->hdfs());
+
+  foreman_->start();
+
+  data_exchanger_.start();
+  shiftboss_->start();
+  worker_->start();
+}
+
+DistributedCommandExecutorTestRunner::~DistributedCommandExecutorTestRunner() {
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(),
TaggedMessage(kPoisonMessage));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+  worker_->join();
+  shiftboss_->join();
+
+  foreman_->join();
+
+  test_database_loader_data_exchanger_.shutdown();
+  test_database_loader_.reset();
+  data_exchanger_.shutdown();
+  storage_manager_.reset();
+
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage)));
+
+  test_database_loader_data_exchanger_.join();
+  data_exchanger_.join();
+  block_locator_->join();
+}
+
+void DistributedCommandExecutorTestRunner::runTestCase(
+    const string &input, const std::set<string> &options, string *output) {
+  // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+
+  VLOG(4) << "Test SQL(s): " << input;
+
+  if (options.find(kResetOption) != options.end()) {
+    test_database_loader_->clear();
+    test_database_loader_->createTestRelation(false /* allow_vchar */);
+    test_database_loader_->loadTestRelation();
+  }
+
+  MemStream output_stream;
+  sql_parser_.feedNextBuffer(new string(input));
+
+  while (true) {
+    ParseResult result = sql_parser_.getNextStatement();
+    if (result.condition != ParseResult::kSuccess) {
+      if (result.condition == ParseResult::kError) {
+        *output = result.error_message;
+      }
+      break;
+    }
+
+    const ParseStatement &parse_statement = *result.parsed_statement;
+    std::printf("%s\n", parse_statement.toString().c_str());
+
+    try {
+      if (parse_statement.getStatementType() == ParseStatement::kCommand) {
+        const ParseCommand &command = static_cast<const ParseCommand &>(parse_statement);
+        const PtrVector<ParseString> &arguments = *(command.arguments());
+        const string &command_str = command.command()->value();
+
+        string command_response;
+        if (command_str == C::kDescribeDatabaseCommand) {
+          command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database());
+        } else if (command_str == C::kDescribeTableCommand) {
+          if (arguments.empty()) {
+            command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database());
+          } else {
+            command_response = C::ExecuteDescribeTable(arguments, *test_database_loader_->catalog_database());
+          }
+        } else {
+          THROW_SQL_ERROR_AT(command.command()) << "Unsupported command";
+        }
+
+        std::fprintf(output_stream.file(), "%s", command_response.c_str());
+      } else {
+        optimizer::OptimizerContext optimizer_context;
+        auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
+
+        optimizer_.generateQueryHandle(parse_statement,
+                                       test_database_loader_->catalog_database(),
+                                       &optimizer_context,
+                                       query_handle.get());
+        const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+
+        QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+            cli_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+
+        const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+        DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+        if (query_result_relation) {
+          PrintToScreen::PrintRelation(*query_result_relation,
+                                       test_database_loader_->storage_manager(),
+                                       output_stream.file());
+          DropRelation::Drop(*query_result_relation,
+                             test_database_loader_->catalog_database(),
+                             test_database_loader_->storage_manager());
+        }
+      }
+    } catch (const SqlError &error) {
+      *output = error.formatMessage(input);
+      break;
+    }
+  }
+
+  if (output->empty()) {
+    *output = output_stream.str();
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/tests/DistributedCommandExecutorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.hpp b/cli/tests/DistributedCommandExecutorTestRunner.hpp
new file mode 100644
index 0000000..0427a85
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTestRunner.hpp
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/Optimizer.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/**
+ * @brief TextBasedTestRunner for testing the CommandExecutor in the
+ *        distributed version.
+ */
+class DistributedCommandExecutorTestRunner : public TextBasedTestRunner {
+ public:
+  /**
+   * @brief If this option is enabled, recreate the entire database and
+   *        repopulate the data before every test.
+   */
+  static const char *kResetOption;
+
+  /**
+   * @brief Constructor.
+   */
+  explicit DistributedCommandExecutorTestRunner(const std::string &storage_path);
+
+  ~DistributedCommandExecutorTestRunner();
+
+  void runTestCase(const std::string &input,
+                   const std::set<std::string> &options,
+                   std::string *output) override;
+
+ private:
+  std::size_t query_id_;
+
+  SqlParserWrapper sql_parser_;
+  std::unique_ptr<optimizer::TestDatabaseLoader> test_database_loader_;
+  DataExchangerAsync test_database_loader_data_exchanger_;
+  optimizer::Optimizer optimizer_;
+
+  MessageBusImpl bus_;
+  tmb::client_id cli_id_, locator_client_id_;
+
+  std::unique_ptr<BlockLocator> block_locator_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+
+  MessageBusImpl bus_local_;
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<WorkerDirectory> worker_directory_;
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+  std::unique_ptr<Shiftboss> shiftboss_;
+
+  DISALLOW_COPY_AND_ASSIGN(DistributedCommandExecutorTestRunner);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/cli/tests/command_executor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/command_executor/CMakeLists.txt b/cli/tests/command_executor/CMakeLists.txt
index 9cf1869..e62d954 100644
--- a/cli/tests/command_executor/CMakeLists.txt
+++ b/cli/tests/command_executor/CMakeLists.txt
@@ -26,7 +26,25 @@ add_test(quickstep_cli_tests_commandexecutor_dt
          "${CMAKE_CURRENT_BINARY_DIR}/Dt.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Dt/")
 
+if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_cli_tests_commandexecutor_d_distributed
+           "../quickstep_cli_tests_DistributedCommandExecutorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/D.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DDistributed/")
+  add_test(quickstep_cli_tests_commandexecutor_dt_distributed
+           "../quickstep_cli_tests_DistributedCommandExecutorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Dt.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed/")
+endif(ENABLE_DISTRIBUTED)
+
 # Create the folders where the unit tests will store their data blocks for the
 # duration of their test.
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/D)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Dt)
+
+if (ENABLE_DISTRIBUTED)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DtDistributed)
+endif(ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 68f286d..47246d8 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -81,6 +81,10 @@ message ShiftbossRegistrationResponseMessage {
   required uint64 shiftboss_index = 1;
 }
 
+message CommandMessage {
+  required string command = 1;
+}
+
 message SqlQueryMessage {
   required string sql_query = 1;
 }
@@ -134,6 +138,10 @@ message SaveQueryResultResponseMessage {
   required uint64 shiftboss_index = 4;
 }
 
+message CommandResponseMessage {
+  required string command_response = 1;
+}
+
 message QueryExecutionSuccessMessage {
   optional CatalogRelationSchema result_relation = 1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a6b5d37/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 994bd60..0fd0bdf 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -89,7 +89,11 @@ enum QueryExecutionMessageType : message_type_id {
                                           // Shiftboss to Worker.
   kDistributedCliRegistrationMessage,  // From CLI to Conductor.
   kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
-  kSqlQueryMessage,  // From CLI to Conductor.
+
+  // From CLI to Conductor.
+  kCommandMessage,
+  kSqlQueryMessage,
+
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
 
@@ -101,8 +105,10 @@ enum QueryExecutionMessageType : message_type_id {
   kSaveQueryResultMessage,  // From Foreman to Shiftboss.
   kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
 
+  kQueryExecutionSuccessMessage,  // From Foreman to CLI.
+
   // From Foreman / Conductor to CLI.
-  kQueryExecutionSuccessMessage,
+  kCommandResponseMessage,
   kQueryExecutionErrorMessage,
 
   kQueryResultTeardownMessage,  // From CLI to Conductor.



Mime
View raw message