Repository: incubator-quickstep
Updated Branches:
refs/heads/trace-dev [created] 29024a3e9
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/cli/simple_socket/SimpleSocketServer.hpp
----------------------------------------------------------------------
diff --git a/cli/simple_socket/SimpleSocketServer.hpp b/cli/simple_socket/SimpleSocketServer.hpp
new file mode 100644
index 0000000..d2f5dfe
--- /dev/null
+++ b/cli/simple_socket/SimpleSocketServer.hpp
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_SIMPLE_SOCKET_SIMPLE_SOCKET_SERVER_HPP_
+#define QUICKSTEP_CLI_SIMPLE_SOCKET_SIMPLE_SOCKET_SERVER_HPP_
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <thread>
+
+#include "cli/simple_socket/SimpleSocketConnection.hpp"
+#include "utility/Macros.hpp"
+#include "utility/ThreadSafeQueue.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class SimpleSocketServer {
+ public:
+ SimpleSocketServer(const int port)
+ : port_(port) {}
+
+ ~SimpleSocketServer() {
+ stop();
+ }
+
+ void start() {
+ CHECK(main_loop_thread_ == nullptr);
+ CHECK(connections_.empty());
+ main_loop_thread_ = std::make_unique<std::thread>([this] {
+ this->mainLoop(this->port_);
+ });
+ }
+
+ void stop() {
+ if (main_loop_thread_ != nullptr) {
+ main_loop_thread_ = nullptr;
+ }
+ }
+
+ SimpleSocketConnection* waitForConnection() {
+ return connections_.popOne().release();
+ }
+
+ private:
+ void mainLoop(const int port) {
+ main_socket_fd_ = socket(AF_INET, SOCK_STREAM, 0);
+ int option = 1;
+ setsockopt(main_socket_fd_,
+ SOL_SOCKET, SO_REUSEADDR,
+ &option,
+ sizeof(option));
+ CHECK(main_socket_fd_ >= 0) << "Error opening socket";
+
+ constexpr socklen_t sockaddr_size = sizeof(sockaddr_in);
+ sockaddr_in server_address;
+ std::memset(&server_address, 0, sockaddr_size);
+
+ server_address.sin_family = AF_INET;
+ server_address.sin_addr.s_addr = INADDR_ANY;
+ server_address.sin_port = htons(port);
+
+ const int bind_retval =
+ bind(main_socket_fd_,
+ reinterpret_cast<const sockaddr*>(&server_address),
+ sockaddr_size);
+ CHECK(bind_retval >= 0) << "Error binding socket";
+
+ const int listen_retval = listen(main_socket_fd_, 32);
+ CHECK(listen_retval >= 0) << "Error listening to socket connection";
+
+ while (true) {
+ socklen_t client_addr_len = sockaddr_size;
+ sockaddr_in client_address;
+ const int client_socket_fd =
+ accept(main_socket_fd_,
+ reinterpret_cast<sockaddr*>(&client_address),
+ &client_addr_len);
+ CHECK(client_socket_fd >= 0) << "Error accepting socket connection";
+ connections_.push(std::make_unique<SimpleSocketConnection>(client_socket_fd));
+ }
+ }
+
+ const int port_;
+ std::unique_ptr<std::thread> main_loop_thread_;
+ int main_socket_fd_;
+ ThreadSafeQueue<std::unique_ptr<SimpleSocketConnection>> connections_;
+
+ DISALLOW_COPY_AND_ASSIGN(SimpleSocketServer);
+};
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_SIMPLE_SOCKET_SIMPLE_SOCKET_SERVER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 5e0db44..1c5d669 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -148,6 +148,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_relationaloperators_UpdateOperator
quickstep_relationaloperators_WindowAggregationOperator
quickstep_storage_AggregationOperationState_proto
+ quickstep_storage_Flags
quickstep_storage_HashTableFactory
quickstep_storage_HashTable_proto
quickstep_storage_InsertDestination_proto
@@ -248,6 +249,7 @@ target_link_libraries(quickstep_queryoptimizer_QueryHandle
quickstep_queryexecution_QueryContext_proto
quickstep_queryoptimizer_QueryPlan
quickstep_utility_Macros
+ quickstep_utility_StringUtil
tmb)
target_link_libraries(quickstep_queryoptimizer_QueryPlan
quickstep_relationaloperators_RelationalOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 9b83074..2063f2a 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -31,6 +31,7 @@
#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "query_optimizer/QueryPlan.hpp"
#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
#include "tmb/id_typedefs.h"
@@ -90,7 +91,7 @@ class QueryHandle {
analyze_query_info_(analyze_query_info),
query_plan_(new QueryPlan()),
query_result_relation_(nullptr),
- mem_data_(nullptr) {}
+ mem_data_(nullptr, 0) {}
~QueryHandle() {}
@@ -180,11 +181,11 @@ class QueryHandle {
query_result_relation_ = relation;
}
- const std::string* getMemData() const {
+ const StringPiece& getMemData() const {
return mem_data_;
}
- void setMemData(const std::string *mem_data) {
+ void setMemData(const StringPiece &mem_data) {
mem_data_ = mem_data;
}
@@ -224,7 +225,7 @@ class QueryHandle {
// and deleted by the Cli shell.
const CatalogRelation *query_result_relation_;
- const std::string *mem_data_;
+ StringPiece mem_data_;
#ifdef QUICKSTEP_DISTRIBUTED
// Indicate whether the query should be executed on the default Shiftboss for
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 7b9ed96..1a4311f 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -532,6 +532,8 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
quickstep_utility_BulkIoConfiguration
quickstep_utility_Glob
quickstep_utility_Macros
+ quickstep_utility_ScopedBuffer
+ quickstep_utility_StringUtil
tmb)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_relationaloperators_TextScanOperator
@@ -626,6 +628,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_WorkOrder_proto
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
+ quickstep_utility_StringUtil
quickstep_utility_lipfilter_LIPFilterUtil
tmb)
target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 86712aa..ef7da22 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -125,11 +125,11 @@ bool TextScanOperator::getAllWorkOrders(
std::vector<std::size_t> file_sizes;
if (file_pattern_ == "$stdin") {
- if (mem_data_ == nullptr) {
+ if (mem_data_.first == nullptr) {
container->addNormalWorkOrder(
new TextScanWorkOrder(query_id_,
file_pattern_,
- nullptr /* mem_data */,
+ mem_data_,
0,
-1 /* text_segment_size */,
options_->getDelimiter(),
@@ -143,7 +143,7 @@ bool TextScanOperator::getAllWorkOrders(
return true;
}
files.emplace_back(file_pattern_);
- file_sizes.emplace_back(mem_data_->size());
+ file_sizes.emplace_back(mem_data_.second);
} else {
DCHECK_EQ('@', file_pattern_.front());
files = utility::file::GlobExpand(file_pattern_.substr(1));
@@ -282,7 +282,7 @@ serialization::WorkOrder* TextScanOperator::createWorkOrderProto(
void TextScanWorkOrder::execute() {
DCHECK(!filename_.empty());
if (filename_ == "$stdin") {
- if (mem_data_ == nullptr) {
+ if (mem_data_.first == nullptr) {
executeInputStream();
} else {
executeMemData();
@@ -512,7 +512,7 @@ void TextScanWorkOrder::executeMemData() {
bool is_faulty;
// Locate the first newline character.
- const char *row_ptr = mem_data_->c_str() + text_offset_;
+ const char *row_ptr = mem_data_.first + text_offset_;
const char *segment_end = row_ptr + text_segment_size_;
if (text_offset_ != 0) {
while (row_ptr < segment_end && *row_ptr != '\n') {
@@ -558,7 +558,7 @@ void TextScanWorkOrder::executeMemData() {
}
}
// Process the tuple that is right after the last newline character.
- const char *data_end = mem_data_->c_str() + mem_data_->size();
+ const char *data_end = mem_data_.first + mem_data_.second;
while (end_ptr < data_end && *end_ptr != '\n') {
++end_ptr;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index dfedb2b..09b7f2d 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -35,6 +35,7 @@
#include "types/containers/Tuple.hpp"
#include "utility/BulkIoConfiguration.hpp"
#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
#include "glog/logging.h"
@@ -127,7 +128,7 @@ class TextScanOperator : public RelationalOperator {
**/
TextScanOperator(const std::size_t query_id,
const std::string &file_pattern,
- const std::string *mem_data,
+ const StringPiece &mem_data,
const BulkIoConfigurationPtr &options,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index)
@@ -138,7 +139,7 @@ class TextScanOperator : public RelationalOperator {
options_(options),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
- serial_bulk_insert_(mem_data != nullptr),
+ serial_bulk_insert_(mem_data.first != nullptr),
num_remaining_chunks_(0),
serial_worker_ready_(true),
work_generated_(false) {}
@@ -177,7 +178,7 @@ class TextScanOperator : public RelationalOperator {
const std::size_t text_segment_size);
const std::string file_pattern_;
- const std::string *mem_data_;
+ const StringPiece mem_data_;
const BulkIoConfigurationPtr options_;
const CatalogRelation &output_relation_;
@@ -215,7 +216,7 @@ class TextScanWorkOrder : public WorkOrder {
TextScanWorkOrder(
const std::size_t query_id,
const std::string &filename,
- const std::string *mem_data,
+ const StringPiece &mem_data,
const std::size_t text_offset,
const std::size_t text_segment_size,
const char field_terminator,
@@ -360,7 +361,7 @@ class TextScanWorkOrder : public WorkOrder {
}
const std::string filename_;
- const std::string *mem_data_;
+ const StringPiece mem_data_;
const std::size_t text_offset_;
const std::size_t text_segment_size_;
const char field_terminator_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 9eeac9e..df839ef 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -55,6 +55,7 @@
#include "relational_operators/WindowAggregationOperator.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/StorageBlockInfo.hpp"
+#include "utility/StringUtil.hpp"
#include "utility/lip_filter/LIPFilterUtil.hpp"
#include "glog/logging.h"
@@ -554,7 +555,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
return new TextScanWorkOrder(
query_id,
proto.GetExtension(serialization::TextScanWorkOrder::filename),
- nullptr /* TODO */,
+ StringPiece(nullptr, 0) /* TODO */,
proto.GetExtension(serialization::TextScanWorkOrder::text_offset),
proto.GetExtension(serialization::TextScanWorkOrder::text_segment_size),
proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index aacc70c..caaebfa 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -42,6 +42,7 @@
#include "types/TypeID.hpp"
#include "utility/BulkIoConfiguration.hpp"
#include "utility/MemStream.hpp"
+#include "utility/StringUtil.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -200,7 +201,7 @@ TEST_F(TextScanOperatorTest, ScanTest) {
std::unique_ptr<TextScanOperator> text_scan_op(
new TextScanOperator(kQueryId,
input_filename,
- nullptr /* mem_data */,
+ StringPiece(nullptr, 0) /* mem_data */,
BulkIoConfigurationPtr(options.release()),
*relation_,
output_destination_index));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/utility/StringUtil.hpp
----------------------------------------------------------------------
diff --git a/utility/StringUtil.hpp b/utility/StringUtil.hpp
index abda8f3..747d31d 100644
--- a/utility/StringUtil.hpp
+++ b/utility/StringUtil.hpp
@@ -20,9 +20,11 @@
#ifndef QUICKSTEP_UTILITY_STRING_UTIL_HPP_
#define QUICKSTEP_UTILITY_STRING_UTIL_HPP_
+#include <cstddef>
#include <cstdint>
#include <sstream>
#include <string>
+#include <utility>
#include <vector>
namespace quickstep {
@@ -120,6 +122,8 @@ extern std::string
DoubleToStringWithSignificantDigits(double val,
std::uint64_t significant_digits);
+typedef std::pair<const char*, std::size_t> StringPiece;
+
/** @} */
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29024a3e/utility/ThreadSafeQueue.hpp
----------------------------------------------------------------------
diff --git a/utility/ThreadSafeQueue.hpp b/utility/ThreadSafeQueue.hpp
index 440f8a7..dbbe63d 100644
--- a/utility/ThreadSafeQueue.hpp
+++ b/utility/ThreadSafeQueue.hpp
@@ -126,7 +126,7 @@ class ThreadSafeQueue {
queue_nonempty_condition_->await();
}
num_waiters_.fetch_sub(initially_empty, std::memory_order_relaxed);
- T popped_value(internal_queue_.front());
+ T popped_value(std::move(internal_queue_.front()));
internal_queue_.pop();
return popped_value;
}
|