quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject incubator-quickstep git commit: Removed an unnecessary API in RelationalOperator.
Date Thu, 24 Aug 2017 19:25:32 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/master e95fee93c -> c018882f5


Removed an unnecessary API in RelationalOperator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c018882f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c018882f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c018882f

Branch: refs/heads/master
Commit: c018882f51372fde91672a506298a7003471d82a
Parents: e95fee9
Author: Zuyu Zhang <zuyu@cs.wisc.edu>
Authored: Sun Aug 20 22:37:15 2017 -0500
Committer: Zuyu Zhang <zuyu@cs.wisc.edu>
Committed: Sun Aug 20 23:12:09 2017 -0500

----------------------------------------------------------------------
 query_execution/QueryManagerBase.cpp            |  13 +-
 query_execution/QueryManagerDistributed.cpp     |   1 -
 query_execution/QueryManagerSingleNode.cpp      |   1 -
 .../tests/QueryManagerSingleNode_unittest.cpp   |  31 +--
 relational_operators/DeleteOperator.cpp         |  44 +++--
 .../DestroyAggregationStateOperator.cpp         |  41 ++--
 relational_operators/DestroyHashOperator.cpp    |  42 +++--
 relational_operators/DropTableOperator.cpp      |  50 ++---
 .../FinalizeAggregationOperator.cpp             |  79 ++++----
 relational_operators/HashJoinOperator.cpp       | 188 +++++++++----------
 .../InitializeAggregationOperator.cpp           |  35 ++--
 relational_operators/InsertOperator.cpp         |  45 ++---
 relational_operators/RelationalOperator.hpp     |  18 +-
 relational_operators/SampleOperator.cpp         |  63 ++++---
 relational_operators/TableGeneratorOperator.cpp |  50 ++---
 relational_operators/TextScanOperator.cpp       | 118 ++++++------
 relational_operators/UpdateOperator.cpp         |  79 ++++----
 .../WindowAggregationOperator.cpp               |  39 ++--
 .../tests/AggregationOperator_unittest.cpp      |   4 -
 .../tests/HashJoinOperator_unittest.cpp         |  27 ---
 .../tests/TextScanOperator_unittest.cpp         |   1 -
 21 files changed, 455 insertions(+), 514 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index f84ad4e..14c9ba5 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -70,14 +70,14 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
     for (const pair<dag_node_index, bool> &dependent_link :
          query_dag_->getDependents(node_index)) {
       const dag_node_index dependent_op_index = dependent_link.first;
-      if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
-        // The link is not a pipeline-breaker. Streaming of blocks is possible
-        // between these two operators.
-        output_consumers_[node_index].push_back(dependent_op_index);
-      } else {
+      if (query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
         // The link is a pipeline-breaker. Streaming of blocks is not possible
         // between these two operators.
         blocking_dependencies_[dependent_op_index].push_back(node_index);
+      } else {
+        // The link is not a pipeline-breaker. Streaming of blocks is possible
+        // between these two operators.
+        output_consumers_[node_index].push_back(dependent_op_index);
       }
     }
   }
