This is an automated email from the ASF dual-hosted git repository.
aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f01953b8cd8814fa5b4faa0a7cb588f0545bb231
Author: Arpad Boda <aboda@hortonworks.com>
AuthorDate: Mon Feb 18 23:19:16 2019 +0100
MINIFICPP-738 - EL should be able to access global properties
---
extensions/expression-language/Expression.cpp | 8 ++++++--
extensions/expression-language/ProcessContextExpr.cpp | 10 ++++++++--
.../expression-language/impl/expression/Expression.h | 9 +++++++--
extensions/rocksdb-repos/DatabaseContentRepository.cpp | 1 +
libminifi/include/core/ContentRepository.h | 5 +++++
libminifi/src/core/repository/FileSystemRepository.cpp | 1 +
.../src/core/repository/VolatileContentRepository.cpp | 1 +
libminifi/test/TestBase.h | 6 ++++--
.../ExpressionLanguageTests.cpp | 17 ++++++++++++++++-
9 files changed, 49 insertions(+), 9 deletions(-)
diff --git a/extensions/expression-language/Expression.cpp b/extensions/expression-language/Expression.cpp
index b031b04..07fd509 100644
--- a/extensions/expression-language/Expression.cpp
+++ b/extensions/expression-language/Expression.cpp
@@ -69,9 +69,13 @@ Expression make_dynamic_attr(const std::string &attribute_id) {
const auto cur_flow_file = params.flow_file.lock();
if (cur_flow_file && cur_flow_file->getAttribute(attribute_id, result)) {
return Value(result);
- } else {
- return Value();
+ } else if (attribute_id.rfind("nifi.", 0) == 0) {
+ const auto config = params.configuration.lock();
+ if (config && config->get(attribute_id, result)) {
+ return Value(result);
+ }
}
+ return Value();
});
}
diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp
index 5cceb77..71c0b36 100644
--- a/extensions/expression-language/ProcessContextExpr.cpp
+++ b/extensions/expression-language/ProcessContextExpr.cpp
@@ -34,7 +34,10 @@ bool ProcessContext::getProperty(const Property &property, std::string
&value, c
expressions_.emplace(name, expression::compile(expression_str));
}
- value = expressions_[name]( { flow_file }).asString();
+ minifi::expression::Parameters p(flow_file);
+ p.configuration = content_repo_->getConfig();
+
+ value = expressions_[name]( p ).asString();
return true;
}
@@ -50,7 +53,10 @@ bool ProcessContext::getDynamicProperty(const Property &property, std::string
&v
dynamic_property_expressions_.emplace(name, expression::compile(expression_str));
}
- value = dynamic_property_expressions_[name]( { flow_file }).asString();
+ minifi::expression::Parameters p(flow_file);
+ p.configuration = content_repo_->getConfig();
+
+ value = dynamic_property_expressions_[name]( p ).asString();
return true;
}
diff --git a/extensions/expression-language/impl/expression/Expression.h b/extensions/expression-language/impl/expression/Expression.h
index 15f7558..826f588 100644
--- a/extensions/expression-language/impl/expression/Expression.h
+++ b/extensions/expression-language/impl/expression/Expression.h
@@ -45,9 +45,14 @@ namespace nifi {
namespace minifi {
namespace expression {
-typedef struct {
+struct Parameters {
std::weak_ptr<core::FlowFile> flow_file;
-} Parameters;
+ std::weak_ptr<minifi::Configure> configuration;
+ Parameters(std::shared_ptr<core::FlowFile> ff = nullptr) {
+ flow_file = ff;
+ }
+
+};
class Expression;
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 5842bf2..29864f4 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -30,6 +30,7 @@ namespace core {
namespace repository {
bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configure>
&configuration) {
+ configuration_ = configuration;
std::string value;
if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value))
{
directory_ = value;
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 51d2765..50c6de1 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -108,6 +108,10 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>
{
}
}
+ virtual std::weak_ptr<Configure> getConfig(){
+ return configuration_;
+ }
+
protected:
std::string directory_;
@@ -116,6 +120,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>
{
std::map<std::string, uint32_t> count_map_;
+ std::weak_ptr<Configure> configuration_;
};
} /* namespace core */
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index c0f1694..0a114d3 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -30,6 +30,7 @@ namespace core {
namespace repository {
bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration)
{
+ configuration_ = configuration;
std::string value;
if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value))
{
directory_ = value;
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 674566b..1b27157 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -35,6 +35,7 @@ namespace repository {
const char *VolatileContentRepository::minimal_locking = "minimal.locking";
bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure)
{
+ configuration_ = configure;
VolatileRepository::initialize(configure);
resource_claim_comparator_ = [](std::shared_ptr<minifi::ResourceClaim> lhsPtr, std::shared_ptr<minifi::ResourceClaim>
rhsPtr) {
if (lhsPtr == nullptr || rhsPtr == nullptr) {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index c12edeb..c8104d5 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -249,8 +249,10 @@ class TestController {
flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test",
"test", "test");
}
- std::shared_ptr<TestPlan> createPlan() {
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+ std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration
= nullptr) {
+ if(configuration == nullptr) {
+ configuration = std::make_shared<minifi::Configure>();
+ }
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
diff --git a/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp b/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp
index 3ee6ec3..c96cd12 100644
--- a/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp
+++ b/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp
@@ -25,6 +25,7 @@
#include <ExtractText.h>
#include <GetFile.h>
#include <PutFile.h>
+#include <UpdateAttribute.h>
#include <LogAttribute.h>
namespace expression = org::apache::nifi::minifi::expression;
@@ -176,8 +177,13 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
LogTestController::getInstance().setTrace<processors::GetFile>();
LogTestController::getInstance().setTrace<processors::PutFile>();
LogTestController::getInstance().setTrace<processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
- auto plan = testController.createPlan();
+ auto conf = std::make_shared<minifi::Configure>();
+
+ conf->set("nifi.my.own.property", "custom_value");
+
+ auto plan = testController.createPlan(conf);
auto repo = std::make_shared<TestRepository>();
std::string in_dir("/tmp/gt.XXXXXX");
@@ -203,6 +209,12 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
get_file,
processors::GetFile::KeepSourceFile.getName(),
"false");
+ auto update = plan->addProcessor(
+ "UpdateAttribute",
+ "UpdateAttribute",
+ core::Relationship("success", "description"),
+ true);
+ update->setDynamicProperty("prop_attr", "${'nifi.my.own.property'}_added");
plan->addProcessor(
"LogAttribute",
"LogAttribute",
@@ -246,6 +258,7 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
}
plan->runNextProcessor(); // GetFile
+ plan->runNextProcessor(); // Update
plan->runNextProcessor(); // Log
plan->runNextProcessor(); // ExtractText
plan->runNextProcessor(); // Log
@@ -258,6 +271,8 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
output_str << out_file_stream.rdbuf();
REQUIRE("extracted_attr" == output_str.str());
}
+
+ REQUIRE(LogTestController::getInstance().contains("key:prop_attr value:custom_value_added"));
}
TEST_CASE("Substring 2 arg", "[expressionLanguageSubstring2]") { // NOLINT
|