quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [1/2] incubator-quickstep git commit: Add socket cli support
Date Mon, 05 Nov 2018 07:22:35 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/trace-dev [created] 29024a3e9


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/cli/simple_socket/SimpleSocketServer.hpp
----------------------------------------------------------------------
diff --git a/cli/simple_socket/SimpleSocketServer.hpp b/cli/simple_socket/SimpleSocketServer.hpp
new file mode 100644
index 0000000..d2f5dfe
--- /dev/null
+++ b/cli/simple_socket/SimpleSocketServer.hpp
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_SIMPLE_SOCKET_SIMPLE_SOCKET_SERVER_HPP_
+#define QUICKSTEP_CLI_SIMPLE_SOCKET_SIMPLE_SOCKET_SERVER_HPP_
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <poll.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <cerrno>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <thread>
+
+#include "cli/simple_socket/SimpleSocketConnection.hpp"
+#include "utility/Macros.hpp"
+#include "utility/ThreadSafeQueue.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Minimal TCP server that accepts client connections on a background
+ *        thread and hands them out through a thread-safe queue.
+ *
+ * @note start() and stop() are not synchronized against each other and must
+ *       be called from a single controlling thread.
+ **/
+class SimpleSocketServer {
+ public:
+  /**
+   * @brief Constructor. No socket is opened until start() is called.
+   *
+   * @param port The TCP port to listen on.
+   **/
+  explicit SimpleSocketServer(const int port)
+      : port_(port),
+        stop_requested_(false),
+        main_socket_fd_(-1) {}
+
+  /**
+   * @brief Destructor. Stops the accept thread if it is still running.
+   **/
+  ~SimpleSocketServer() {
+    stop();
+  }
+
+  /**
+   * @brief Launch the background thread that listens for connections.
+   *
+   * @warning Aborts if the server is already running or if previously
+   *          accepted connections are still waiting in the queue.
+   **/
+  void start() {
+    CHECK(main_loop_thread_ == nullptr);
+    CHECK(connections_.empty());
+    stop_requested_.store(false);
+    main_loop_thread_ = std::make_unique<std::thread>([this] {
+      this->mainLoop(this->port_);
+    });
+  }
+
+  /**
+   * @brief Ask the accept thread to exit and wait until it has finished.
+   *        Does nothing if the server is not running.
+   **/
+  void stop() {
+    if (main_loop_thread_ == nullptr) {
+      return;
+    }
+    // Destroying a joinable std::thread calls std::terminate(), so the
+    // thread has to be asked to exit and joined before it is released.
+    stop_requested_.store(true);
+    main_loop_thread_->join();
+    main_loop_thread_.reset();
+  }
+
+  /**
+   * @brief Take the next accepted connection off the queue.
+   *
+   * @return The connection; ownership is transferred to the caller.
+   **/
+  SimpleSocketConnection* waitForConnection() {
+    return connections_.popOne().release();
+  }
+
+ private:
+  // Upper bound on how long the accept loop waits in poll() before it
+  // re-checks whether stop() has been requested.
+  static constexpr int kPollIntervalMs = 100;
+
+  /**
+   * @brief Body of the accept thread: open a listening socket on the given
+   *        port, queue every accepted connection, and close the listening
+   *        socket once stop() has been requested.
+   *
+   * @param port The TCP port to bind to.
+   **/
+  void mainLoop(const int port) {
+    main_socket_fd_ = socket(AF_INET, SOCK_STREAM, 0);
+    // Validate the descriptor before passing it to any other call.
+    CHECK(main_socket_fd_ >= 0) << "Error opening socket";
+
+    // Best effort: allow rebinding the port right after a restart.
+    int option = 1;
+    if (setsockopt(main_socket_fd_,
+                   SOL_SOCKET, SO_REUSEADDR,
+                   &option,
+                   sizeof(option)) < 0) {
+      LOG(WARNING) << "Unable to set SO_REUSEADDR on server socket";
+    }
+
+    constexpr socklen_t sockaddr_size = sizeof(sockaddr_in);
+    sockaddr_in server_address;
+    std::memset(&server_address, 0, sockaddr_size);
+
+    server_address.sin_family = AF_INET;
+    server_address.sin_addr.s_addr = INADDR_ANY;
+    server_address.sin_port = htons(port);
+
+    const int bind_retval =
+        bind(main_socket_fd_,
+             reinterpret_cast<const sockaddr*>(&server_address),
+             sockaddr_size);
+    CHECK(bind_retval >= 0) << "Error binding socket";
+
+    const int listen_retval = listen(main_socket_fd_, 32);
+    CHECK(listen_retval >= 0) << "Error listening to socket connection";
+
+    pollfd listener;
+    listener.fd = main_socket_fd_;
+    listener.events = POLLIN;
+    listener.revents = 0;
+
+    while (!stop_requested_.load()) {
+      // Wait with a timeout rather than blocking in accept(), so that a
+      // stop request is noticed even if no client ever connects.
+      const int num_ready = poll(&listener, 1, kPollIntervalMs);
+      if (num_ready == 0 || (num_ready < 0 && errno == EINTR)) {
+        continue;
+      }
+      CHECK(num_ready > 0) << "Error polling server socket";
+
+      const int client_socket_fd = accept(main_socket_fd_, nullptr, nullptr);
+      if (client_socket_fd < 0 &&
+          (errno == EINTR || errno == ECONNABORTED)) {
+        // The pending connection vanished or a signal arrived; keep serving.
+        continue;
+      }
+      CHECK(client_socket_fd >= 0) << "Error accepting socket connection";
+      connections_.push(std::make_unique<SimpleSocketConnection>(client_socket_fd));
+    }
+
+    close(main_socket_fd_);
+    main_socket_fd_ = -1;
+  }
+
+  const int port_;
+  std::unique_ptr<std::thread> main_loop_thread_;
+  // Set by stop() and polled by the accept thread.
+  std::atomic<bool> stop_requested_;
+  // Listening socket, used only by the accept thread; -1 while closed.
+  int main_socket_fd_;
+  ThreadSafeQueue<std::unique_ptr<SimpleSocketConnection>> connections_;
+
+  DISALLOW_COPY_AND_ASSIGN(SimpleSocketServer);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_SIMPLE_SOCKET_SIMPLE_SOCKET_SERVER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 5e0db44..1c5d669 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -148,6 +148,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_UpdateOperator
                       quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_storage_AggregationOperationState_proto
+                      quickstep_storage_Flags
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTable_proto
                       quickstep_storage_InsertDestination_proto
@@ -248,6 +249,7 @@ target_link_libraries(quickstep_queryoptimizer_QueryHandle
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_QueryPlan
                       quickstep_utility_Macros
+                      quickstep_utility_StringUtil
                       tmb)
 target_link_libraries(quickstep_queryoptimizer_QueryPlan
                       quickstep_relationaloperators_RelationalOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 9b83074..2063f2a 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -31,6 +31,7 @@
 #include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "query_optimizer/QueryPlan.hpp"
 #include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "tmb/id_typedefs.h"
 
@@ -90,7 +91,7 @@ class QueryHandle {
         analyze_query_info_(analyze_query_info),
         query_plan_(new QueryPlan()),
         query_result_relation_(nullptr),
-        mem_data_(nullptr) {}
+        mem_data_(nullptr, 0) {}
 
   ~QueryHandle() {}
 
@@ -180,11 +181,11 @@ class QueryHandle {
     query_result_relation_ = relation;
   }
 
-  const std::string* getMemData() const {
+  const StringPiece& getMemData() const {
     return mem_data_;
   }
 
-  void setMemData(const std::string *mem_data) {
+  void setMemData(const StringPiece &mem_data) {
     mem_data_ = mem_data;
   }
 
@@ -224,7 +225,7 @@ class QueryHandle {
   //             and deleted by the Cli shell.
   const CatalogRelation *query_result_relation_;
 
-  const std::string *mem_data_;
+  StringPiece mem_data_;
 
 #ifdef QUICKSTEP_DISTRIBUTED
   // Indicate whether the query should be executed on the default Shiftboss for

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 7b9ed96..1a4311f 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -532,6 +532,8 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_utility_BulkIoConfiguration
                       quickstep_utility_Glob
                       quickstep_utility_Macros
+                      quickstep_utility_ScopedBuffer
+                      quickstep_utility_StringUtil
                       tmb)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   target_link_libraries(quickstep_relationaloperators_TextScanOperator
@@ -626,6 +628,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
+                      quickstep_utility_StringUtil
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_WorkOrder_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 86712aa..ef7da22 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -125,11 +125,11 @@ bool TextScanOperator::getAllWorkOrders(
     std::vector<std::size_t> file_sizes;
 
     if (file_pattern_ == "$stdin") {
-      if (mem_data_ == nullptr) {
+      if (mem_data_.first == nullptr) {
         container->addNormalWorkOrder(
             new TextScanWorkOrder(query_id_,
                                   file_pattern_,
-                                  nullptr /* mem_data */,
+                                  mem_data_,
                                   0,
                                   -1 /* text_segment_size */,
                                   options_->getDelimiter(),
@@ -143,7 +143,7 @@ bool TextScanOperator::getAllWorkOrders(
         return true;
       }
       files.emplace_back(file_pattern_);
-      file_sizes.emplace_back(mem_data_->size());
+      file_sizes.emplace_back(mem_data_.second);
     } else {
       DCHECK_EQ('@', file_pattern_.front());
       files = utility::file::GlobExpand(file_pattern_.substr(1));
@@ -282,7 +282,7 @@ serialization::WorkOrder* TextScanOperator::createWorkOrderProto(
 void TextScanWorkOrder::execute() {
   DCHECK(!filename_.empty());
   if (filename_ == "$stdin") {
-    if (mem_data_ == nullptr) {
+    if (mem_data_.first == nullptr) {
       executeInputStream();
     } else {
       executeMemData();
@@ -512,7 +512,7 @@ void TextScanWorkOrder::executeMemData() {
   bool is_faulty;
 
   // Locate the first newline character.
-  const char *row_ptr = mem_data_->c_str() + text_offset_;
+  const char *row_ptr = mem_data_.first + text_offset_;
   const char *segment_end = row_ptr + text_segment_size_;
   if (text_offset_ != 0) {
     while (row_ptr < segment_end && *row_ptr != '\n') {
@@ -558,7 +558,7 @@ void TextScanWorkOrder::executeMemData() {
     }
   }
   // Process the tuple that is right after the last newline character.
-  const char *data_end = mem_data_->c_str() + mem_data_->size();
+  const char *data_end = mem_data_.first + mem_data_.second;
   while (end_ptr < data_end && *end_ptr != '\n') {
     ++end_ptr;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index dfedb2b..09b7f2d 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -35,6 +35,7 @@
 #include "types/containers/Tuple.hpp"
 #include "utility/BulkIoConfiguration.hpp"
 #include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -127,7 +128,7 @@ class TextScanOperator : public RelationalOperator {
    **/
   TextScanOperator(const std::size_t query_id,
                    const std::string &file_pattern,
-                   const std::string *mem_data,
+                   const StringPiece &mem_data,
                    const BulkIoConfigurationPtr &options,
                    const CatalogRelation &output_relation,
                    const QueryContext::insert_destination_id output_destination_index)
@@ -138,7 +139,7 @@ class TextScanOperator : public RelationalOperator {
         options_(options),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
-        serial_bulk_insert_(mem_data != nullptr),
+        serial_bulk_insert_(mem_data.first != nullptr),
         num_remaining_chunks_(0),
         serial_worker_ready_(true),
         work_generated_(false) {}
@@ -177,7 +178,7 @@ class TextScanOperator : public RelationalOperator {
                                                  const std::size_t text_segment_size);
 
   const std::string file_pattern_;
-  const std::string *mem_data_;
+  const StringPiece mem_data_;
   const BulkIoConfigurationPtr options_;
 
   const CatalogRelation &output_relation_;
@@ -215,7 +216,7 @@ class TextScanWorkOrder : public WorkOrder {
   TextScanWorkOrder(
       const std::size_t query_id,
       const std::string &filename,
-      const std::string *mem_data,
+      const StringPiece &mem_data,
       const std::size_t text_offset,
       const std::size_t text_segment_size,
       const char field_terminator,
@@ -360,7 +361,7 @@ class TextScanWorkOrder : public WorkOrder {
   }
 
   const std::string filename_;
-  const std::string *mem_data_;
+  const StringPiece mem_data_;
   const std::size_t text_offset_;
   const std::size_t text_segment_size_;
   const char field_terminator_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 9eeac9e..df839ef 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -55,6 +55,7 @@
 #include "relational_operators/WindowAggregationOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/StringUtil.hpp"
 #include "utility/lip_filter/LIPFilterUtil.hpp"
 
 #include "glog/logging.h"
@@ -554,7 +555,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       return new TextScanWorkOrder(
           query_id,
           proto.GetExtension(serialization::TextScanWorkOrder::filename),
-          nullptr /* TODO */,
+          StringPiece(nullptr, 0) /* TODO */,
           proto.GetExtension(serialization::TextScanWorkOrder::text_offset),
           proto.GetExtension(serialization::TextScanWorkOrder::text_segment_size),
           proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index aacc70c..caaebfa 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -42,6 +42,7 @@
 #include "types/TypeID.hpp"
 #include "utility/BulkIoConfiguration.hpp"
 #include "utility/MemStream.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -200,7 +201,7 @@ TEST_F(TextScanOperatorTest, ScanTest) {
   std::unique_ptr<TextScanOperator> text_scan_op(
       new TextScanOperator(kQueryId,
                            input_filename,
-                           nullptr /* mem_data */,
+                           StringPiece(nullptr, 0) /* mem_data */,
                            BulkIoConfigurationPtr(options.release()),
                            *relation_,
                            output_destination_index));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/utility/StringUtil.hpp
----------------------------------------------------------------------
diff --git a/utility/StringUtil.hpp b/utility/StringUtil.hpp
index abda8f3..747d31d 100644
--- a/utility/StringUtil.hpp
+++ b/utility/StringUtil.hpp
@@ -20,9 +20,11 @@
 #ifndef QUICKSTEP_UTILITY_STRING_UTIL_HPP_
 #define QUICKSTEP_UTILITY_STRING_UTIL_HPP_
 
+#include <cstddef>
 #include <cstdint>
 #include <sstream>
 #include <string>
+#include <utility>
 #include <vector>
 
 namespace quickstep {
@@ -120,6 +122,8 @@ extern std::string
 DoubleToStringWithSignificantDigits(double val,
                                     std::uint64_t significant_digits);
 
+typedef std::pair<const char*, std::size_t> StringPiece;
+
 /** @} */
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/utility/ThreadSafeQueue.hpp
----------------------------------------------------------------------
diff --git a/utility/ThreadSafeQueue.hpp b/utility/ThreadSafeQueue.hpp
index 440f8a7..dbbe63d 100644
--- a/utility/ThreadSafeQueue.hpp
+++ b/utility/ThreadSafeQueue.hpp
@@ -126,7 +126,7 @@ class ThreadSafeQueue {
       queue_nonempty_condition_->await();
     }
     num_waiters_.fetch_sub(initially_empty, std::memory_order_relaxed);
-    T popped_value(internal_queue_.front());
+    T popped_value(std::move(internal_queue_.front()));
     internal_queue_.pop();
     return popped_value;
   }


Mime
View raw message