@@ -231,9 +231,6 @@ void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
     if (output_rel >= 0) {
       dependent_op->doneFeedingInputBlocks(output_rel);
     }
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      dependent_op->informAllBlockingDependenciesMet();
-    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index a248391..1144e9f 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -70,7 +70,6 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
   // Collect all the workorders from all the relational operators in the DAG.
   for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
     if (checkAllBlockingDependenciesMet(index)) {
-      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
       processOperator(index, false);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index f33a501..001faa8 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -64,7 +64,6 @@ QueryManagerSingleNode::QueryManagerSingleNode(
   // Collect all the workorders from all the relational operators in the DAG.
   for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
     if (checkAllBlockingDependenciesMet(index)) {
-      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
       processOperator(index, false);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 9183d32..c65364c 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -134,11 +134,6 @@ class MockOperator: public RelationalOperator {
     }
   }
 
-  inline bool getBlockingDependenciesMet() const {
-    MOCK_OP_LOG(3) << "met.";
-    return blocking_dependencies_met_;
-  }
-
   void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
     insert_destination_index_ = insert_destination_index;
   }
@@ -165,7 +160,7 @@ class MockOperator: public RelationalOperator {
           ++num_workorders_generated_;
         }
       } else {
-        if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) {
+        if (num_workorders_generated_ < max_workorders_) {
           MOCK_OP_LOG(3) << "[static] generate WorkOrder";
           container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
           ++num_workorders_generated_;
@@ -304,9 +299,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
 
   constructQueryManager();
 
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
   // We expect one call for op's getAllWorkOrders().
   EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
@@ -322,9 +314,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
 
   constructQueryManager();
 
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
   // We expect one call for op's getAllWorkOrders().
   EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
@@ -372,9 +361,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
 
   constructQueryManager();
 
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
   for (int i = 0; i < 3; ++i) {
     // We expect one call for op's getAllWorkOrders().
     EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
@@ -437,9 +423,6 @@ TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
 
   constructQueryManager();
 
-  // op1 doesn't have any dependencies
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-
   // Only op1 should receive a call to getAllWorkOrders initially.
   EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
@@ -488,9 +471,6 @@ TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
 
   EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
 
-  // op1 is op2's blocking dependency.
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
   EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
   // op2 should get first call of getAllWorkOrders() when op1 is over.
   EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
@@ -541,11 +521,6 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
 
   constructQueryManager();
 
-  // As none of the operators have a blocking link, blocking dependencies should
-  // be met.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
   EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(1, op1.getNumWorkOrders());
   EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
@@ -686,10 +661,6 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
   static_cast<BlockPoolInsertDestination *>(insert_destination)
       ->available_block_refs_.push_back(move(block_ref));
 
-  // There's no blocking dependency in the DAG.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
   EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(1, op1.getNumWorkOrders());
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 14cbf6f..a8723a9 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -54,22 +54,24 @@ bool DeleteOperator::getAllWorkOrders(
 
   if (relation_is_stored_) {
     // If relation_ is stored, iterate over the list of blocks in relation_.
-    if (!started_) {
-      for (const block_id input_block_id : relation_block_ids_) {
-        container->addNormalWorkOrder(
-            new DeleteWorkOrder(query_id_,
-                                relation_,
-                                input_block_id,
-                                predicate,
-                                storage_manager,
-                                op_index_,
-                                scheduler_client_id,
-                                bus),
-            op_index_);
-      }
-      started_ = true;
+    if (started_) {
+      return true;
     }
-    return started_;
+
+    for (const block_id input_block_id : relation_block_ids_) {
+      container->addNormalWorkOrder(
+          new DeleteWorkOrder(query_id_,
+                              relation_,
+                              input_block_id,
+                              predicate,
+                              storage_manager,
+                              op_index_,
+                              scheduler_client_id,
+                              bus),
+          op_index_);
+    }
+    started_ = true;
+    return true;
   } else {
     while (num_workorders_generated_ < relation_block_ids_.size()) {
       container->addNormalWorkOrder(
@@ -91,12 +93,14 @@ bool DeleteOperator::getAllWorkOrders(
 bool DeleteOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (relation_is_stored_) {
     // If relation_ is stored, iterate over the list of blocks in relation_.
-    if (!started_) {
-      for (const block_id input_block_id : relation_block_ids_) {
-        container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
-      }
-      started_ = true;
+    if (started_) {
+      return true;
+    }
+
+    for (const block_id input_block_id : relation_block_ids_) {
+      container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
     }
+    started_ = true;
     return true;
   } else {
     while (num_workorders_generated_ < relation_block_ids_.size()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 013bf18..3d36e20 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -35,32 +35,35 @@ bool DestroyAggregationStateOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  if (blocking_dependencies_met_ && !work_generated_) {
-    work_generated_ = true;
-    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-      container->addNormalWorkOrder(
-          new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, part_id, query_context),
-          op_index_);
-    }
+  if (work_generated_) {
+    return true;
   }
-  return work_generated_;
+
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    container->addNormalWorkOrder(
+        new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, part_id, query_context),
+        op_index_);
+  }
+  work_generated_ = true;
+  return true;
 }
 
 bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (blocking_dependencies_met_ && !work_generated_) {
-    work_generated_ = true;
+  if (work_generated_) {
+    return true;
+  }
 
-    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-      serialization::WorkOrder *proto = new serialization::WorkOrder;
-      proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
-      proto->set_query_id(query_id_);
-      proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
-      proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id, part_id);
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    serialization::WorkOrder *proto = new serialization::WorkOrder;
+    proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
+    proto->set_query_id(query_id_);
+    proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
+    proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id, part_id);
 
-      container->addWorkOrderProto(proto, op_index_);
-    }
+    container->addWorkOrderProto(proto, op_index_);
   }
-  return work_generated_;
+  work_generated_ = true;
+  return true;
 }
 
 void DestroyAggregationStateWorkOrder::execute() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/DestroyHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyHashOperator.cpp b/relational_operators/DestroyHashOperator.cpp
index 5b84bba..d9ea19b 100644
--- a/relational_operators/DestroyHashOperator.cpp
+++ b/relational_operators/DestroyHashOperator.cpp
@@ -34,32 +34,36 @@ bool DestroyHashOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  if (blocking_dependencies_met_ && !work_generated_) {
-    for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
-      container->addNormalWorkOrder(
-          new DestroyHashWorkOrder(query_id_, hash_table_index_, part_id, query_context),
-          op_index_);
-    }
-    work_generated_ = true;
+  if (work_generated_) {
+    return true;
   }
-  return work_generated_;
+
+  for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+    container->addNormalWorkOrder(
+        new DestroyHashWorkOrder(query_id_, hash_table_index_, part_id, query_context),
+        op_index_);
+  }
+  work_generated_ = true;
+  return true;
 }
 
 bool DestroyHashOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (blocking_dependencies_met_ && !work_generated_) {
-    for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
-      serialization::WorkOrder *proto = new serialization::WorkOrder;
-      proto->set_work_order_type(serialization::DESTROY_HASH);
-      proto->set_query_id(query_id_);
-      proto->SetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index, hash_table_index_);
-      proto->SetExtension(serialization::DestroyHashWorkOrder::partition_id, part_id);
+  if (work_generated_) {
+    return true;
+  }
 
-      container->addWorkOrderProto(proto, op_index_);
-    }
+  for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+    serialization::WorkOrder *proto = new serialization::WorkOrder;
+    proto->set_work_order_type(serialization::DESTROY_HASH);
+    proto->set_query_id(query_id_);
+    proto->SetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index, hash_table_index_);
+    proto->SetExtension(serialization::DestroyHashWorkOrder::partition_id, part_id);
 
