nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update
Date Fri, 10 Jul 2020 15:56:40 GMT

arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r452930520



##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
   if (running_) {
     // immediately indicate that we are not running
     logger_->log_info("Stop Flow Controller");
-    if (this->root_)
+    if (this->root_) {
+      // stop source processors first
+      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_,
[] (const std::shared_ptr<core::Processor>& proc) -> bool {
+        return !proc->hasIncomingConnections();
+      });
+      std::chrono::milliseconds shutdown_timer{0};
+      // we enable C2 to progressively increase the timeout
+      // in case it sees that waiting for a little longer could
+      // allow the FlowFiles to be processed
+      auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+        if (timeToWait != 0) {
+          return std::chrono::milliseconds{timeToWait};
+        }
+        static const core::TimePeriodValue default_timeout{"10 sec"};

Review comment:
       I think this default shouldn't be defined in the middle of the code, where it is very difficult to search
for.
   Moreover, it can delay shutdown, so I would prefer to leave it at 0 by default.

##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
   if (running_) {
     // immediately indicate that we are not running
     logger_->log_info("Stop Flow Controller");
-    if (this->root_)
+    if (this->root_) {
+      // stop source processors first
+      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_,
[] (const std::shared_ptr<core::Processor>& proc) -> bool {
+        return !proc->hasIncomingConnections();
+      });
+      std::chrono::milliseconds shutdown_timer{0};
+      // we enable C2 to progressively increase the timeout
+      // in case it sees that waiting for a little longer could
+      // allow the FlowFiles to be processed
+      auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+        if (timeToWait != 0) {
+          return std::chrono::milliseconds{timeToWait};
+        }
+        static const core::TimePeriodValue default_timeout{"10 sec"};
+        utils::optional<core::TimePeriodValue> shutdown_timeout;
+        std::string shutdown_timeout_str;
+        if (configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout,
shutdown_timeout_str)) {
+          shutdown_timeout = core::TimePeriodValue::fromString(shutdown_timeout_str);
+        }
+        return std::chrono::milliseconds{shutdown_timeout.value_or(default_timeout).getMilliseconds()};
+      };
+      std::size_t count;
+      while (shutdown_timer < shutdown_timeout() && (count = this->root_->getTotalFlowFileCount())
!= 0) {
+        std::this_thread::sleep_for(shutdown_check_interval_);
+        shutdown_timer += shutdown_check_interval_;

Review comment:
       I would do it by saving the current time when this activity starts and checking whether
the time elapsed exceeds the maximum allowed for this operation.

##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
   }
 }
 
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+  std::size_t sum = 0;
+  for (auto& conn : connections_) {

Review comment:
       Nitpicking: const auto&

##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout,
"1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       Can't we simply wait here until the source processor's trigger count becomes one?

##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
   }
 }
 
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+  std::size_t sum = 0;
+  for (auto& conn : connections_) {
+    sum += conn->getQueueSize();
+  }
+
+  for (ProcessGroup* childGroup : child_process_groups_) {

Review comment:
       const ptr




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message