nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update
Date Mon, 13 Jul 2020 08:28:53 GMT

adamdebreceni commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r453479898



##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout,
"1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       we are not waiting for the trigger but for the FlowFiles to be enqueued into the Connection,
during shutdown we cannot check if a source processor stopped, thus we can only check the
connections' contents, should we expose the fact that there are active sessions for a processor?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message