-    work_generated_ = true;
+    container->addWorkOrderProto(proto, op_index_);
   }
-  return work_generated_;
+
+  work_generated_ = true;
+  return true;
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/DropTableOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.cpp b/relational_operators/DropTableOperator.cpp
index 5cd5ebc..59f72a0 100644
--- a/relational_operators/DropTableOperator.cpp
+++ b/relational_operators/DropTableOperator.cpp
@@ -42,42 +42,44 @@ bool DropTableOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  if (blocking_dependencies_met_ && !work_generated_) {
-    work_generated_ = true;
+  if (work_generated_) {
+    return true;
+  }
 
-    std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot());
+  std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot());
 
-    // DropTableWorkOrder only drops blocks, if any.
-    container->addNormalWorkOrder(
-        new DropTableWorkOrder(
-            query_id_, std::move(relation_blocks), storage_manager),
-        op_index_);
+  // DropTableWorkOrder only drops blocks, if any.
+  container->addNormalWorkOrder(
+      new DropTableWorkOrder(
+          query_id_, std::move(relation_blocks), storage_manager),
+      op_index_);
 
-    database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
-  }
+  database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
 
-  return work_generated_;
+  work_generated_ = true;
+  return true;
 }
 
 bool DropTableOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (blocking_dependencies_met_ && !work_generated_) {
-    work_generated_ = true;
+  if (work_generated_) {
+    return true;
+  }
 
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::DROP_TABLE);
-    proto->set_query_id(query_id_);
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::DROP_TABLE);
+  proto->set_query_id(query_id_);
 
-    std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot());
-    for (const block_id relation_block : relation_blocks) {
-      proto->AddExtension(serialization::DropTableWorkOrder::block_ids, relation_block);
-    }
+  std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot());
+  for (const block_id relation_block : relation_blocks) {
+    proto->AddExtension(serialization::DropTableWorkOrder::block_ids, relation_block);
+  }
 
-    container->addWorkOrderProto(proto, op_index_);
+  container->addWorkOrderProto(proto, op_index_);
 
-    database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
-  }
+  database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
 
-  return work_generated_;
+  work_generated_ = true;
+  return true;
 }
 
 void DropTableOperator::updateCatalogOnCompletion() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 8283437..efa4cba 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -40,56 +40,59 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  DCHECK(query_context != nullptr);
-
-  if (blocking_dependencies_met_ && !started_) {
-    started_ = true;
+  if (started_) {
+    return true;
+  }
 
-    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-      AggregationOperationState *agg_state =
-          query_context->getAggregationState(aggr_state_index_, part_id);
-      DCHECK(agg_state != nullptr);
-      for (std::size_t state_part_id = 0;
-           state_part_id < aggr_state_num_partitions_;
-           ++state_part_id) {
-        container->addNormalWorkOrder(
-            new FinalizeAggregationWorkOrder(
-                query_id_,
-                part_id,
-                state_part_id,
-                agg_state,
-                query_context->getInsertDestination(output_destination_index_)),
-            op_index_);
-      }
+  DCHECK(query_context != nullptr);
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    AggregationOperationState *agg_state =
+        query_context->getAggregationState(aggr_state_index_, part_id);
+    DCHECK(agg_state != nullptr);
+    for (std::size_t state_part_id = 0;
+         state_part_id < aggr_state_num_partitions_;
+         ++state_part_id) {
+      container->addNormalWorkOrder(
+          new FinalizeAggregationWorkOrder(
+              query_id_,
+              part_id,
+              state_part_id,
+              agg_state,
+              query_context->getInsertDestination(output_destination_index_)),
+          op_index_);
     }
   }
-  return started_;
+
+  started_ = true;
+  return true;
 }
 
 // TODO(quickstep-team) : Think about how the number of partitions could be
 // accessed in this function. Until then, we can't use partitioned aggregation
 // finalization with the distributed version.
 bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (blocking_dependencies_met_ && !started_) {
-    started_ = true;
+  if (started_) {
+    return true;
+  }
 
-    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-      serialization::WorkOrder *proto = new serialization::WorkOrder;
-      proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
-      proto->set_query_id(query_id_);
-      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
-                          aggr_state_index_);
-      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
-                          part_id);
-      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
-                          0u);
-      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
-                          output_destination_index_);
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    serialization::WorkOrder *proto = new serialization::WorkOrder;
+    proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+    proto->set_query_id(query_id_);
+    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+                        aggr_state_index_);
+    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+                        part_id);
+    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+                        0u);
+    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+                        output_destination_index_);
 
-      container->addWorkOrderProto(proto, op_index_);
-    }
+    container->addWorkOrderProto(proto, op_index_);
   }
