nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [nifi-minifi-cpp] 01/02: MINIFICPP-738 - EL should be able to access global properties
Date Fri, 01 Mar 2019 02:09:31 GMT
This is an automated email from the ASF dual-hosted git repository.

aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f01953b8cd8814fa5b4faa0a7cb588f0545bb231
Author: Arpad Boda <aboda@hortonworks.com>
AuthorDate: Mon Feb 18 23:19:16 2019 +0100

    MINIFICPP-738 - EL should be able to access global properties
---
 extensions/expression-language/Expression.cpp           |  8 ++++++--
 extensions/expression-language/ProcessContextExpr.cpp   | 10 ++++++++--
 .../expression-language/impl/expression/Expression.h    |  9 +++++++--
 extensions/rocksdb-repos/DatabaseContentRepository.cpp  |  1 +
 libminifi/include/core/ContentRepository.h              |  5 +++++
 libminifi/src/core/repository/FileSystemRepository.cpp  |  1 +
 .../src/core/repository/VolatileContentRepository.cpp   |  1 +
 libminifi/test/TestBase.h                               |  6 ++++--
 .../ExpressionLanguageTests.cpp                         | 17 ++++++++++++++++-
 9 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/extensions/expression-language/Expression.cpp b/extensions/expression-language/Expression.cpp
index b031b04..07fd509 100644
--- a/extensions/expression-language/Expression.cpp
+++ b/extensions/expression-language/Expression.cpp
@@ -69,9 +69,13 @@ Expression make_dynamic_attr(const std::string &attribute_id) {
     const auto cur_flow_file = params.flow_file.lock();
     if (cur_flow_file && cur_flow_file->getAttribute(attribute_id, result)) {
       return Value(result);
-    } else {
-      return Value();
+    } else if (attribute_id.rfind("nifi.", 0) == 0) {
+      const auto config = params.configuration.lock();
+      if (config && config->get(attribute_id, result)) {
+        return Value(result);
+      }
     }
+    return Value();
   });
 }
 
diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp
index 5cceb77..71c0b36 100644
--- a/extensions/expression-language/ProcessContextExpr.cpp
+++ b/extensions/expression-language/ProcessContextExpr.cpp
@@ -34,7 +34,10 @@ bool ProcessContext::getProperty(const Property &property, std::string &value, c
     expressions_.emplace(name, expression::compile(expression_str));
   }
 
-  value = expressions_[name]( { flow_file }).asString();
+  minifi::expression::Parameters p(flow_file);
+  p.configuration = content_repo_->getConfig();
+
+  value = expressions_[name]( p ).asString();
   return true;
 }
 
@@ -50,7 +53,10 @@ bool ProcessContext::getDynamicProperty(const Property &property, std::string &v
     dynamic_property_expressions_.emplace(name, expression::compile(expression_str));
   }
 
-  value = dynamic_property_expressions_[name]( { flow_file }).asString();
+  minifi::expression::Parameters p(flow_file);
+  p.configuration = content_repo_->getConfig();
+
+  value = dynamic_property_expressions_[name]( p ).asString();
   return true;
 }
 
diff --git a/extensions/expression-language/impl/expression/Expression.h b/extensions/expression-language/impl/expression/Expression.h
index 15f7558..826f588 100644
--- a/extensions/expression-language/impl/expression/Expression.h
+++ b/extensions/expression-language/impl/expression/Expression.h
@@ -45,9 +45,14 @@ namespace nifi {
 namespace minifi {
 namespace expression {
 
-typedef struct {
+struct Parameters {
   std::weak_ptr<core::FlowFile> flow_file;
-} Parameters;
+  std::weak_ptr<minifi::Configure> configuration;
+  Parameters(std::shared_ptr<core::FlowFile> ff = nullptr) {
+    flow_file = ff;
+  }
+
+};
 
 class Expression;
 
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 5842bf2..29864f4 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -30,6 +30,7 @@ namespace core {
 namespace repository {
 
 bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) {
+  configuration_ = configuration;
   std::string value;
   if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) {
     directory_ = value;
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 51d2765..50c6de1 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -108,6 +108,10 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> {
     }
   }
 
+  virtual std::weak_ptr<Configure> getConfig(){
+    return configuration_;
+  }
+
  protected:
 
   std::string directory_;
@@ -116,6 +120,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> {
 
   std::map<std::string, uint32_t> count_map_;
 
+  std::weak_ptr<Configure> configuration_;
 };
 
 } /* namespace core */
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index c0f1694..0a114d3 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -30,6 +30,7 @@ namespace core {
 namespace repository {
 
 bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) {
+  configuration_ = configuration;
   std::string value;
   if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) {
     directory_ = value;
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 674566b..1b27157 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -35,6 +35,7 @@ namespace repository {
 const char *VolatileContentRepository::minimal_locking = "minimal.locking";
 
 bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) {
+  configuration_ = configure;
   VolatileRepository::initialize(configure);
   resource_claim_comparator_ = [](std::shared_ptr<minifi::ResourceClaim> lhsPtr, std::shared_ptr<minifi::ResourceClaim> rhsPtr) {
     if (lhsPtr == nullptr || rhsPtr == nullptr) {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index c12edeb..c8104d5 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -249,8 +249,10 @@ class TestController {
     flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test");
   }
 
-  std::shared_ptr<TestPlan> createPlan() {
-    std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr) {
+    if(configuration == nullptr) {
+      configuration = std::make_shared<minifi::Configure>();
+    }
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
 
     content_repo->initialize(configuration);
diff --git a/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp b/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp
index 3ee6ec3..c96cd12 100644
--- a/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp
+++ b/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp
@@ -25,6 +25,7 @@
 #include <ExtractText.h>
 #include <GetFile.h>
 #include <PutFile.h>
+#include <UpdateAttribute.h>
 #include <LogAttribute.h>
 
 namespace expression = org::apache::nifi::minifi::expression;
@@ -176,8 +177,13 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
   LogTestController::getInstance().setTrace<processors::GetFile>();
   LogTestController::getInstance().setTrace<processors::PutFile>();
   LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
 
-  auto plan = testController.createPlan();
+  auto conf = std::make_shared<minifi::Configure>();
+
+  conf->set("nifi.my.own.property", "custom_value");
+
+  auto plan = testController.createPlan(conf);
   auto repo = std::make_shared<TestRepository>();
 
   std::string in_dir("/tmp/gt.XXXXXX");
@@ -203,6 +209,12 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
       get_file,
       processors::GetFile::KeepSourceFile.getName(),
       "false");
+  auto update = plan->addProcessor(
+      "UpdateAttribute",
+      "UpdateAttribute",
+      core::Relationship("success", "description"),
+      true);
+  update->setDynamicProperty("prop_attr", "${'nifi.my.own.property'}_added");
   plan->addProcessor(
       "LogAttribute",
       "LogAttribute",
@@ -246,6 +258,7 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
   }
 
   plan->runNextProcessor();  // GetFile
+  plan->runNextProcessor();  // Update
   plan->runNextProcessor();  // Log
   plan->runNextProcessor();  // ExtractText
   plan->runNextProcessor();  // Log
@@ -258,6 +271,8 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
     output_str << out_file_stream.rdbuf();
     REQUIRE("extracted_attr" == output_str.str());
   }
+
+  REQUIRE(LogTestController::getInstance().contains("key:prop_attr value:custom_value_added"));
 }
 
 TEST_CASE("Substring 2 arg", "[expressionLanguageSubstring2]") {  // NOLINT


Mime
View raw message