quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [3/8] incubator-quickstep git commit: Add "COPY TO" operator for exporting data from Quickstep.
Date Fri, 25 Aug 2017 20:41:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/query_optimizer/tests/resolver/Copy.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/resolver/Copy.test b/query_optimizer/tests/resolver/Copy.test
index c2ae91a..d68e18f 100644
--- a/query_optimizer/tests/resolver/Copy.test
+++ b/query_optimizer/tests/resolver/Copy.test
@@ -16,7 +16,7 @@
 # under the License.
 
 [default initial_logical_plan]
-copy test from 'test.txt'
+COPY test FROM 'test.txt'
 --
 TopLevelPlan
 +-plan=CopyFrom[relation=Test,file_name=test.txt,column_delimiter="\t",
@@ -25,14 +25,14 @@ TopLevelPlan
   +-[]
 ==
 
-copy tESt from 'test.txt' with (delimiter '123')
+COPY tESt FROM 'test.txt' WITH (delimiter '123')
 --
 ERROR: DELIMITER is not a single character (1 : 43)
-copy tESt from 'test.txt' with (delimiter '123')
+COPY tESt FROM 'test.txt' WITH (delimiter '123')
                                           ^
 ==
 
-copy tESt from 'test.txt' with (delimiter 'd', escape_strings false)
+COPY tESt FROM 'test.txt' WITH (delimiter 'd', escape_strings false)
 --
 TopLevelPlan
 +-plan=CopyFrom[relation=Test,file_name=test.txt,column_delimiter="d",
@@ -41,8 +41,160 @@ TopLevelPlan
   +-[]
 ==
 
-copy undefined_table from 'test.txt'
+COPY test TO 'test.txt';
+--
+TopLevelPlan
++-plan=CopyTo[file_name=@test.txt,format=TEXT,column_delimiter="\t",
+| escape_strings=true,null_string=\N]
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+|   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+|   +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+|   +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+|   +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
++-output_attributes=
+  +-[]
+==
+
+COPY test TO stdout WITH (FORMAT 'CSV');
+--
+TopLevelPlan
++-plan=CopyTo[file_name=$stdout,format=CSV,column_delimiter=",",header=true,
+| quote="]
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+|   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+|   +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+|   +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+|   +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
++-output_attributes=
+  +-[]
+==
+
+COPY
+  SELECT SUM(int_col) AS sum_int,
+         AVG(double_col) AS avg_dbl
+  FROM test
+  GROUP BY char_col
+  UNION ALL
+  SELECT 1, 2.0
+  FROM generate_series(1, 1)
+TO 'test.txt' WITH (DELIMITER ',');
+--
+TopLevelPlan
++-plan=CopyTo[file_name=@test.txt,format=TEXT,column_delimiter=",",
+| escape_strings=true,null_string=\N]
+| +-input=UnionAll[set_operation_type=UnionAll]
+|   +-operands=
+|   | +-Project
+|   | | +-input=Project
+|   | | | +-input=Aggregate
+|   | | | | +-input=TableReference[relation_name=Test,relation_alias=test]
+|   | | | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   | | | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   | | | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+|   | | | | | +-AttributeReference[id=3,name=double_col,relation=test,
+|   | | | | | | type=Double NULL]
+|   | | | | | +-AttributeReference[id=4,name=char_col,relation=test,
+|   | | | | | | type=Char(20)]
+|   | | | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+|   | | | | |   type=VarChar(20) NULL]
+|   | | | | +-grouping_expressions=
+|   | | | | | +-AttributeReference[id=4,name=char_col,relation=test,
+|   | | | | |   type=Char(20)]
+|   | | | | +-aggregate_expressions=
+|   | | | |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   | | | |   | type=Long NULL]
+|   | | | |   | +-AggregateFunction[function=SUM]
+|   | | | |   |   +-AttributeReference[id=0,name=int_col,relation=test,
+|   | | | |   |     type=Int NULL]
+|   | | | |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|   | | | |     type=Double NULL]
+|   | | | |     +-AggregateFunction[function=AVG]
+|   | | | |       +-AttributeReference[id=3,name=double_col,relation=test,
+|   | | | |         type=Double NULL]
+|   | | | +-project_list=
+|   | | |   +-Alias[id=6,name=sum_int,relation=,type=Long NULL]
+|   | | |   | +-AttributeReference[id=6,name=,alias=$aggregate0,
+|   | | |   |   relation=$aggregate,type=Long NULL]
+|   | | |   +-Alias[id=7,name=avg_dbl,relation=,type=Double NULL]
+|   | | |     +-AttributeReference[id=7,name=,alias=$aggregate1,
+|   | | |       relation=$aggregate,type=Double NULL]
+|   | | +-project_list=
+|   | |   +-AttributeReference[id=6,name=sum_int,relation=,type=Long NULL]
+|   | |   +-AttributeReference[id=7,name=avg_dbl,relation=,type=Double NULL]
+|   | +-Project
+|   |   +-input=Project
+|   |   | +-input=TableGenerator[function_name=generate_series]
+|   |   | | +-AttributeReference[id=8,name=generate_series,
+|   |   | |   relation=generate_series,type=Int]
+|   |   | +-project_list=
+|   |   |   +-Alias[id=9,name=,alias=1,relation=,type=Int]
+|   |   |   | +-Literal[value=1,type=Int]
+|   |   |   +-Alias[id=10,name=,alias=2.0,relation=,type=Double]
+|   |   |     +-Literal[value=2,type=Double]
+|   |   +-project_list=
+|   |     +-Alias[id=11,name=,alias=1,relation=,type=Long NULL]
+|   |     | +-Cast[target_type=Long NULL]
+|   |     |   +-operand=AttributeReference[id=9,name=,alias=1,relation=,type=Int]
+|   |     +-Alias[id=12,name=,alias=2.0,relation=,type=Double NULL]
+|   |       +-Cast[target_type=Double NULL]
+|   |         +-operand=AttributeReference[id=10,name=,alias=2.0,relation=,
+|   |           type=Double]
+|   +-project_attributes=
+|     +-AttributeReference[id=13,name=sum_int,relation=,type=Long NULL]
+|     +-AttributeReference[id=14,name=avg_dbl,relation=,type=Double NULL]
++-output_attributes=
+  +-[]
+==
+
+COPY undefined_table FROM 'test.txt'
 --
 ERROR: Unrecognized relation undefined_table (1 : 6)