-  return started_;
+
+  started_ = true;
+  return true;
 }
 
 void FinalizeAggregationWorkOrder::execute() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index d3e7e08..e385e46 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -202,56 +202,53 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
     WorkOrdersContainer *container,
     QueryContext *query_context,
     StorageManager *storage_manager) {
-  // We wait until the building of global hash table is complete.
-  if (blocking_dependencies_met_) {
-    DCHECK(query_context != nullptr);
-
-    const Predicate *residual_predicate =
-        query_context->getPredicate(residual_predicate_index_);
-    const vector<unique_ptr<const Scalar>> &selection =
-        query_context->getScalarGroup(selection_index_);
-    InsertDestination *output_destination =
-        query_context->getInsertDestination(output_destination_index_);
-
-    if (probe_relation_is_stored_) {
-      if (started_) {
-        return true;
-      }
+  DCHECK(query_context != nullptr);
 
-      for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
-        const JoinHashTable &hash_table =
-            *(query_context->getJoinHashTable(hash_table_index_, part_id));
-
-        for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
-          container->addNormalWorkOrder(
-              new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_,
-                                     any_join_key_attributes_nullable_, part_id, probe_block_id, residual_predicate,
-                                     selection, hash_table, output_destination, storage_manager,
-                                     CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
-              op_index_);
-        }
-      }
-      started_ = true;
+  const Predicate *residual_predicate =
+      query_context->getPredicate(residual_predicate_index_);
+  const vector<unique_ptr<const Scalar>> &selection =
+      query_context->getScalarGroup(selection_index_);
+  InsertDestination *output_destination =
+      query_context->getInsertDestination(output_destination_index_);
+
+  if (probe_relation_is_stored_) {
+    if (started_) {
       return true;
-    } else {
-      for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
-        const JoinHashTable &hash_table =
-            *(query_context->getJoinHashTable(hash_table_index_, part_id));
-
-        while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
-          container->addNormalWorkOrder(
-              new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_,
-                                     any_join_key_attributes_nullable_, part_id,
-                                     probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
-                                     residual_predicate, selection, hash_table, output_destination, storage_manager,
-                                     CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
-              op_index_);
-          ++num_workorders_generated_[part_id];
-        }  // end while
-      }  // end for
-      return done_feeding_input_relation_;
-    }  // end else (probe_relation_is_stored_)
-  }  // end if (blocking_dependencies_met_)
+    }
+
+    for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+      const JoinHashTable &hash_table =
+          *(query_context->getJoinHashTable(hash_table_index_, part_id));
+
+      for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
+        container->addNormalWorkOrder(
+            new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_,
+                                   any_join_key_attributes_nullable_, part_id, probe_block_id, residual_predicate,
+                                   selection, hash_table, output_destination, storage_manager,
+                                   CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+            op_index_);
+      }
+    }
+    started_ = true;
+    return true;
+  } else {
+    for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+      const JoinHashTable &hash_table =
+          *(query_context->getJoinHashTable(hash_table_index_, part_id));
+
+      while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
+        container->addNormalWorkOrder(
+            new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_,
+                                   any_join_key_attributes_nullable_, part_id,
+                                   probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+                                   residual_predicate, selection, hash_table, output_destination, storage_manager,
+                                   CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+            op_index_);
+        ++num_workorders_generated_[part_id];
+      }  // end while
+    }  // end for
+    return done_feeding_input_relation_;
+  }  // end else (probe_relation_is_stored_)
   return false;
 }
 
@@ -259,56 +256,53 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
     WorkOrdersContainer *container,
     QueryContext *query_context,
     StorageManager *storage_manager) {
-  // We wait until the building of global hash table is complete.
-  if (blocking_dependencies_met_) {
-    DCHECK(query_context != nullptr);
+  DCHECK(query_context != nullptr);
 
-    const vector<unique_ptr<const Scalar>> &selection =
-        query_context->getScalarGroup(selection_index_);
+  const vector<unique_ptr<const Scalar>> &selection =
+      query_context->getScalarGroup(selection_index_);
 
-    InsertDestination *output_destination =
-        query_context->getInsertDestination(output_destination_index_);
+  InsertDestination *output_destination =
+      query_context->getInsertDestination(output_destination_index_);
 
-    if (probe_relation_is_stored_) {
-      if (started_) {
-        return true;
-      }
+  if (probe_relation_is_stored_) {
+    if (started_) {
+      return true;
+    }
 
-      for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
-        const JoinHashTable &hash_table =
-            *(query_context->getJoinHashTable(hash_table_index_, part_id));
-
-        for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
-          container->addNormalWorkOrder(
-              new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_,
-                                         any_join_key_attributes_nullable_, part_id, probe_block_id, selection,
-                                         is_selection_on_build_, hash_table, output_destination, storage_manager,
-                                         CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
-              op_index_);
-        }
+    for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+      const JoinHashTable &hash_table =
+          *(query_context->getJoinHashTable(hash_table_index_, part_id));
+
+      for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
+        container->addNormalWorkOrder(
+            new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_,
+                                       any_join_key_attributes_nullable_, part_id, probe_block_id, selection,
+                                       is_selection_on_build_, hash_table, output_destination, storage_manager,
+                                       CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+            op_index_);
       }
-      started_ = true;
-      return true;
-    } else {
-      for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
-        const JoinHashTable &hash_table =
-            *(query_context->getJoinHashTable(hash_table_index_, part_id));
-
-        while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
-          container->addNormalWorkOrder(
-              new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_,
-                                         any_join_key_attributes_nullable_, part_id,
-                                         probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
-                                         selection, is_selection_on_build_, hash_table, output_destination,
-                                         storage_manager,
-                                         CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
-              op_index_);
-          ++num_workorders_generated_[part_id];
-        }
+    }
+    started_ = true;
+    return true;
+  } else {
+    for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+      const JoinHashTable &hash_table =
+          *(query_context->getJoinHashTable(hash_table_index_, part_id));
+
+      while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
+        container->addNormalWorkOrder(
+            new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_,
+                                       any_join_key_attributes_nullable_, part_id,
+                                       probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+                                       selection, is_selection_on_build_, hash_table, output_destination,
+                                       storage_manager,
+                                       CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+            op_index_);
+        ++num_workorders_generated_[part_id];
       }
