nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ab...@apache.org
Subject [nifi-minifi-cpp] branch main updated: MINIFICPP-1568 Retry PDH processor run to fix test flakiness
Date Fri, 04 Jun 2021 10:19:11 GMT
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 05ae25c  MINIFICPP-1568 Retry PDH processor run to fix test flakiness
05ae25c is described below

commit 05ae25c1aae280767206b641f8c35d485aa85245
Author: Gabor Gyimesi <gamezbird@gmail.com>
AuthorDate: Wed May 26 17:05:47 2021 +0200

    MINIFICPP-1568 Retry PDH processor run to fix test flakiness
    
    Signed-off-by: Arpad Boda <aboda@apache.org>
    
    This closes #1085
---
 .../pdh/tests/PerformanceDataMonitorTests.cpp      | 283 +++++++++++----------
 1 file changed, 148 insertions(+), 135 deletions(-)

diff --git a/extensions/pdh/tests/PerformanceDataMonitorTests.cpp b/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
index 7160bc5..b3d5acd 100644
--- a/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
+++ b/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
@@ -20,6 +20,7 @@
 #include <vector>
 #include <set>
 #include <fstream>
+#include <functional>
 
 #include "TestBase.h"
 #include "processors/PutFile.h"
@@ -43,12 +44,18 @@ class PerformanceDataMonitorTester {
     plan_->setProperty(putfile_, PutFile::Directory.getName(), dir_);
   }
 