-copy undefined_table from 'test.txt...
+COPY undefined_table FROM 'test.txt...
      ^
+==
+
+COPY test FROM 'test.txt' WITH (FORMAT 'CSV')
+--
+ERROR: Unsupported file format: csv (1 : 40)
+COPY test FROM 'test.txt' WITH (FORMAT 'CSV')
+                                       ^
+==
+
+COPY test FROM 'test.txt' WITH (XXX 'YY');
+--
+ERROR: Unsupported copy option: xxx (1 : 33)
+COPY test FROM 'test.txt' WITH (XXX 'YY');
+                                ^
+==
+
+COPY test TO 'test.txt' WITH (QUOTE '$');
+--
+ERROR: Unsupported copy option "quote" for file format TEXT (1 : 31)
+COPY test TO 'test.txt' WITH (QUOTE '$');
+                              ^
+==
+
+COPY test TO 'test.txt' WITH (FORMAT 'CSV', ESCAPE_STRINGS TRUE);
+--
+ERROR: Unsupported copy option "escape_strings" for file format CSV (1 : 45)
+... test TO 'test.txt' WITH (FORMAT 'CSV', ESCAPE_STRINGS TRUE);
+                                           ^
+==
+
+COPY test TO 'test.txt' WITH (FORMAT CSV, QUOTE '$$');
+--
+ERROR: QUOTE is not a single character (1 : 49)
+...test TO 'test.txt' WITH (FORMAT CSV, QUOTE '$$');
+                                              ^
+==
+
+COPY test TO 'test.txt' WITH (FORMAT 'TEXT', QUOTE '"');
+--
+ERROR: Unsupported copy option "quote" for file format TEXT (1 : 46)
+...test TO 'test.txt' WITH (FORMAT 'TEXT', QUOTE '"');
+                                           ^
+==

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 5ad9c3b..0d0fe41 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -71,6 +71,7 @@ add_library(quickstep_relationaloperators_SortMergeRunOperatorHelpers SortMergeR
             SortMergeRunOperatorHelpers.hpp)
 add_library(quickstep_relationaloperators_SortRunGenerationOperator SortRunGenerationOperator.cpp
             SortRunGenerationOperator.hpp)
+add_library(quickstep_relationaloperators_TableExportOperator TableExportOperator.cpp TableExportOperator.hpp)
 add_library(quickstep_relationaloperators_TableGeneratorOperator TableGeneratorOperator.cpp TableGeneratorOperator.hpp)
 add_library(quickstep_relationaloperators_TextScanOperator TextScanOperator.cpp TextScanOperator.hpp)
 add_library(quickstep_relationaloperators_UnionAllOperator UnionAllOperator.cpp UnionAllOperator.hpp)
@@ -473,6 +474,25 @@ target_link_libraries(quickstep_relationaloperators_SortRunGenerationOperator
                       quickstep_utility_Macros
                       quickstep_utility_SortConfiguration
                       tmb)
+# One entry per header included by TableExportOperator.{hpp,cpp}; "tmb" covers
+# the "tmb/id_typedefs.h" include, as in the sibling operator targets.
+target_link_libraries(quickstep_relationaloperators_TableExportOperator
+                      glog
+                      quickstep_catalog_CatalogAttribute
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_ValueAccessor
+                      quickstep_threading_SpinMutex
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_Tuple
+                      quickstep_utility_BulkIOConfiguration
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_TableGeneratorOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -508,6 +528,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
+                      quickstep_utility_BulkIOConfiguration
                       quickstep_utility_Glob
                       quickstep_utility_Macros
                       tmb)
@@ -635,6 +656,7 @@ target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_SortMergeRunOperatorHelpers
                       quickstep_relationaloperators_SortMergeRunOperator_proto
                       quickstep_relationaloperators_SortRunGenerationOperator