-      return done_feeding_input_relation_;
-    }  // end else (probe_relation_is_stored_)
-  }  // end if (blocking_dependencies_met_)
+    }
+    return done_feeding_input_relation_;
+  }  // end else (probe_relation_is_stored_)
   return false;
 }
 
@@ -330,11 +324,6 @@ bool HashJoinOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container
 bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos(
     WorkOrderProtosContainer *container,
     const serialization::HashJoinWorkOrder::HashJoinWorkOrderType hash_join_type) {
-  // We wait until the building of global hash table is complete.
-  if (!blocking_dependencies_met_) {
-    return false;
-  }
-
   if (probe_relation_is_stored_) {
     if (started_) {
       return true;
@@ -396,11 +385,6 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto(
 }
 
 bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *container) {
-  // We wait until the building of global hash table is complete.
-  if (!blocking_dependencies_met_) {
-    return false;
-  }
-
   if (probe_relation_is_stored_) {
     if (started_) {
       return true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index f91299d..136686b 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -39,24 +39,27 @@ bool InitializeAggregationOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  if (!started_) {
-    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-      AggregationOperationState *agg_state =
-          query_context->getAggregationState(aggr_state_index_, part_id);
-      DCHECK(agg_state != nullptr);
-
-      for (std::size_t state_part_id = 0;
-           state_part_id < aggr_state_num_init_partitions_;
-           ++state_part_id) {
-        container->addNormalWorkOrder(
-            new InitializeAggregationWorkOrder(query_id_,
-                                               state_part_id,
-                                               agg_state),
-            op_index_);
-      }
+  if (started_) {
+    return true;
+  }
+
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    AggregationOperationState *agg_state =
+        query_context->getAggregationState(aggr_state_index_, part_id);
+    DCHECK(agg_state != nullptr);
+
+    for (std::size_t state_part_id = 0;
+         state_part_id < aggr_state_num_init_partitions_;
+         ++state_part_id) {
+      container->addNormalWorkOrder(
+          new InitializeAggregationWorkOrder(query_id_,
+                                             state_part_id,
+                                             agg_state),
+          op_index_);
     }
-    started_ = true;
   }
+
+  started_ = true;
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/InsertOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InsertOperator.cpp b/relational_operators/InsertOperator.cpp
index 31c7fa8..fbd3a07 100644
--- a/relational_operators/InsertOperator.cpp
+++ b/relational_operators/InsertOperator.cpp
@@ -39,34 +39,37 @@ bool InsertOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  if (blocking_dependencies_met_ && !work_generated_) {
-    DCHECK(query_context != nullptr);
-
-    work_generated_ = true;
-    container->addNormalWorkOrder(
-        new InsertWorkOrder(
-            query_id_,
-            query_context->getInsertDestination(output_destination_index_),
-            query_context->releaseTuple(tuple_index_)),
-        op_index_);
+  if (work_generated_) {
+    return true;
   }
-  return work_generated_;
+
+  DCHECK(query_context != nullptr);
+  container->addNormalWorkOrder(
+      new InsertWorkOrder(
+          query_id_,
+          query_context->getInsertDestination(output_destination_index_),
+          query_context->releaseTuple(tuple_index_)),
+      op_index_);
+
+  work_generated_ = true;
+  return true;
 }
 
 bool InsertOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (blocking_dependencies_met_ && !work_generated_) {
-    work_generated_ = true;
+  if (work_generated_) {
+    return true;
+  }
 
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::INSERT);
-    proto->set_query_id(query_id_);
-    proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_);
-    proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_);
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::INSERT);
+  proto->set_query_id(query_id_);
+  proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_);
+  proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_);
 
-    container->addWorkOrderProto(proto, op_index_);
-  }
+  container->addWorkOrderProto(proto, op_index_);
 
-  return work_generated_;
+  work_generated_ = true;
+  return true;
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index 425fa32..5de7eb5 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -163,19 +163,6 @@ class RelationalOperator {
   }
 
   /**
-   * @brief Inform this RelationalOperator that ALL the dependencies which break
-   *        the pipeline have been met.
-   *
-   * @note This function is only relevant in certain operators like HashJoin
-   *       which have a pipeline breaking dependency on BuildHash operator.
-   *       Such operators can start generating WorkOrders when all the pipeline
-   *       breaking dependencies are met.
-   **/
-  inline void informAllBlockingDependenciesMet() {
-    blocking_dependencies_met_ = true;
-  }
-
-  /**
    * @brief Receive input blocks for this RelationalOperator.
    *
    * @param input_block_id The ID of the input block.
@@ -289,16 +276,13 @@ class RelationalOperator {
    * @param blocking_dependencies_met If those dependencies which break the
    *        pipeline have been met.
    **/
-  explicit RelationalOperator(const std::size_t query_id,
-                              const bool blocking_dependencies_met = false)
+  explicit RelationalOperator(const std::size_t query_id)
       : query_id_(query_id),
-        blocking_dependencies_met_(blocking_dependencies_met),
         done_feeding_input_relation_(false),
         lip_deployment_index_(QueryContext::kInvalidLIPDeploymentId) {}
 
   const std::size_t query_id_;
 