-  void runProcessors() {
-    plan_->runNextProcessor();      // PerformanceMonitor
-    std::this_thread::sleep_for(std::chrono::milliseconds(200));
-    plan_->runCurrentProcessor();   // PerformanceMonitor
-    plan_->runNextProcessor();      // PutFile
-    plan_->runCurrentProcessor();   // PutFile
+  bool runWithRetries(std::function<bool()>&& assertions, uint32_t max_tries = 10) {
+    uint32_t tries = 0;
+    for (uint32_t tries = 0; tries < max_tries; ++tries) {
+      test_controller_.runSession(plan_);
+      if (assertions()) {
+        return true;
+      }
+      plan_->reset();
+      LogTestController::getInstance().reset();
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+    return false;
   }
 
   void setPerformanceMonitorProperty(const core::Property& property, const std::string& value) {
@@ -65,8 +72,7 @@ class PerformanceDataMonitorTester {
 
 TEST_CASE("PerformanceDataMonitorEmptyPropertiesTest", "[performancedatamonitoremptypropertiestest]") {
   PerformanceDataMonitorTester tester;
-  tester.runProcessors();
-
+  tester.test_controller_.runSession(tester.plan_);
   REQUIRE(tester.test_controller_.getLog().getInstance().contains("No valid counters for PerformanceDataMonitor", std::chrono::seconds(0)));
 
   auto created_flow_files = utils::file::FileUtils::list_dir_all(tester.dir_, tester.plan_->getLogger(), false);
@@ -77,61 +83,64 @@ TEST_CASE("PerformanceDataMonitorPartiallyInvalidGroupPropertyTest", "[performan
   PerformanceDataMonitorTester tester;
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups, "Disk,CPU,Asd");
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters, "\\Invalid\\Counter,\\System\\Processes");
-  tester.runProcessors();
-
-  REQUIRE(tester.test_controller_.getLog().getInstance().contains("Asd is not a valid predefined group", std::chrono::seconds(0)));
-  REQUIRE(tester.test_controller_.getLog().getInstance().contains("Error adding \\Invalid\\Counter to query", std::chrono::seconds(0)));
-
-  uint32_t number_of_flowfiles = 0;
-
-  auto lambda = [&number_of_flowfiles](const std::string& path, const std::string& filename) -> bool {
-    ++number_of_flowfiles;
-    FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
-    REQUIRE(fp != nullptr);
-    char readBuffer[500];
-    rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
-    rapidjson::Document document;
-    document.ParseStream(is);
-    fclose(fp);
-    REQUIRE(document.IsObject());
-    REQUIRE(document.HasMember("LogicalDisk"));
-    REQUIRE(document["LogicalDisk"].HasMember("_Total"));
-    REQUIRE(document["LogicalDisk"]["_Total"].HasMember("Free Megabytes"));
-    REQUIRE(document["System"].HasMember("Processes"));
-    return true;
+  const auto assertions = [&tester]() {
+    REQUIRE(tester.test_controller_.getLog().getInstance().contains("Asd is not a valid predefined group", std::chrono::seconds(0)));
+    REQUIRE(tester.test_controller_.getLog().getInstance().contains("Error adding \\Invalid\\Counter to query", std::chrono::seconds(0)));
+
+    bool ff_contains_all_data = false;
+
+    const auto lambda = [&ff_contains_all_data](const std::string& path, const std::string& filename) -> bool {
+      FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
+      REQUIRE(fp != nullptr);
+      char readBuffer[500];
+      rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+      rapidjson::Document document;
+      document.ParseStream(is);
+      fclose(fp);
+      ff_contains_all_data =
+        document.IsObject() &&
+        document.HasMember("LogicalDisk") &&
+        document["LogicalDisk"].HasMember("_Total") &&
+        document["LogicalDisk"]["_Total"].HasMember("Free Megabytes") &&
+        document["System"].HasMember("Processes");
+      return !ff_contains_all_data;
+    };
+
+    utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
+    return ff_contains_all_data;
   };
-
-  utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
-  REQUIRE(number_of_flowfiles == 2);
+  REQUIRE(tester.runWithRetries(assertions));
 }
 
 TEST_CASE("PerformanceDataMonitorCustomPDHCountersTest", "[performancedatamonitorcustompdhcounterstest]") {
   PerformanceDataMonitorTester tester;
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters, "\\System\\Processes,\\Process(*)\\% Processor Time");
-  tester.runProcessors();
-
-  uint32_t number_of_flowfiles = 0;
-
-  auto lambda = [&number_of_flowfiles](const std::string& path, const std::string& filename) -> bool {
-    ++number_of_flowfiles;
-    FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
-    REQUIRE(fp != nullptr);
-    char readBuffer[50000];
-    rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
-    rapidjson::Document document;
-    document.ParseStream(is);
-    fclose(fp);
-    REQUIRE(document.IsObject());
-    REQUIRE(document.HasMember("System"));
-    REQUIRE(document["System"].HasMember("Processes"));
-    REQUIRE(document.HasMember("Process"));
-    REQUIRE(document["Process"].HasMember("PerformanceDataMonitorTests"));
-    REQUIRE(document["Process"]["PerformanceDataMonitorTests"].HasMember("% Processor Time"));
-    return true;
+
+  const auto assertions = [&tester]() {
+    bool ff_contains_all_data = false;
+    const auto lambda = [&ff_contains_all_data](const std::string& path, const std::string& filename) -> bool {
+      FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
+      REQUIRE(fp != nullptr);
+      char readBuffer[50000];
+      rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+      rapidjson::Document document;
+      document.ParseStream(is);
+      fclose(fp);
+      ff_contains_all_data =
+        document.IsObject() &&
+        document.HasMember("System") &&
+        document["System"].HasMember("Processes") &&
+        document.HasMember("Process") &&
+        document["Process"].HasMember("PerformanceDataMonitorTests") &&
+        document["Process"]["PerformanceDataMonitorTests"].HasMember("% Processor Time");
+      return !ff_contains_all_data;
+    };
+
+    utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
+    return ff_contains_all_data;
   };
 
-  utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
-  REQUIRE(number_of_flowfiles == 2);
+  REQUIRE(tester.runWithRetries(assertions));
 }
 
 TEST_CASE("PerformanceDataMonitorCustomPDHCountersTestOpenTelemetry", "[performancedatamonitorcustompdhcounterstestopentelemetry]") {
@@ -140,33 +149,35 @@ TEST_CASE("PerformanceDataMonitorCustomPDHCountersTestOpenTelemetry", "[performa
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters, "\\System\\Processes,\\Process(*)\\ID Process,\\Process(*)\\Private Bytes");
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputFormatProperty, "OpenTelemetry");
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputCompactness, "Compact");
-  tester.runProcessors();
-
-  uint32_t number_of_flowfiles = 0;
-
-  auto lambda = [&number_of_flowfiles](const std::string& path, const std::string& filename) -> bool {
-    ++number_of_flowfiles;
-    FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
-    REQUIRE(fp != nullptr);
-    char readBuffer[50000];
-    rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
-    rapidjson::Document document;
-    document.ParseStream(is);
-    fclose(fp);
-    REQUIRE(document.IsObject());
-    REQUIRE(document.HasMember("Name"));
-    REQUIRE(document.HasMember("Timestamp"));
-    REQUIRE(document.HasMember("Body"));
-    REQUIRE(document["Body"].HasMember("System"));
-    REQUIRE(document["Body"]["System"].HasMember("Processes"));
-    REQUIRE(document["Body"].HasMember("Process"));
-    REQUIRE(document["Body"]["Process"].HasMember("PerformanceDataMonitorTests"));
-    REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID Process"));
-    return true;
+
+  const auto assertions = [&tester]() {
+    bool ff_contains_all_data = false;
+    const auto lambda = [&ff_contains_all_data](const std::string& path, const std::string& filename) -> bool {
+      FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
+      REQUIRE(fp != nullptr);
+      char readBuffer[50000];
+      rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+      rapidjson::Document document;
+      document.ParseStream(is);
+      fclose(fp);
+      ff_contains_all_data =
+        document.IsObject() &&
+        document.HasMember("Name") &&
+        document.HasMember("Timestamp") &&
+        document.HasMember("Body") &&
+        document["Body"].HasMember("System") &&
+        document["Body"]["System"].HasMember("Processes") &&
+        document["Body"].HasMember("Process") &&
+        document["Body"]["Process"].HasMember("PerformanceDataMonitorTests") &&
+        document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID Process");
+      return !ff_contains_all_data;
+    };
+
+    utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
+    return ff_contains_all_data;
   };
 
-  utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
-  REQUIRE(number_of_flowfiles == 2);
+  REQUIRE(tester.runWithRetries(assertions));
 }
 
 TEST_CASE("PerformanceDataMonitorAllPredefinedGroups", "[performancedatamonitorallpredefinedgroups]") {
@@ -174,63 +185,65 @@ TEST_CASE("PerformanceDataMonitorAllPredefinedGroups", "[performancedatamonitora
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups, "CPU,Disk,Network,Memory,IO,System,Process");
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputFormatProperty, "OpenTelemetry");
   tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputCompactness, "Pretty");
-  tester.runProcessors();
-
-  uint32_t number_of_flowfiles = 0;
-
-  auto lambda = [&number_of_flowfiles](const std::string& path, const std::string& filename) -> bool {
-    ++number_of_flowfiles;
-    FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
-    REQUIRE(fp != nullptr);
-    char readBuffer[50000];
-    rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
-    rapidjson::Document document;
-    document.ParseStream(is);
-    fclose(fp);
-    REQUIRE(document.IsObject());
-    REQUIRE(document.HasMember("Name"));
-    REQUIRE(document.HasMember("Timestamp"));
-    REQUIRE(document.HasMember("Body"));
-    REQUIRE(document["Body"].HasMember("PhysicalDisk"));
-    REQUIRE(document["Body"]["PhysicalDisk"].HasMember("_Total"));
-    REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Read Time"));
-    REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Time"));
-    REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Write Time"));
-    REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Idle Time"));
-
-    REQUIRE(document["Body"].HasMember("LogicalDisk"));
-    REQUIRE(document["Body"]["LogicalDisk"].HasMember("_Total"));
-    REQUIRE(document["Body"]["LogicalDisk"]["_Total"].HasMember("Free Megabytes"));
-    REQUIRE(document["Body"]["LogicalDisk"]["_Total"].HasMember("% Free Space"));
-
-    REQUIRE(document["Body"].HasMember("Processor"));
-    REQUIRE(document["Body"]["Processor"].HasMember("_Total"));
-
-    REQUIRE(document["Body"].HasMember("Network Interface"));
-
-    REQUIRE(document["Body"].HasMember("Memory"));
-    REQUIRE(document["Body"]["Memory"].HasMember("% Committed Bytes In Use"));
-    REQUIRE(document["Body"]["Memory"].HasMember("Available MBytes"));
-    REQUIRE(document["Body"]["Memory"].HasMember("Page Faults/sec"));
-    REQUIRE(document["Body"]["Memory"].HasMember("Pages/sec"));
-
-    REQUIRE(document["Body"].HasMember("System"));
-    REQUIRE(document["Body"]["System"].HasMember("% Registry Quota In Use"));
-    REQUIRE(document["Body"]["System"].HasMember("Context Switches/sec"));
-    REQUIRE(document["Body"]["System"].HasMember("File Control Bytes/sec"));
-    REQUIRE(document["Body"]["System"].HasMember("File Control Operations/sec"));
-
-    REQUIRE(document["Body"].HasMember("Process"));
-    REQUIRE(document["Body"]["Process"].HasMember("PerformanceDataMonitorTests"));
-    REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("% Processor Time"));
-    REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Elapsed Time"));
-    REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID Process"));
-    REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Private Bytes"));
-    return true;
+
+  const auto assertions = [&tester]() {
+    bool ff_contains_all_data = false;
+    const auto lambda = [&ff_contains_all_data](const std::string& path, const std::string& filename) -> bool {
+      FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
+      REQUIRE(fp != nullptr);
+      char readBuffer[50000];
+      rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+      rapidjson::Document document;
+      document.ParseStream(is);
+      fclose(fp);
+      ff_contains_all_data =
+        document.IsObject() &&
+        document.HasMember("Name") &&
+        document.HasMember("Timestamp") &&
+        document.HasMember("Body") &&
+        document["Body"].HasMember("PhysicalDisk") &&
+        document["Body"]["PhysicalDisk"].HasMember("_Total") &&
+        document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Read Time") &&
+        document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Time") &&
+        document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Write Time") &&
+        document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Idle Time") &&
+
+        document["Body"].HasMember("LogicalDisk") &&
+        document["Body"]["LogicalDisk"].HasMember("_Total") &&
+        document["Body"]["LogicalDisk"]["_Total"].HasMember("Free Megabytes") &&
+        document["Body"]["LogicalDisk"]["_Total"].HasMember("% Free Space") &&
+
+        document["Body"].HasMember("Processor") &&
+        document["Body"]["Processor"].HasMember("_Total") &&
+
+        document["Body"].HasMember("Network Interface") &&
+
+        document["Body"].HasMember("Memory") &&
+        document["Body"]["Memory"].HasMember("% Committed Bytes In Use") &&
+        document["Body"]["Memory"].HasMember("Available MBytes") &&
+        document["Body"]["Memory"].HasMember("Page Faults/sec") &&
+        document["Body"]["Memory"].HasMember("Pages/sec") &&
+
+        document["Body"].HasMember("System") &&
+        document["Body"]["System"].HasMember("% Registry Quota In Use") &&
+        document["Body"]["System"].HasMember("Context Switches/sec") &&
+        document["Body"]["System"].HasMember("File Control Bytes/sec") &&
+        document["Body"]["System"].HasMember("File Control Operations/sec") &&
+
+        document["Body"].HasMember("Process") &&
+        document["Body"]["Process"].HasMember("PerformanceDataMonitorTests") &&
+        document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("% Processor Time") &&
+        document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Elapsed Time") &&
+        document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID Process") &&
+        document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Private Bytes");
+      return !ff_contains_all_data;
+    };
+
+    utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
+    return ff_contains_all_data;
   };
 
-  utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
-  REQUIRE(number_of_flowfiles == 2);
+  REQUIRE(tester.runWithRetries(assertions));
 }
 
 TEST_CASE("PerformanceDataMonitorDecimalPlacesPropertyTest", "[performancedatamonitordecimalplacespropertytest]") {

Mime
View raw message