quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zu...@apache.org
Subject [1/2] incubator-quickstep git commit: QUICKSTEP-10: Serialized WorkOrders as proto. [Forced Update!]
Date Fri, 10 Jun 2016 00:05:00 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/work-order-serialization 0b6fca500 -> c9214ecb1 (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 6bf5719..e398d62 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -23,9 +23,11 @@
 #include <vector>
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/SortMergeRunOperator.pb.h"
 #include "relational_operators/SortMergeRunOperatorHelpers.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "threading/ThreadIDBasedMap.hpp"
 
 #include "glog/logging.h"
@@ -69,6 +71,72 @@ bool SortMergeRunOperator::getAllWorkOrders(
   return generateWorkOrders(container, query_context, storage_manager, scheduler_client_id,
bus);
 }
 
+bool SortMergeRunOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (input_relation_is_stored_) {
+    // Input blocks (or runs) are from base relation. Only possible when base
+    // relation is stored sorted.
+    if (!started_) {
+      // Initialize merge tree completely, since all input runs are known.
+      merge_tree_.initializeTree(input_relation_block_ids_.size());
+      started_ = true;
+      initializeInputRuns();
+    }
+  } else {
+    // Input blocks (or runs) are pipelined from the sorted run generation
+    // operator.
+    if (!started_ && !input_stream_done_) {
+      // Initialize merge tree for first pipeline mode.
+      merge_tree_.initializeForPipeline();
+      started_ = true;
+      initializeInputRuns();
+    }
+  }
+
+  // Get merge jobs from merge tree.
+  std::vector<MergeTree::MergeJob> jobs;
+  const bool done_generating = merge_tree_.getMergeJobs(&jobs);
+
+  for (std::vector<MergeTree::MergeJob>::size_type job_id = 0;
+       job_id < jobs.size();
+       ++job_id) {
+    // Add work order for each merge job.
+    container->addWorkOrderProto(createWorkOrderProto(&jobs[job_id]), op_index_);
+  }
+
+  return done_generating;
+}
+
+serialization::WorkOrder* SortMergeRunOperator::createWorkOrderProto(
+    merge_run_operator::MergeTree::MergeJob *job) {
+  DCHECK(job != nullptr);
+  DCHECK(!job->runs.empty());
+
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::SORT_MERGE_RUN);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::SortMergeRunWorkOrder::operator_index, op_index_);
+  proto->SetExtension(serialization::SortMergeRunWorkOrder::sort_config_index, sort_config_index_);
+
+  for (const merge_run_operator::Run &run : job->runs) {
+    serialization::Run *run_proto = proto->AddExtension(serialization::SortMergeRunWorkOrder::runs);
+    for (const block_id block : run) {
+      run_proto->add_blocks(block);
+    }
+  }
+
+  proto->SetExtension(serialization::SortMergeRunWorkOrder::top_k, top_k_);
+  proto->SetExtension(serialization::SortMergeRunWorkOrder::merge_level, job->level);
+  proto->SetExtension(serialization::SortMergeRunWorkOrder::relation_id,
+                      job->level > 0 ? run_relation_.getID()
+                                     : input_relation_.getID());
+  proto->SetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index,
+                      job->is_final_level ? output_destination_index_
+                                          : run_block_destination_index_);
+
+  return proto;
+}
+
 WorkOrder *SortMergeRunOperator::createWorkOrder(
     merge_run_operator::MergeTree::MergeJob *job,
     QueryContext *query_context,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index cfff8b9..177836f 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -44,8 +44,11 @@ namespace quickstep {
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
+namespace serialization { class WorkOrder; }
+
 /**
  * @defgroup SortMergeRun Merging Sorted Runs
  * @ingroup Sort
@@ -132,6 +135,8 @@ class SortMergeRunOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   void feedInputBlock(const block_id input_block_id,
                       const relation_id input_relation_id) override {
     input_relation_block_ids_.push_back(input_block_id);
@@ -182,6 +187,13 @@ class SortMergeRunOperator : public RelationalOperator {
                              const tmb::client_id scheduler_client_id,
                              tmb::MessageBus *bus);
 
+  /**
+   * @brief Create Work Order proto.
+   *
+   * @param job The merge job.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(merge_run_operator::MergeTree::MergeJob
*job);
+
   const CatalogRelation &input_relation_;
 
   const CatalogRelation &output_relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortRunGenerationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.cpp b/relational_operators/SortRunGenerationOperator.cpp
index 37b8fb8..d7362db 100644
--- a/relational_operators/SortRunGenerationOperator.cpp
+++ b/relational_operators/SortRunGenerationOperator.cpp
@@ -21,7 +21,9 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageManager.hpp"
@@ -80,6 +82,43 @@ bool SortRunGenerationOperator::getAllWorkOrders(
   }
 }
 
+bool SortRunGenerationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
{
+  if (input_relation_is_stored_) {
+    // Input blocks are from a base relation.
+    if (!started_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    // Input blocks are pipelined.
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addWorkOrderProto(
+          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+serialization::WorkOrder* SortRunGenerationOperator::createWorkOrderProto(const block_id
block) {
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::SORT_RUN_GENERATION);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index, sort_config_index_);
+  proto->SetExtension(serialization::SortRunGenerationWorkOrder::relation_id, input_relation_.getID());
+  proto->SetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index,
+                      output_destination_index_);
+  proto->SetExtension(serialization::SortRunGenerationWorkOrder::block_id, block);
+
+  return proto;
+}
+
+
 void SortRunGenerationWorkOrder::execute() {
   BlockReference block(
       storage_manager_->getBlock(input_block_id_, input_relation_));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index f96e6a6..96a3ce1 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -40,8 +40,11 @@ namespace quickstep {
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
+namespace serialization { class WorkOrder; }
+
 /**
  * \defgroup Sort Sorting
  * \ingroup RelationalOperators
@@ -112,6 +115,8 @@ class SortRunGenerationOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id)
override {
     DCHECK(input_relation_id == input_relation_.getID());
     input_relation_block_ids_.push_back(input_block_id);
@@ -133,6 +138,13 @@ class SortRunGenerationOperator : public RelationalOperator {
   }
 
  private:
+  /**
+   * @brief Create Work Order proto.
+   *
+   * @param block The block id used in the Work Order.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
   const CatalogRelation &input_relation_;
 
   const CatalogRelation &output_relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TableGeneratorOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.cpp b/relational_operators/TableGeneratorOperator.cpp
index a3f9340..d5a08ec 100644
--- a/relational_operators/TableGeneratorOperator.cpp
+++ b/relational_operators/TableGeneratorOperator.cpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *   University of Wisconsin—Madison.
+ *     University of Wisconsin—Madison.
  *   Copyright 2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
@@ -20,7 +20,9 @@
 
 #include "expressions/table_generator/GeneratorFunctionHandle.hpp"
 #include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/InsertDestination.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 
@@ -53,6 +55,22 @@ bool TableGeneratorOperator::getAllWorkOrders(
   return started_;
 }
 
+bool TableGeneratorOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (!started_) {
+    serialization::WorkOrder *proto = new serialization::WorkOrder;
+    proto->set_work_order_type(serialization::TABLE_GENERATOR);
+    proto->set_query_id(query_id_);
+
+    proto->SetExtension(serialization::TableGeneratorWorkOrder::generator_function_index,
generator_function_index_);
+    proto->SetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index,
output_destination_index_);
+
+    container->addWorkOrderProto(proto, op_index_);
+    started_ = true;
+  }
+  return true;
+}
+
+
 void TableGeneratorWorkOrder::execute() {
   ColumnVectorsValueAccessor temp_result;
   function_handle_.populateColumns(&temp_result);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TableGeneratorOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp
index 6a6af4b..1b791a6 100644
--- a/relational_operators/TableGeneratorOperator.hpp
+++ b/relational_operators/TableGeneratorOperator.hpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *   University of Wisconsin—Madison.
+ *     University of Wisconsin—Madison.
  *   Copyright 2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
@@ -40,6 +40,7 @@ namespace quickstep {
 class GeneratorFunctionHandle;
 class InsertDestination;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
 /** \addtogroup RelationalOperators
@@ -81,6 +82,8 @@ class TableGeneratorOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id)
override {
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index d2fd0cd..49c9150 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -22,6 +22,7 @@
 #include <algorithm>
 #include <cctype>
 #include <cstddef>
+#include <cstdint>
 #include <cstdio>
 #include <cstdlib>
 #include <memory>
@@ -31,21 +32,46 @@
 
 #include "catalog/CatalogAttribute.hpp"
 #include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/InsertDestination.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
-#include "types/containers/Tuple.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/containers/Tuple.hpp"
 #include "utility/Glob.hpp"
 
+#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 #include "tmb/id_typedefs.h"
 
+using std::size_t;
+using std::string;
+
 namespace quickstep {
 
+// Text segment size set to 256KB.
+DEFINE_uint64(textscan_text_segment_size, 0x40000,
+              "Size of text segment in bytes the input text files "
+              "are split into in the TextScanOperator.");
+
+// Check if the segment size is positive.
+static bool ValidateTextScanTextSegmentSize(const char *flagname,
+                                            std::uint64_t text_segment_size) {
+  if (text_segment_size == 0) {
+    LOG(ERROR) << "--" << flagname << " must be greater than 0";
+    return false;
+  }
+
+  return true;
+}
+
+static const volatile bool text_scan_text_segment_size_dummy = gflags::RegisterFlagValidator(
+    &FLAGS_textscan_text_segment_size, &ValidateTextScanTextSegmentSize);
+
 bool TextScanOperator::getAllWorkOrders(
     WorkOrdersContainer *container,
     QueryContext *query_context,
@@ -56,16 +82,12 @@ bool TextScanOperator::getAllWorkOrders(
 
   const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_);
 
-  if (files.size() == 0) {
-    LOG(FATAL) << "No files matched '" << file_pattern_ << "'. Exiting.";
-  }
+  CHECK_NE(files.size(), 0u)
+      << "No files matched '" << file_pattern_ << "'. Exiting.";
 
   InsertDestination *output_destination =
       query_context->getInsertDestination(output_destination_index_);
 
-  // Text segment size set to 256KB.
-  constexpr std::size_t kTextSegmentSize = 0x40000u;
-
   if (blocking_dependencies_met_ && !work_generated_) {
     for (const std::string &file : files) {
       // Use standard C libary to retrieve the file size.
@@ -75,18 +97,32 @@ bool TextScanOperator::getAllWorkOrders(
       std::fclose(fp);
 
       std::size_t text_offset = 0;
-      while (text_offset < file_size) {
+      for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
+           num_full_segments > 0;
+           --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
+        container->addNormalWorkOrder(
+            new TextScanWorkOrder(query_id_,
+                                  file,
+                                  text_offset,
+                                  FLAGS_textscan_text_segment_size,
+                                  field_terminator_,
+                                  process_escape_sequences_,
+                                  output_destination),
+            op_index_);
+      }
+
+      // Deal with the residual partial segment whose size is less than
+      // 'FLAGS_textscan_text_segment_size'.
+      if (text_offset < file_size) {
         container->addNormalWorkOrder(
             new TextScanWorkOrder(query_id_,
                                   file,
                                   text_offset,
-                                  std::min(kTextSegmentSize, file_size - text_offset),
+                                  file_size - text_offset,
                                   field_terminator_,
                                   process_escape_sequences_,
-                                  output_destination,
-                                  storage_manager),
+                                  output_destination),
             op_index_);
-        text_offset += kTextSegmentSize;
       }
     }
     work_generated_ = true;
@@ -94,24 +130,53 @@ bool TextScanOperator::getAllWorkOrders(
   return work_generated_;
 }
 
-TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id,
-                                     const std::string &filename,
-                                     const std::size_t text_offset,
-                                     const std::size_t text_segment_size,
-                                     const char field_terminator,
-                                     const bool process_escape_sequences,
-                                     InsertDestination *output_destination,
-                                     StorageManager *storage_manager)
-    : WorkOrder(query_id),
-      filename_(filename),
-      text_offset_(text_offset),
-      text_segment_size_(text_segment_size),
-      field_terminator_(field_terminator),
-      process_escape_sequences_(process_escape_sequences),
-      output_destination_(output_destination),
-      storage_manager_(storage_manager) {
-  DCHECK(output_destination_ != nullptr);
-  DCHECK(storage_manager_ != nullptr);
+bool TextScanOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_);
+  if (blocking_dependencies_met_ && !work_generated_) {
+    for (const string &file : files) {
+      // Use standard C libary to retrieve the file size.
+      FILE *fp = std::fopen(file.c_str(), "rb");
+      std::fseek(fp, 0, SEEK_END);
+      const std::size_t file_size = std::ftell(fp);
+      std::fclose(fp);
+
+      size_t text_offset = 0;
+      for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
+           num_full_segments > 0;
+           --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
+        container->addWorkOrderProto(createWorkOrderProto(file, text_offset, FLAGS_textscan_text_segment_size),
+                                     op_index_);
+      }
+
+      // Deal with the residual partial segment whose size is less than
+      // 'FLAGS_textscan_text_segment_size'.
+      if (text_offset < file_size) {
+        container->addWorkOrderProto(createWorkOrderProto(file, text_offset, file_size
- text_offset),
+                                     op_index_);
+      }
+    }
+    work_generated_ = true;
+  }
+  return work_generated_;
+}
+
+serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &filename,
+                                                                 const size_t text_offset,
+                                                                 const size_t text_segment_size)
{
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::TEXT_SCAN);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::TextScanWorkOrder::filename, filename);
+  proto->SetExtension(serialization::TextScanWorkOrder::text_offset, text_offset);
+  proto->SetExtension(serialization::TextScanWorkOrder::text_segment_size, text_segment_size);
+  proto->SetExtension(serialization::TextScanWorkOrder::field_terminator, field_terminator_);
+  proto->SetExtension(serialization::TextScanWorkOrder::process_escape_sequences,
+                      process_escape_sequences_);
+  proto->SetExtension(serialization::TextScanWorkOrder::insert_destination_index,
+                      output_destination_index_);
+
+  return proto;
 }
 
 void TextScanWorkOrder::execute() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index d73e7dd..1a62ded 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -33,6 +33,8 @@
 #include "types/containers/Tuple.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+
 #include "tmb/id_typedefs.h"
 
 namespace tmb { class MessageBus; }
@@ -42,8 +44,11 @@ namespace quickstep {
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
+namespace serialization { class WorkOrder; }
+
 /** \addtogroup RelationalOperators
  *  @{
  */
@@ -135,6 +140,8 @@ class TextScanOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   QueryContext::insert_destination_id getInsertDestinationID() const override {
     return output_destination_index_;
   }
@@ -144,6 +151,10 @@ class TextScanOperator : public RelationalOperator {
   }
 
  private:
+  serialization::WorkOrder* createWorkOrderProto(const std::string &filename,
+                                                 const std::size_t text_offset,
+                                                 const std::size_t text_segment_size);
+
   const std::string file_pattern_;
   const char field_terminator_;
   const bool process_escape_sequences_;
@@ -173,7 +184,6 @@ class TextScanWorkOrder : public WorkOrder {
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
    * @param output_destination The InsertDestination to insert tuples.
-   * @param storage_manager The StorageManager to use.
    **/
   TextScanWorkOrder(
       const std::size_t query_id,
@@ -182,8 +192,14 @@ class TextScanWorkOrder : public WorkOrder {
       const std::size_t text_segment_size,
       const char field_terminator,
       const bool process_escape_sequences,
-      InsertDestination *output_destination,
-      StorageManager *storage_manager);
+      InsertDestination *output_destination)
+      : WorkOrder(query_id),
+        filename_(filename),
+        text_offset_(text_offset),
+        text_segment_size_(text_segment_size),
+        field_terminator_(field_terminator),
+        process_escape_sequences_(process_escape_sequences),
+        output_destination_(DCHECK_NOTNULL(output_destination)) {}
 
   ~TextScanWorkOrder() override {}
 
@@ -233,7 +249,6 @@ class TextScanWorkOrder : public WorkOrder {
   Tuple parseRow(const char **row_ptr,
                  const CatalogRelationSchema &relation) const;
 
-
   /**
    * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as
    *        a char literal. \p *literal_ptr will be modified to the last position
@@ -297,7 +312,6 @@ class TextScanWorkOrder : public WorkOrder {
   const bool process_escape_sequences_;
 
   InsertDestination *output_destination_;
-  StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.proto
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.proto b/relational_operators/TextScanOperator.proto
deleted file mode 100644
index 8ead3f3..0000000
--- a/relational_operators/TextScanOperator.proto
+++ /dev/null
@@ -1,22 +0,0 @@
-//   Copyright 2015 Pivotal Software, Inc.
-//
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//       http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-
-syntax = "proto2";
-
-package quickstep.serialization;
-
-message TextBlob {
-  required fixed64 blob_id = 1;
-  required uint64 size = 2;
-}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 2130563..1b2979e 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -26,7 +26,9 @@
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -72,6 +74,27 @@ bool UpdateOperator::getAllWorkOrders(
   return started_;
 }
 
+bool UpdateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (blocking_dependencies_met_ && !started_) {
+    for (const block_id input_block_id : input_blocks_) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::UPDATE);
+      proto->set_query_id(query_id_);
+
+      proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
+      proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
+      proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
+      proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
+      proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
+      proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
+
+      container->addWorkOrderProto(proto, op_index_);
+    }
+    started_ = true;
+  }
+  return started_;
+}
+
 void UpdateWorkOrder::execute() {
   MutableBlockReference block(
       storage_manager_->getBlockMutable(input_block_id_, relation_));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index cebb9b5..f6c5053 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -45,6 +45,7 @@ class InsertDestination;
 class Predicate;
 class Scalar;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
 /** \addtogroup RelationalOperators
@@ -99,6 +100,8 @@ class UpdateOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   QueryContext::insert_destination_id getInsertDestinationID() const override {
     return relocation_destination_index_;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 60d4c8f..3ed065a 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -232,23 +232,14 @@ message TextScanWorkOrder {
   }
 }
 
-message TextSplitWorkOrder {
-  extend WorkOrder {
-    // All required.
-    optional uint64 operator_index = 320;
-    optional string filename = 321;
-    optional bool process_escape_sequences = 322;
-  }
-}
-
 message UpdateWorkOrder {
   extend WorkOrder {
     // All required.
-    optional uint64 operator_index = 336;
-    optional int32 relation_id = 337;
-    optional int32 insert_destination_index = 338;
-    optional int32 predicate_index = 339;
-    optional uint32 update_group_index = 340;
-    optional fixed64 block_id = 341;
+    optional uint64 operator_index = 320;
+    optional int32 relation_id = 321;
+    optional int32 insert_destination_index = 322;
+    optional int32 predicate_index = 323;
+    optional uint32 update_group_index = 324;
+    optional fixed64 block_id = 325;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index da42b4d..e078b84 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -396,8 +396,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
           proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
-          storage_manager);
+              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)));
     }
     case serialization::UPDATE: {
       LOG(INFO) << "Creating UpdateWorkOrder";
@@ -425,6 +424,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
 bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
                                     const CatalogDatabaseLite &catalog_database,
                                     const QueryContext &query_context) {
+  if (!proto.IsInitialized()) {
+    return false;
+  }
+
   switch (proto.work_order_type()) {
     case serialization::AGGREGATION: {
       return proto.HasExtension(serialization::AggregationWorkOrder::block_id) &&


Mime
View raw message