-  bool blocking_dependencies_met_;
   bool done_feeding_input_relation_;
   std::size_t op_index_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/SampleOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SampleOperator.cpp b/relational_operators/SampleOperator.cpp
index 63733bf..d4f84b0 100644
--- a/relational_operators/SampleOperator.cpp
+++ b/relational_operators/SampleOperator.cpp
@@ -53,39 +53,42 @@ bool SampleOperator::getAllWorkOrders(
   std::uniform_real_distribution<> distribution(0, 1);
   const double probability = static_cast<double>(percentage_) / 100;
   if (input_relation_is_stored_) {
-    if (!started_) {
-      // If the sampling is by block choose blocks randomly
-      if (is_block_sample_) {
-        for (const block_id input_block_id : input_relation_block_ids_) {
-          if (distribution(generator) <= probability) {
-            container->addNormalWorkOrder(
-                new SampleWorkOrder(query_id_,
-                                    input_relation_,
-                                    input_block_id,
-                                    is_block_sample_,
-                                    percentage_,
-                                    output_destination,
-                                    storage_manager),
-                op_index_);
-          }
-        }
-      } else {
-        // Add all the blocks for tuple sampling which would handle
-        // the sampling from each block
-        for (const block_id input_block_id : input_relation_block_ids_) {
-          container->addNormalWorkOrder(new SampleWorkOrder(query_id_,
-                                                            input_relation_,
-                                                            input_block_id,
-                                                            is_block_sample_,
-                                                            percentage_,
-                                                            output_destination,
-                                                            storage_manager),
-                                        op_index_);
+    if (started_) {
+      return true;
+    }
+
+    // If the sampling is by block choose blocks randomly
+    if (is_block_sample_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        if (distribution(generator) <= probability) {
+          container->addNormalWorkOrder(
+              new SampleWorkOrder(query_id_,
+                                  input_relation_,
+                                  input_block_id,
+                                  is_block_sample_,
+                                  percentage_,
+                                  output_destination,
+                                  storage_manager),
+              op_index_);
         }
       }
-      started_ = true;
+    } else {
+      // Add all the blocks for tuple sampling which would handle
+      // the sampling from each block
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        container->addNormalWorkOrder(new SampleWorkOrder(query_id_,
+                                                          input_relation_,
+                                                          input_block_id,
+                                                          is_block_sample_,
+                                                          percentage_,
+                                                          output_destination,
+                                                          storage_manager),
+                                      op_index_);
+      }
     }
-    return started_;
+
+    started_ = true;
+    return true;
   } else {
     if (is_block_sample_) {
       while (num_workorders_generated_ < input_relation_block_ids_.size()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/TableGeneratorOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.cpp b/relational_operators/TableGeneratorOperator.cpp
index 3f62fc9..a9e187d 100644
--- a/relational_operators/TableGeneratorOperator.cpp
+++ b/relational_operators/TableGeneratorOperator.cpp
@@ -39,35 +39,39 @@ bool TableGeneratorOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  if (!started_) {
-    DCHECK(query_context != nullptr);
-
-    // Currently the generator function is not abstracted to be parallelizable,
-    // so just produce one work order.
-    container->addNormalWorkOrder(
-        new TableGeneratorWorkOrder(
-            query_id_,
-            query_context->getGeneratorFunctionHandle(
-                generator_function_index_),
-            query_context->getInsertDestination(output_destination_index_)),
-        op_index_);
-    started_ = true;
+  if (started_) {
+    return true;
   }
-  return started_;
+
+  DCHECK(query_context != nullptr);
+
+  // Currently the generator function is not abstracted to be parallelizable,
+  // so just produce one work order.
+  container->addNormalWorkOrder(
+      new TableGeneratorWorkOrder(
+          query_id_,
+          query_context->getGeneratorFunctionHandle(
+              generator_function_index_),
+          query_context->getInsertDestination(output_destination_index_)),
+      op_index_);
+  started_ = true;
+  return true;
 }
 
 bool TableGeneratorOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (!started_) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::TABLE_GENERATOR);
-    proto->set_query_id(query_id_);
+  if (started_) {
+    return true;
+  }
 
-    proto->SetExtension(serialization::TableGeneratorWorkOrder::generator_function_index, generator_function_index_);
-    proto->SetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index, output_destination_index_);
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::TABLE_GENERATOR);
+  proto->set_query_id(query_id_);
 
-    container->addWorkOrderProto(proto, op_index_);
-    started_ = true;
-  }
+  proto->SetExtension(serialization::TableGeneratorWorkOrder::generator_function_index, generator_function_index_);
+  proto->SetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index, output_destination_index_);
+
+  container->addWorkOrderProto(proto, op_index_);
+  started_ = true;
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index a133e0c..3ca3af4 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -115,75 +115,81 @@ bool TextScanOperator::getAllWorkOrders(
   InsertDestination *output_destination =
       query_context->getInsertDestination(output_destination_index_);
 
-  if (blocking_dependencies_met_ && !work_generated_) {
-    for (const std::string &file : files) {
+  if (work_generated_) {
+    return true;
+  }
+
+  for (const std::string &file : files) {
 #ifdef QUICKSTEP_HAVE_UNISTD
-      // Check file permissions before trying to open it.
-      const int access_result = access(file.c_str(), R_OK);
-      CHECK_EQ(0, access_result)
-          << "File " << file << " is not readable due to permission issues.";
+    // Check file permissions before trying to open it.
+    const int access_result = access(file.c_str(), R_OK);
+    CHECK_EQ(0, access_result)
+        << "File " << file << " is not readable due to permission issues.";
 #endif  // QUICKSTEP_HAVE_UNISTD
 
-      const std::size_t file_size = getFileSize(file);
-
-      std::size_t text_offset = 0;
-      for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
-           num_full_segments > 0;
-           --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
-        container->addNormalWorkOrder(
-            new TextScanWorkOrder(query_id_,
-                                  file,
-                                  text_offset,
-                                  FLAGS_textscan_text_segment_size,
-                                  field_terminator_,
-                                  process_escape_sequences_,
-                                  output_destination),
-            op_index_);
-      }
+    const std::size_t file_size = getFileSize(file);
+
+    std::size_t text_offset = 0;
+    for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
+         num_full_segments > 0;
+         --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
+      container->addNormalWorkOrder(
+          new TextScanWorkOrder(query_id_,
+                                file,
+                                text_offset,
+                                FLAGS_textscan_text_segment_size,
+                                field_terminator_,
+                                process_escape_sequences_,
+                                output_destination),
+          op_index_);
+    }
 
-      // Deal with the residual partial segment whose size is less than
-      // 'FLAGS_textscan_text_segment_size'.
-      if (text_offset < file_size) {
-        container->addNormalWorkOrder(
-            new TextScanWorkOrder(query_id_,
-                                  file,
-                                  text_offset,
-                                  file_size - text_offset,
-                                  field_terminator_,
-                                  process_escape_sequences_,
-                                  output_destination),
-            op_index_);
-      }
+    // Deal with the residual partial segment whose size is less than
+    // 'FLAGS_textscan_text_segment_size'.
+    if (text_offset < file_size) {
+      container->addNormalWorkOrder(
+          new TextScanWorkOrder(query_id_,
+                                file,
+                                text_offset,
+                                file_size - text_offset,
+                                field_terminator_,
+                                process_escape_sequences_,
+                                output_destination),
+          op_index_);
     }
-    work_generated_ = true;
   }
-  return work_generated_;
+
+  work_generated_ = true;
+  return true;
 }
 
 bool TextScanOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_);