+                      quickstep_relationaloperators_TableExportOperator
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_UnionAllOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index 5de7eb5..41107e9 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -85,6 +85,7 @@ class RelationalOperator {
     kSelect,
     kSortMergeRun,
     kSortRunGeneration,
+    kTableExport,
     kTableGenerator,
     kTextScan,
     kUnionAll,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/relational_operators/TableExportOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableExportOperator.cpp b/relational_operators/TableExportOperator.cpp
new file mode 100644
index 0000000..809e34c
--- /dev/null
+++ b/relational_operators/TableExportOperator.cpp
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/TableExportOperator.hpp"
+
+#include <cstdio>
+#include <exception>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "threading/SpinMutex.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/Tuple.hpp"
+#include "utility/BulkIOConfiguration.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool TableExportOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  // Remember the query context on the first call; receiveFeedbackMessage()
+  // later uses it to take back the per-block output buffers.
+  if (query_context_ == nullptr) {
+    query_context_ = query_context;
+  }
+
+  // Creates one work order for the given block together with the text buffer
+  // it writes into. The buffer is handed over to the query context, keyed by
+  // the block id, after the work order has been enqueued.
+  const auto schedule_block = [&](const block_id bid) -> void {
+    std::unique_ptr<std::string> buffer = std::make_unique<std::string>();
+    TableExportWorkOrder *work_order =
+        new TableExportWorkOrder(query_id_,
+                                 input_relation_,
+                                 bid,
+                                 options_->getFormat(),
+                                 options_->getDelimiter(),
+                                 options_->escapeStrings(),
+                                 options_->getQuoteCharacter(),
+                                 options_->getNullString(),
+                                 op_index_,
+                                 scheduler_client_id,
+                                 storage_manager,
+                                 bus,
+                                 buffer.get());
+    container->addNormalWorkOrder(work_order, op_index_);
+    query_context->setBlockOutputTextBuffer(bid, buffer.release());
+  };
+
+  if (!input_relation_is_stored_) {
+    // Streaming input: schedule whatever blocks have been fed since the last
+    // call, and report completion only once the producer has finished.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      schedule_block(input_relation_block_ids_[num_workorders_generated_]);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored input: all blocks are known up front, so schedule them exactly once.
+  if (!started_) {
+    for (const block_id bid : input_relation_block_ids_) {
+      schedule_block(bid);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+bool TableExportOperator::getAllWorkOrderProtos(
+    WorkOrderProtosContainer *container) {
+  // TODO(quickstep-team): Implement TableExportOperator for the distributed case.
+  LOG(FATAL) << "TableExportOperator::getAllWorkOrderProtos() is not supported";
+}
+
+void TableExportOperator::receiveFeedbackMessage(
+    const WorkOrder::FeedbackMessage &msg) {
+  // Each message carries the id of one block whose text has been generated.
+  DCHECK(TableExportOperator::kBlockOutputMessage == msg.type());
+  DCHECK(msg.payload_size() == sizeof(block_id));
+
+  // The destination is opened lazily, when the first block completes.
+  if (file_ == nullptr) {
+    const std::string lo_file_name = ToLower(file_name_);
+    if (lo_file_name == "$stdout") {
+      file_ = stdout;
+    } else if (lo_file_name == "$stderr") {
+      file_ = stderr;
+    } else {
+      // Any other name carries a one-character prefix (e.g. "@test.txt");
+      // drop it to obtain the path to open.
+      file_ = std::fopen(file_name_.substr(1).c_str(), "wb");
+      // TODO(quickstep-team): Decent handling of exceptions at query runtime.
+      if (file_ == nullptr) {
+        throw std::runtime_error("Can not open file " + file_name_ + " for writing");
+      }
+    }
+
+    if (options_->hasHeader()) {
+      // TODO(quickstep-team): Write the header line. Nothing is emitted yet.
+    }
+  }
+
+  // Take ownership of the finished block's text buffer.
+  const block_id completed_block_id = *static_cast<const block_id*>(msg.payload());
+  outputs_.emplace(completed_block_id, std::unique_ptr<std::string>(
+      query_context_->releaseBlockOutputTextBuffer(completed_block_id)));
+
+  // Write out buffered blocks in the order of input_relation_block_ids_, and
+  // stop at the first block whose text has not arrived yet.
+  while (true) {
+    block_id next_block_id;
+    {
+      SpinMutexLock lock(block_ids_mutex_);
+      // All known blocks are already written: indexing the vector here would
+      // read one past its end.
+      if (num_blocks_written_ >= input_relation_block_ids_.size()) {
+        break;
+      }
+      next_block_id = input_relation_block_ids_[num_blocks_written_];
+    }
+    auto it = outputs_.find(next_block_id);
+    if (it == outputs_.end()) {
+      break;
+    }
+    std::fwrite(it->second->c_str(), 1, it->second->length(), file_);
+    ++num_blocks_written_;
+    outputs_.erase(it);
+  }
+}
+
+void TableExportOperator::updateCatalogOnCompletion() {
+  // Only a stream opened with fopen() is closed; stdout and stderr stay open.
+  const bool is_regular_file =
+      (file_ != nullptr) && (file_ != stdout) && (file_ != stderr);
+  if (is_regular_file) {
+    std::fclose(file_);
+  }
+  file_ = nullptr;
+}
+
+void TableExportWorkOrder::execute() {
+  // Fetch the input block and create an accessor over its tuples.
+  BlockReference block(
+      storage_manager_->getBlock(input_block_id_, input_relation_));
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor());
+
+  // Render every tuple into the output buffer, using the per-field
+  // transformation that matches the requested format.
+  if (format_ == BulkIOFormat::kCSV) {
+    writeToString<&TableExportWorkOrder::quoteCSVField>(
+        accessor.get(), output_buffer_);
+  } else if (format_ == BulkIOFormat::kText) {
+    writeToString<&TableExportWorkOrder::escapeTextField>(
+        accessor.get(), output_buffer_);
+  } else {
+    LOG(FATAL) << "Unsupported export format in TableExportWorkOrder::execute()";
+  }
+
+  // Tell the operator that the text for this block is ready.
+  // NOTE(review): the payload is allocated with new; confirm that
+  // FeedbackMessage releases it with a matching deallocation.
+  FeedbackMessage completion_msg(TableExportOperator::kBlockOutputMessage,
+                                 getQueryID(),
+                                 operator_index_,
+                                 new block_id(input_block_id_),
+                                 sizeof(input_block_id_));
+  SendFeedbackMessage(
+      bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, completion_msg);
+}
+
+// Returns the CSV representation of one field. The field is returned as-is
+// unless it contains the column delimiter, the quote character or a line
+// break (LF or CR); in that case it is wrapped in quote characters and every
+// embedded quote character is doubled.
+inline std::string TableExportWorkOrder::quoteCSVField(std::string &&field) const {
+  bool need_quote = false;
+  for (const char c : field) {
+    // A bare carriage return can be taken for a record separator by CSV
+    // readers, so it forces quoting just like a newline does.
+    if (c == column_delimiter_ || c == quote_character_ || c == '\n' || c == '\r') {
+      need_quote = true;
+      break;
+    }
+  }
+  if (!need_quote) {
+    return std::move(field);
+  }
+
+  std::string quoted;
+  // Opening and closing quote plus the field itself; doubled quotes may grow
+  // the string further.
+  quoted.reserve(field.size() + 2);
+  quoted.push_back(quote_character_);
+  for (const char c : field) {
+    if (c == quote_character_) {
+      quoted.push_back(c);
+    }
+    quoted.push_back(c);
+  }
+  quoted.push_back(quote_character_);
+  return quoted;
+}
+
+
+// Returns the TEXT-format representation of one field. When escaping is
+// disabled, or the field is exactly the two characters \N, the field is
+// returned unchanged. Otherwise control characters (< 0x20) are written as
+// backslash escapes (\b \f \n \r \t \v, or \xHH for the rest), and the
+// backslash and the column delimiter are preceded by a backslash.
+inline std::string TableExportWorkOrder::escapeTextField(std::string &&field) const {
+  if (escape_strings_ == false || field == "\\N") {
+    return std::move(field);
+  }
+
+  // Compare bytes as unsigned values on both sides. Comparing an unsigned
+  // char against a plain (possibly signed) char never matches for delimiter
+  // bytes >= 0x80.
+  const unsigned char delimiter = static_cast<unsigned char>(column_delimiter_);
+
+  bool need_escape = false;
+  for (const unsigned char c : field) {
+    if (c < 0x20 || c == '\\' || c == delimiter) {
+      need_escape = true;
+      break;
+    }
+  }
+  if (!need_escape) {
+    return std::move(field);
+  }
+
+  std::string escaped;
+  escaped.reserve(field.size() + 1);
+  for (const unsigned char c : field) {
+    if (c < 0x20) {
+      switch (c) {
+        case '\b':
+          // Backspace.
+          escaped.append("\\b");
+          break;
+        case '\f':
+          // Form-feed.
+          escaped.append("\\f");
+          break;
+        case '\n':
+          // Newline.
+          escaped.append("\\n");
+          break;
+        case '\r':
+          // Carriage return.
+          escaped.append("\\r");
+          break;
+        case '\t':
+          // Tab.
+          escaped.append("\\t");
+          break;
+        case '\v':
+          // Vertical tab.
+          escaped.append("\\v");
+          break;
+        default: {
+          // Use hexadecimal representation.
+          static const std::string digits = "0123456789ABCDEF";
+          escaped.append("\\x");
+          escaped.push_back(digits.at(c >> 4));
+          escaped.push_back(digits.at(c & 0xF));
+          break;
+        }
+      }
+    } else {
+      if (c == '\\' || c == delimiter) {
+        escaped.push_back('\\');
+      }
+      escaped.push_back(c);
+    }
+  }
+  return escaped;
+}
+
+// Appends every element of "container" to "output", separated by the column
+// delimiter. Each element is first converted to a string by
+// functor(element, position) and then passed through "transform".
+template <std::string (TableExportWorkOrder::*transform)(std::string&&) const,
+          typename Container, typename Functor>
+inline void TableExportWorkOrder::writeEachToString(const Container &container,
+                                                    std::string *output,
+                                                    const Functor &functor) const {
+  std::size_t position = 0;
+  for (auto it = container.begin(); it != container.end(); ++it, ++position) {
+    // The delimiter goes between fields, never before the first one.
+    if (position != 0) {
+      output->push_back(column_delimiter_);
+    }
+    output->append((this->*transform)(functor(*it, position)));
+  }
+}
+
+// Renders every tuple reachable through "accessor" as one line of text in
+// "output": fields are printed with the type of the attribute at the same
+// position in the input relation, NULLs become null_string_, and each line is
+// terminated by '\n'.
+template <std::string (TableExportWorkOrder::*transform)(std::string&&) const>
+void TableExportWorkOrder::writeToString(
+    ValueAccessor *accessor, std::string *output) const {
+  // Attribute types, indexed by position in the input relation.
+  std::vector<const Type*> attribute_types;
+  attribute_types.reserve(input_relation_.size());
+  for (const CatalogAttribute &attr : input_relation_) {
+    attribute_types.emplace_back(&attr.getType());
+  }
+
+  // Converts the value at position "idx" of a tuple to its printed form.
+  const auto print_value =
+      [&](const TypedValue &value, const std::size_t idx) -> std::string {
+    if (!value.isNull()) {
+      return attribute_types[idx]->printValueToString(value);
+    }
+    return null_string_;
+  };
+
+  accessor->beginIterationVirtual();
+  while (accessor->nextVirtual()) {
+    const std::unique_ptr<Tuple> row(accessor->getTupleVirtual());
+    writeEachToString<transform>(*row, output, print_value);
+    output->push_back('\n');
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/relational_operators/TableExportOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableExportOperator.hpp b/relational_operators/TableExportOperator.hpp
new file mode 100644
index 0000000..a8152c8
--- /dev/null
+++ b/relational_operators/TableExportOperator.hpp
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_TABLE_EXPORT_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_TABLE_EXPORT_OPERATOR_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "threading/SpinMutex.hpp"
+#include "utility/BulkIOConfiguration.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogRelationSchema;
+class InsertDestination;
+class StorageManager;
+class ValueAccessor;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator that writes the tuples of a relation as text to a file,
+ *        to stdout or to stderr. One work order is generated per input block,
+ *        and block outputs are written in the order of the block list.
+ **/
+class TableExportOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Feedback message types sent from work orders to this operator.
+   **/
+  enum FeedbackMessageType : WorkOrder::FeedbackMessageType {
+      kBlockOutputMessage,
+  };
+
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param input_relation The relation to export.
+   * @param input_relation_is_stored If true, the relation's blocks are
+   *        snapshotted at construction; otherwise blocks are supplied later
+   *        through feedInputBlock().
+   * @param file_name The destination: "$stdout" or "$stderr"
+   *        (case-insensitive), or a path preceded by a one-character prefix
+   *        which is stripped before the file is opened.
+   * @param options The bulk I/O options (format, delimiter, quoting, ...).
+   **/
+  TableExportOperator(const std::size_t query_id,
+                      const CatalogRelation &input_relation,
+                      const bool input_relation_is_stored,
+                      const std::string &file_name,
+                      const BulkIOConfigurationPtr &options)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        input_relation_is_stored_(input_relation_is_stored),
+        file_name_(file_name),
+        options_(options),
+        input_relation_block_ids_(input_relation_is_stored
+                                      ? input_relation.getBlocksSnapshot()
+                                      : std::vector<block_id>()),
+        num_workorders_generated_(0),
+        started_(false),
+        query_context_(nullptr),
+        num_blocks_written_(0),
+        file_(nullptr) {}
+
+  /**
+   * @brief Destructor. Closes the output file if it is still open, i.e. if
+   *        updateCatalogOnCompletion() was never reached. stdout and stderr
+   *        are never closed.
+   **/
+  ~TableExportOperator() override {
+    if (file_ != nullptr && file_ != stdout && file_ != stderr) {
+      std::fclose(file_);
+    }
+  }
+
+  OperatorType getOperatorType() const override {
+    return kTableExport;
+  }
+
+  std::string getName() const override {
+    return "TableExportOperator";
+  }
+
+  /**
+   * @return The relation being exported.
+   **/
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  // Records a newly produced input block. Blocks of other relations are
+  // ignored.
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    if (input_relation_id == input_relation_.getID()) {
+      SpinMutexLock lock(block_ids_mutex_);
+      input_relation_block_ids_.emplace_back(input_block_id);
+    }
+  }
+
+  void receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) override;
+
+  void updateCatalogOnCompletion() override;
+
+ private:
+  const CatalogRelation &input_relation_;
+  const bool input_relation_is_stored_;
+  const std::string file_name_;
+  const BulkIOConfigurationPtr options_;
+
+  // Blocks to export, in output order, and how many of them already have a
+  // work order (used only when the input relation is not stored).
+  std::vector<block_id> input_relation_block_ids_;
+  std::size_t num_workorders_generated_;
+
+  // Whether work orders have been generated for a stored input relation.
+  bool started_;
+
+  // Set on the first getAllWorkOrders() call.
+  QueryContext *query_context_;
+  // Number of leading entries of input_relation_block_ids_ already written.
+  std::size_t num_blocks_written_;
+  // Finished block outputs that are waiting for earlier blocks to be written.
+  std::unordered_map<block_id, std::unique_ptr<std::string>> outputs_;
+  SpinMutex block_ids_mutex_;
+
+  // Output stream; nullptr until the first block output arrives.
+  FILE *file_;
+
+  DISALLOW_COPY_AND_ASSIGN(TableExportOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by TableExportOperator. It renders the tuples
+ *        of one block as text into a caller-provided buffer and then notifies
+ *        the operator with a feedback message.
+ **/
+class TableExportWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this work order belongs.
+   * @param input_relation The schema of the relation being exported.
+   * @param input_block_id The block whose tuples are rendered.
+   * @param format The output format (CSV or TEXT).
+   * @param column_delimiter The character placed between fields.
+   * @param escape_strings Whether TEXT fields are backslash-escaped.
+   * @param quote_character The quote character for CSV fields.
+   * @param null_string The text written for NULL values.
+   * @param operator_index The index of the owning operator, used in the
+   *        feedback message.
+   * @param scheduler_client_id The TMB client ID the feedback is sent to.
+   * @param storage_manager The StorageManager used to load the block.
+   * @param bus The TMB message bus.
+   * @param output_buffer The string the rendered text is appended to. It is
+   *        not owned by this work order.
+   **/
+  TableExportWorkOrder(const std::size_t query_id,
+                       const CatalogRelationSchema &input_relation,
+                       const block_id input_block_id,
+                       const BulkIOFormat format,
+                       const char column_delimiter,
+                       const bool escape_strings,
+                       const char quote_character,
+                       const std::string &null_string,
+                       const std::size_t operator_index,
+                       const tmb::client_id scheduler_client_id,
+                       StorageManager *storage_manager,
+                       tmb::MessageBus *bus,
+                       std::string *output_buffer)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        input_block_id_(input_block_id),
+        format_(format),
+        column_delimiter_(column_delimiter),
+        escape_strings_(escape_strings),
+        quote_character_(quote_character),
+        null_string_(null_string),
+        operator_index_(operator_index),
+        scheduler_client_id_(scheduler_client_id),
+        storage_manager_(storage_manager),
+        bus_(bus),
+        output_buffer_(output_buffer) {
+  }
+
+  ~TableExportWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  // Per-field transformations: CSV quoting and TEXT backslash-escaping.
+  inline std::string quoteCSVField(std::string &&field) const;
+  inline std::string escapeTextField(std::string &&field) const;
+
+  // Appends the elements of "container" to "output", separated by the column
+  // delimiter, after converting each with "functor" and then "transform".
+  template <std::string (TableExportWorkOrder::*transform)(std::string&&) const,
+            typename Container, typename Functor>
+  inline void writeEachToString(const Container &container,
+                                std::string *output,
+                                const Functor &functor) const;
+
+  // Appends one line per tuple of "accessor" to "output".
+  template <std::string (TableExportWorkOrder::*transform)(std::string&&) const>
+  void writeToString(ValueAccessor *accessor, std::string *output) const;
+
+  const CatalogRelationSchema &input_relation_;
+  const block_id input_block_id_;
+
+  const BulkIOFormat format_;
+  const char column_delimiter_;
+  const bool escape_strings_;
+  const char quote_character_;
+  const std::string null_string_;
+
+  const std::size_t operator_index_;
+  const tmb::client_id scheduler_client_id_;
+  StorageManager *storage_manager_;
+  tmb::MessageBus *bus_;
+
+  // Not owned by this work order.
+  std::string *output_buffer_;
+
+  DISALLOW_COPY_AND_ASSIGN(TableExportWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_TABLE_EXPORT_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 3ca3af4..3cb78fb 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -31,6 +31,7 @@
 #include <cstdint>
 #include <cstdio>
 #include <cstdlib>
+#include <exception>
 #include <memory>
 #include <string>
 #include <utility>
@@ -54,6 +55,7 @@
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BulkIOConfiguration.hpp"
 #include "utility/Glob.hpp"
 
 #include "gflags/gflags.h"
@@ -61,9 +63,6 @@
 
 #include "tmb/id_typedefs.h"
 
-using std::size_t;
-using std::string;
-
 namespace quickstep {
 
 // Text segment size set to 256KB.
@@ -82,14 +81,19 @@ static bool ValidateTextScanTextSegmentSize(const char *flagname,
   return true;
 }
 
-static const volatile bool text_scan_text_segment_size_dummy = gflags::RegisterFlagValidator(
-    &FLAGS_textscan_text_segment_size, &ValidateTextScanTextSegmentSize);
+static const volatile bool text_scan_text_segment_size_dummy =
+    gflags::RegisterFlagValidator(
+        &FLAGS_textscan_text_segment_size, &ValidateTextScanTextSegmentSize);
 
 namespace {
 
-size_t getFileSize(const string &file_name) {
+static std::size_t GetFileSize(const std::string &file_name) {
   // Use standard C libary to retrieve the file size.
   FILE *fp = std::fopen(file_name.c_str(), "rb");
+  // TODO(quickstep-team): Decent handling of exceptions at query runtime.
+  if (fp == nullptr) {
+    throw std::runtime_error("Can not open file " + file_name + " for reading");
+  }
   std::fseek(fp, 0, SEEK_END);
   const std::size_t file_size = std::ftell(fp);
   std::fclose(fp);
@@ -127,7 +131,7 @@ bool TextScanOperator::getAllWorkOrders(
         << "File " << file << " is not readable due to permission issues.";
 #endif  // QUICKSTEP_HAVE_UNISTD
 
-    const std::size_t file_size = getFileSize(file);
+    const std::size_t file_size = GetFileSize(file);
 
     std::size_t text_offset = 0;
     for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
@@ -138,8 +142,8 @@ bool TextScanOperator::getAllWorkOrders(
                                 file,
                                 text_offset,
                                 FLAGS_textscan_text_segment_size,
-                                field_terminator_,
-                                process_escape_sequences_,
+                                options_->getDelimiter(),
+                                options_->escapeStrings(),
                                 output_destination),
           op_index_);
     }
@@ -152,8 +156,8 @@ bool TextScanOperator::getAllWorkOrders(
                                 file,
                                 text_offset,
                                 file_size - text_offset,
-                                field_terminator_,
-                                process_escape_sequences_,
+                                options_->getDelimiter(),
+                                options_->escapeStrings(),
                                 output_destination),
           op_index_);
     }
@@ -169,22 +173,25 @@ bool TextScanOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container
     return true;
   }
 
-  for (const string &file : files) {
-    const std::size_t file_size = getFileSize(file);
+  for (const std::string &file : files) {
+    const std::size_t file_size = GetFileSize(file);
 
     size_t text_offset = 0;
     for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
          num_full_segments > 0;
          --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
-      container->addWorkOrderProto(createWorkOrderProto(file, text_offset, FLAGS_textscan_text_segment_size),
-                                   op_index_);
+      container->addWorkOrderProto(
+          createWorkOrderProto(file, text_offset,
+                               FLAGS_textscan_text_segment_size),
+          op_index_);
     }
 
     // Deal with the residual partial segment whose size is less than
     // 'FLAGS_textscan_text_segment_size'.
     if (text_offset < file_size) {
-      container->addWorkOrderProto(createWorkOrderProto(file, text_offset, file_size - text_offset),
-                                   op_index_);
+      container->addWorkOrderProto(
+          createWorkOrderProto(file, text_offset, file_size - text_offset),
+          op_index_);
     }
   }
 
@@ -192,9 +199,10 @@ bool TextScanOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container
   return true;
 }
 
-serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &filename,
-                                                                 const size_t text_offset,
-                                                                 const size_t text_segment_size) {
+serialization::WorkOrder* TextScanOperator::createWorkOrderProto(
+    const std::string &filename,
+    const std::size_t text_offset,
+    const std::size_t text_segment_size) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::TEXT_SCAN);
   proto->set_query_id(query_id_);
@@ -202,9 +210,10 @@ serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &f
   proto->SetExtension(serialization::TextScanWorkOrder::filename, filename);
   proto->SetExtension(serialization::TextScanWorkOrder::text_offset, text_offset);
   proto->SetExtension(serialization::TextScanWorkOrder::text_segment_size, text_segment_size);
-  proto->SetExtension(serialization::TextScanWorkOrder::field_terminator, field_terminator_);
+  proto->SetExtension(serialization::TextScanWorkOrder::field_terminator,
+                      options_->getDelimiter());
   proto->SetExtension(serialization::TextScanWorkOrder::process_escape_sequences,
-                      process_escape_sequences_);
+                      options_->escapeStrings());
   proto->SetExtension(serialization::TextScanWorkOrder::insert_destination_index,
                       output_destination_index_);
 
@@ -235,12 +244,14 @@ void TextScanWorkOrder::execute() {
     file_handle = hdfsOpenFile(hdfs, filename_.c_str(), O_RDONLY, buffer_size,
                                0 /* default replication */, 0 /* default block size */);
     if (file_handle == nullptr) {
-      LOG(ERROR) << "Failed to open file " << filename_ << " with error: " << strerror(errno);
+      LOG(ERROR) << "Failed to open file " << filename_
+                 << " with error: " << strerror(errno);
       return;
     }
 
     if (hdfsSeek(hdfs, file_handle, text_offset_)) {
-      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+      LOG(ERROR) << "Failed to seek in file " << filename_
+                 << " with error: " << strerror(errno);
 
       hdfsCloseFile(hdfs, file_handle);
       return;
@@ -248,7 +259,9 @@ void TextScanWorkOrder::execute() {
 
     bytes_read = hdfsRead(hdfs, file_handle, buffer, text_segment_size_);
     while (bytes_read != text_segment_size_) {
-      bytes_read += hdfsRead(hdfs, file_handle, buffer + bytes_read, text_segment_size_ - bytes_read);
+      bytes_read += hdfsRead(hdfs, file_handle,
+                             buffer + bytes_read,
+                             text_segment_size_ - bytes_read);
     }
   }
 #endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
@@ -325,7 +338,8 @@ void TextScanWorkOrder::execute() {
   if (use_hdfs) {
 #ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
     if (hdfsSeek(hdfs, file_handle, dynamic_read_offset)) {
-      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+      LOG(ERROR) << "Failed to seek in file " << filename_
+                 << " with error: " << strerror(errno);
 
       hdfsCloseFile(hdfs, file_handle);
       return;
@@ -343,7 +357,9 @@ void TextScanWorkOrder::execute() {
 
       // Read again when acrossing the HDFS block boundary.
       if (bytes_read != dynamic_read_size) {
-        bytes_read += hdfsRead(hdfs, file_handle, buffer + bytes_read, dynamic_read_size - bytes_read);
+        bytes_read += hdfsRead(hdfs, file_handle,
+                               buffer + bytes_read,
+                               dynamic_read_size - bytes_read);
       }
 #endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index f6be8c8..bc18d9a 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -32,6 +32,7 @@
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BulkIOConfiguration.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -121,14 +122,12 @@ class TextScanOperator : public RelationalOperator {
    **/
   TextScanOperator(const std::size_t query_id,
                    const std::string &file_pattern,
-                   const char field_terminator,
-                   const bool process_escape_sequences,
+                   const BulkIOConfigurationPtr &options,
                    const CatalogRelation &output_relation,
                    const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
         file_pattern_(file_pattern),
-        field_terminator_(field_terminator),
-        process_escape_sequences_(process_escape_sequences),
+        options_(options),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         work_generated_(false) {}
@@ -165,8 +164,7 @@ class TextScanOperator : public RelationalOperator {
                                                  const std::size_t text_segment_size);
 
   const std::string file_pattern_;
-  const char field_terminator_;
-  const bool process_escape_sequences_;
+  const BulkIOConfigurationPtr options_;
 
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 42a0e7d..7025c16 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -21,7 +21,7 @@ package quickstep.serialization;
 
 import "relational_operators/SortMergeRunOperator.proto";
 
-// Next tag: 26.
+// Next tag: 27.
 enum WorkOrderType {
   AGGREGATION = 1;
   BUILD_AGGREGATION_EXISTENCE_MAP = 23;
@@ -43,6 +43,7 @@ enum WorkOrderType {
   SELECT = 15;
   SORT_MERGE_RUN = 16;
   SORT_RUN_GENERATION = 17;
+  TABLE_EXPORT = 26;
   TABLE_GENERATOR = 18;
   TEXT_SCAN = 19;
   UNION_ALL = 24;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index c92a3dd..d609817 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -40,6 +40,7 @@
 #include "threading/ThreadIDBasedMap.hpp"
 #include "types/TypeFactory.hpp"
 #include "types/TypeID.hpp"
+#include "utility/BulkIOConfiguration.hpp"
 #include "utility/MemStream.hpp"
 
 #include "gflags/gflags.h"
@@ -191,11 +192,15 @@ TEST_F(TextScanOperatorTest, ScanTest) {
   output_destination_proto->set_relation_id(relation_->getID());
   output_destination_proto->set_relational_op_index(kOpIndex);
 
+  std::unique_ptr<BulkIOConfiguration> options =
+      std::make_unique<BulkIOConfiguration>(BulkIOFormat::kText);
+  options->setDelimiter('\t');
+  options->setEscapeStrings(true);
+
   std::unique_ptr<TextScanOperator> text_scan_op(
       new TextScanOperator(kQueryId,
                            input_filename,
-                           '\t',
-                           true,
+                           BulkIOConfigurationPtr(options.release()),
                            *relation_,
                            output_destination_index));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/utility/BulkIOConfiguration.cpp
----------------------------------------------------------------------
diff --git a/utility/BulkIOConfiguration.cpp b/utility/BulkIOConfiguration.cpp
new file mode 100644
index 0000000..af95dca
--- /dev/null
+++ b/utility/BulkIOConfiguration.cpp
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/BulkIOConfiguration.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Sets delimiter_, escape_strings_, header_, quote_ and null_string_ to the
+// defaults of 'format'. Any format without defaults ends in LOG(FATAL).
+void BulkIOConfiguration::initializeDefaultParameters(const BulkIOFormat format) {
+  if (format == BulkIOFormat::kCSV) {
+    // Defaults for CSV.
+    null_string_ = "";
+    quote_ = '"';
+    header_ = true;
+    escape_strings_ = false;
+    delimiter_ = ',';
+    return;
+  }
+
+  if (format == BulkIOFormat::kText) {
+    // Defaults for plain text.
+    null_string_ = "\\N";
+    quote_ = 0;
+    header_ = false;
+    escape_strings_ = true;
+    delimiter_ = '\t';
+    return;
+  }
+
+  LOG(FATAL) << "Unexpected format in "
+             << "BulkIOConfiguration::initializeDefaultParameters()";
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/utility/BulkIOConfiguration.hpp
----------------------------------------------------------------------
diff --git a/utility/BulkIOConfiguration.hpp b/utility/BulkIOConfiguration.hpp
new file mode 100644
index 0000000..042932e
--- /dev/null
+++ b/utility/BulkIOConfiguration.hpp
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_BULK_IO_CONFIGURATION_HPP_
+#define QUICKSTEP_UTILITY_BULK_IO_CONFIGURATION_HPP_
+
+#include <memory>
+#include <string>
+
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+/**
+ * @brief The bulk I/O data formats that a BulkIOConfiguration can describe.
+ **/
+enum class BulkIOFormat {
+  kCSV,
+  kText
+};
+
+class BulkIOConfiguration;
+typedef std::shared_ptr<const BulkIOConfiguration> BulkIOConfigurationPtr;
+
+/**
+ * @brief A bundle of bulk I/O options: data format, field delimiter,
+ *        escape_strings flag, header flag, quote character and NULL string.
+ **/
+class BulkIOConfiguration {
+ public:
+  /**
+   * @brief Constructor. Every option starts at the default for 'format'.
+   *
+   * @param format The data format this configuration describes.
+   **/
+  explicit BulkIOConfiguration(const BulkIOFormat format)
+      : format_(format) {
+    initializeDefaultParameters(format);
+  }
+
+  /**
+   * @return The format given at construction.
+   **/
+  inline BulkIOFormat getFormat() const {
+    return format_;
+  }
+
+  /**
+   * @return The upper-case name of the format: "CSV" or "TEXT".
+   **/
+  inline std::string getFormatName() const {
+    switch (format_) {
+      case BulkIOFormat::kCSV:
+        return "CSV";
+      case BulkIOFormat::kText:
+        return "TEXT";
+    }
+    // No declared enumerator reaches this point. The explicit return keeps
+    // every control path of this non-void function well-defined, while the
+    // default-less switch above still lets the compiler warn when a new
+    // BulkIOFormat value is added without a name.
+    return "UNKNOWN";
+  }
+
+  /**
+   * @return The field delimiter character.
+   **/
+  inline char getDelimiter() const {
+    return delimiter_;
+  }
+
+  /**
+   * @brief Override the field delimiter.
+   *
+   * @param delimiter The new delimiter character.
+   **/
+  inline void setDelimiter(const char delimiter) {
+    delimiter_ = delimiter;
+  }
+
+  /**
+   * @return The value of the escape_strings option.
+   **/
+  inline bool escapeStrings() const {
+    return escape_strings_;
+  }
+
+  /**
+   * @brief Set the escape_strings option.
+   *
+   * @param escape_strings The new value of the option.
+   **/
+  inline void setEscapeStrings(const bool escape_strings) {
+    escape_strings_ = escape_strings;
+  }
+
+  /**
+   * @return The value of the header option.
+   **/
+  inline bool hasHeader() const {
+    return header_;
+  }
+
+  /**
+   * @brief Set the header option.
+   *
+   * @param header The new value of the option.
+   **/
+  inline void setHeader(const bool header) {
+    header_ = header;
+  }
+
+  /**
+   * @return The quote character.
+   **/
+  inline char getQuoteCharacter() const {
+    return quote_;
+  }
+
+  /**
+   * @brief Override the quote character.
+   *
+   * @param quote The new quote character.
+   **/
+  inline void setQuoteCharacter(const char quote) {
+    quote_ = quote;
+  }
+
+  /**
+   * @return The string that stands for a NULL value.
+   **/
+  inline const std::string& getNullString() const {
+    return null_string_;
+  }
+
+  /**
+   * @brief Override the string that stands for a NULL value.
+   *
+   * @param null_string The new NULL representation.
+   **/
+  inline void setNullString(const std::string &null_string) {
+    null_string_ = null_string;
+  }
+
+ private:
+  // Assigns the five option members below their defaults for 'format'.
+  void initializeDefaultParameters(const BulkIOFormat format);
+
+  const BulkIOFormat format_;
+
+  char delimiter_;
+  bool escape_strings_;
+  bool header_;
+  char quote_;
+  std::string null_string_;
+
+  DISALLOW_COPY_AND_ASSIGN(BulkIOConfiguration);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_BULK_IO_CONFIGURATION_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1f9bc914/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 16a83ee..59d843d 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -168,6 +168,7 @@ add_library(quickstep_utility_BloomFilter ../empty_src.cpp BloomFilter.hpp)
 add_library(quickstep_utility_BloomFilter_proto
             ${quickstep_utility_BloomFilter_proto_srcs}
             ${quickstep_utility_BloomFilter_proto_hdrs})
+add_library(quickstep_utility_BulkIOConfiguration BulkIOConfiguration.cpp BulkIOConfiguration.hpp)
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
@@ -249,6 +250,9 @@ target_link_libraries(quickstep_utility_CompositeHash
 target_link_libraries(quickstep_utility_BarrieredReadWriteConcurrentBitVector
                       quickstep_utility_BitManipulation
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_BulkIOConfiguration
+                      glog
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
@@ -347,6 +351,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_BitVector
                       quickstep_utility_BloomFilter
                       quickstep_utility_BloomFilter_proto
+                      quickstep_utility_BulkIOConfiguration
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf


Mime
View raw message