-  if (blocking_dependencies_met_ && !work_generated_) {
-    for (const string &file : files) {
-      const std::size_t file_size = getFileSize(file);
-
-      size_t text_offset = 0;
-      for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
-           num_full_segments > 0;
-           --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
-        container->addWorkOrderProto(createWorkOrderProto(file, text_offset, FLAGS_textscan_text_segment_size),
-                                     op_index_);
-      }
+  if (work_generated_) {
+    return true;
+  }
 
-      // Deal with the residual partial segment whose size is less than
-      // 'FLAGS_textscan_text_segment_size'.
-      if (text_offset < file_size) {
-        container->addWorkOrderProto(createWorkOrderProto(file, text_offset, file_size - text_offset),
-                                     op_index_);
-      }
+  for (const string &file : files) {
+    const std::size_t file_size = getFileSize(file);
+
+    size_t text_offset = 0;
+    for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
+         num_full_segments > 0;
+         --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
+      container->addWorkOrderProto(createWorkOrderProto(file, text_offset, FLAGS_textscan_text_segment_size),
+                                   op_index_);
+    }
+
+    // Deal with the residual partial segment whose size is less than
+    // 'FLAGS_textscan_text_segment_size'.
+    if (text_offset < file_size) {
+      container->addWorkOrderProto(createWorkOrderProto(file, text_offset, file_size - text_offset),
+                                   op_index_);
     }
-    work_generated_ = true;
   }
-  return work_generated_;
+
+  work_generated_ = true;
+  return true;
 }
 
 serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &filename,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 40dfb22..08bfa59 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -52,49 +52,52 @@ bool UpdateOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  if (blocking_dependencies_met_ && !started_) {
-    DCHECK(query_context != nullptr);
-
-    for (const block_id input_block_id : input_blocks_) {
-      container->addNormalWorkOrder(
-          new UpdateWorkOrder(
-              query_id_,
-              relation_,
-              input_block_id,
-              query_context->getPredicate(predicate_index_),
-              query_context->getUpdateGroup(update_group_index_),
-              query_context->getInsertDestination(
-                  relocation_destination_index_),
-              storage_manager,
-              op_index_,
-              scheduler_client_id,
-              bus),
-          op_index_);
-    }
-    started_ = true;
+  if (started_) {
+    return true;
   }
-  return started_;
+
+  DCHECK(query_context != nullptr);
+  for (const block_id input_block_id : input_blocks_) {
+    container->addNormalWorkOrder(
+        new UpdateWorkOrder(
+            query_id_,
+            relation_,
+            input_block_id,
+            query_context->getPredicate(predicate_index_),
+            query_context->getUpdateGroup(update_group_index_),
+            query_context->getInsertDestination(
+                relocation_destination_index_),
+            storage_manager,
+            op_index_,
+            scheduler_client_id,
+            bus),
+        op_index_);
+  }
+  started_ = true;
+  return true;
 }
 
 bool UpdateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (blocking_dependencies_met_ && !started_) {
-    for (const block_id input_block_id : input_blocks_) {
-      serialization::WorkOrder *proto = new serialization::WorkOrder;
-      proto->set_work_order_type(serialization::UPDATE);
-      proto->set_query_id(query_id_);
-
-      proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
-      proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
-      proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
-      proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
-      proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
-      proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
-
-      container->addWorkOrderProto(proto, op_index_);
-    }
-    started_ = true;
+  if (started_) {
+    return true;
+  }
+
+  for (const block_id input_block_id : input_blocks_) {
+    serialization::WorkOrder *proto = new serialization::WorkOrder;
+    proto->set_work_order_type(serialization::UPDATE);
+    proto->set_query_id(query_id_);
+
+    proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
+    proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
+    proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
+    proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
+    proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
+    proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
+
+    container->addWorkOrderProto(proto, op_index_);
   }
-  return started_;
+  started_ = true;
+  return true;
 }
 
 void UpdateWorkOrder::execute() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 5a1f8ec..2fd9e32 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -39,32 +39,33 @@ bool WindowAggregationOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  DCHECK(query_context != nullptr);
-
-  if (blocking_dependencies_met_ && !generated_) {
-    std::vector<block_id> relation_blocks =
-        input_relation_.getBlocksSnapshot();
-
-    container->addNormalWorkOrder(
-        new WindowAggregationWorkOrder(
-            query_id_,
-            query_context->releaseWindowAggregationState(window_aggregation_state_index_),
-            std::move(relation_blocks),
-            query_context->getInsertDestination(output_destination_index_)),
-        op_index_);
-    generated_ = true;
+  if (generated_) {
+    return true;
   }
 
-  return generated_;
+  std::vector<block_id> relation_blocks =
+      input_relation_.getBlocksSnapshot();
+
+  DCHECK(query_context != nullptr);
+  container->addNormalWorkOrder(
+      new WindowAggregationWorkOrder(
+          query_id_,
+          query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+          std::move(relation_blocks),
+          query_context->getInsertDestination(output_destination_index_)),
+      op_index_);
+  generated_ = true;
+  return true;
 }
 
 bool WindowAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (blocking_dependencies_met_ && !generated_) {
-    container->addWorkOrderProto(createWorkOrderProto(), op_index_);
-    generated_ = true;
+  if (generated_) {
+    return true;
   }
 
-  return generated_;
+  container->addWorkOrderProto(createWorkOrderProto(), op_index_);
+  generated_ = true;
+  return true;
 }
 
 serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 3b4a737..4cda2d1 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -425,8 +425,6 @@ class AggregationOperatorTest : public ::testing::Test {
       delete work_order;
     }
 
-    finalize_op_->informAllBlockingDependenciesMet();
-
     WorkOrdersContainer finalize_op_container(1, 0);
     const std::size_t finalize_op_index = 0;
     finalize_op_->getAllWorkOrders(&finalize_op_container,
@@ -441,8 +439,6 @@ class AggregationOperatorTest : public ::testing::Test {
       delete work_order;
     }
 
-    destroy_aggr_state_op_->informAllBlockingDependenciesMet();
-
     WorkOrdersContainer destroy_aggr_state_op_container(1, 0);
     const std::size_t destroy_aggr_state_op_index = 0;
     destroy_aggr_state_op_->getAllWorkOrders(&destroy_aggr_state_op_container,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 8338872..89aafe5 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -469,8 +469,6 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -512,7 +510,6 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
 
   // Create cleaner operator.
   unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);
@@ -623,8 +620,6 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -687,7 +682,6 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
 
   // Create cleaner operator.
   unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);
@@ -784,8 +778,6 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -827,7 +819,6 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
 
   // Create cleaner operator.
   unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);
@@ -931,8 +922,6 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -999,7 +988,6 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
 
   // Create the cleaner operator.
   unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);
@@ -1112,8 +1100,6 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -1180,7 +1166,6 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
 
   // Create cleaner operator.
   unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);
@@ -1304,8 +1289,6 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -1372,7 +1355,6 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
 
   // Create cleaner operator.
   unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);
@@ -1476,8 +1458,6 @@ TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedLongKeyHashJoinTest) {
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -1519,7 +1499,6 @@ TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedLongKeyHashJoinTest) {
 
   // Create cleaner operator.
   auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);
@@ -1624,8 +1603,6 @@ TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedCompositeKeyHashJoinTest)
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -1692,7 +1669,6 @@ TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedCompositeKeyHashJoinTest)
 
   // Create cleaner operator.
   auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);
@@ -1807,8 +1783,6 @@ TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedCompositeKeyHashJoinWithR
 
   // Execute the operators.
   fetchAndExecuteWorkOrders(builder.get());
-
-  prober->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(prober.get());
 
   // Check result values
@@ -1875,7 +1849,6 @@ TEST_P(HashJoinOperatorTest, SinlgeAttributePartitionedCompositeKeyHashJoinWithR
 
   // Create cleaner operator.
   auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
-  cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
   db_->dropRelationById(output_relation_id);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c018882f/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index 53a9124..c92a3dd 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -110,7 +110,6 @@ class TextScanOperatorTest : public ::testing::Test {
     op->setOperatorIndex(kOpIndex);
     WorkOrdersContainer container(1, 0);
     const std::size_t op_index = 0;
-    op->informAllBlockingDependenciesMet();
     op->getAllWorkOrders(&container,
                          query_context_.get(),
                          storage_manager_.get(),


Mime
View raw message