nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject nifi-minifi-cpp git commit: MINIFICPP-217: Resolve compilation failures within Bin files and merge files test
Date Mon, 02 Oct 2017 15:55:40 GMT
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 9f161a27e -> 1a2fa1ea5


MINIFICPP-217: Resolve compilation failures within Bin files and merge files test

This closes #140.

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/1a2fa1ea
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/1a2fa1ea
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/1a2fa1ea

Branch: refs/heads/master
Commit: 1a2fa1ea52389a821c6f5f56bf6e4b1a352d7f16
Parents: 9f161a2
Author: Marc Parisi <phrocker@apache.org>
Authored: Mon Oct 2 11:36:52 2017 -0400
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Mon Oct 2 11:54:59 2017 -0400

----------------------------------------------------------------------
 libminifi/include/processors/BinFiles.h |  45 +++++++----
 libminifi/src/processors/BinFiles.cpp   |  12 +--
 libminifi/test/unit/MergeFileTests.cpp  | 116 +++++++++++++--------------
 3 files changed, 92 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a2fa1ea/libminifi/include/processors/BinFiles.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/BinFiles.h b/libminifi/include/processors/BinFiles.h
index 6c619a8..69399a3 100644
--- a/libminifi/include/processors/BinFiles.h
+++ b/libminifi/include/processors/BinFiles.h
@@ -41,13 +41,17 @@ namespace processors {
 class Bin {
  public:
   // Constructor
-   /*!
-    * Create a new Bin. Note: this object is not thread safe
-    */
-  explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const int &minEntries,
const int & maxEntries,
-      const std::string &fileCount, const std::string &groupId)
-      : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries),
fileCount_(fileCount),
-        groupId_(groupId), logger_(logging::LoggerFactory<Bin>::getLogger()) {
+  /*!
+   * Create a new Bin. Note: this object is not thread safe
+   */
+  explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const int &minEntries,
const int & maxEntries, const std::string &fileCount, const std::string &groupId)
+      : minSize_(minSize),
+        maxSize_(maxSize),
+        maxEntries_(maxEntries),
+        minEntries_(minEntries),
+        fileCount_(fileCount),
+        groupId_(groupId),
+        logger_(logging::LoggerFactory<Bin>::getLogger()) {
     queued_data_size_ = 0;
     creation_dated_ = getTimeMillis();
     std::shared_ptr<utils::IdGenerator> id_generator = utils::IdGenerator::getIdGenerator();
@@ -103,8 +107,7 @@ class Bin {
 
     queue_.push_back(flow);
     queued_data_size_ += flow->getSize();
-    logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry
%d",
-        uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_);
+    logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry
%d", uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_);
 
     return true;
   }
@@ -132,7 +135,7 @@ class Bin {
   int minEntries_;
   // Queued data size
   uint64_t queued_data_size_;
-   // Queue for the Flow File
+  // Queue for the Flow File
   std::deque<std::shared_ptr<core::FlowFile>> queue_;
   uint64_t creation_dated_;
   std::string fileCount_;
@@ -148,11 +151,16 @@ class Bin {
 class BinManager {
  public:
   // Constructor
-   /*!
-    * Create a new BinManager
-    */
+  /*!
+   * Create a new BinManager
+   */
   BinManager()
-      : minSize_(0), maxSize_(ULLONG_MAX), maxEntries_(INT_MAX), minEntries_(1), binAge_(ULLONG_MAX),
binCount_(0),
+      : minSize_(0),
+        maxSize_(ULLONG_MAX),
+        maxEntries_(INT_MAX),
+        minEntries_(1),
+        binAge_(ULLONG_MAX),
+        binCount_(0),
         logger_(logging::LoggerFactory<BinManager>::getLogger()) {
   }
   virtual ~BinManager() {
@@ -180,7 +188,7 @@ class BinManager {
     fileCount_ = value;
   }
   void purge() {
-    std::lock_guard < std::mutex > lock(mutex_);
+    std::lock_guard<std::mutex> lock(mutex_);
     groupBinMap_.clear();
     binCount_ = 0;
   }
@@ -204,7 +212,7 @@ class BinManager {
   std::string fileCount_;
   // Bin Age in msec
   uint64_t binAge_;
-  std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>>
groupBinMap_;
+  std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>
>groupBinMap_;
   std::deque<std::unique_ptr<Bin>> readyBin_;
   int binCount_;
   std::shared_ptr<logging::Logger> logger_;
@@ -258,7 +266,10 @@ class BinFiles : public core::Processor {
    */
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
   // OnTrigger method, implemented by NiFi BinFiles
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  }
+  // OnTrigger method, implemented by NiFi BinFiles
+  virtual void onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession>
session);
   // Initialize, over write by NiFi BinFiles
   virtual void initialize(void);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a2fa1ea/libminifi/src/processors/BinFiles.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/BinFiles.cpp b/libminifi/src/processors/BinFiles.cpp
index bd4afca..5d118a2 100644
--- a/libminifi/src/processors/BinFiles.cpp
+++ b/libminifi/src/processors/BinFiles.cpp
@@ -233,12 +233,12 @@ bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile>
   return true;
 }
 
-void BinFiles::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void BinFiles::onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession>
session) {
   std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast < FlowFileRecord
> (session->get());
 
   if (flow != nullptr) {
-    preprocessFlowFile(context, session, flow);
-    std::string groupId = getGroupId(context, flow);
+    preprocessFlowFile(context.get(), session.get(), flow);
+    std::string groupId = getGroupId(context.get(), flow);
 
     bool offer = this->binManager_.offer(groupId, flow);
     if (!offer) {
@@ -272,10 +272,10 @@ void BinFiles::onTrigger(core::ProcessContext *context, core::ProcessSession
*se
       std::unique_ptr<Bin> bin = std::move(readyBins.front());
       readyBins.pop_front();
       // add bin's flows to the session
-      this->addFlowsToSession(context, &mergeSession, bin);
+      this->addFlowsToSession(context.get(), &mergeSession, bin);
       logger_->log_info("BinFiles start to process bin %s for group %s", bin->getUUIDStr(),
bin->getGroupId());
-      if (!this->processBin(context, &mergeSession, bin))
-          this->transferFlowsToFail(context, &mergeSession, bin);
+      if (!this->processBin(context.get(), &mergeSession, bin))
+          this->transferFlowsToFail(context.get(), &mergeSession, bin);
     }
     mergeSession.commit();
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a2fa1ea/libminifi/test/unit/MergeFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/MergeFileTests.cpp b/libminifi/test/unit/MergeFileTests.cpp
index d3e4fa9..cf09df1 100644
--- a/libminifi/test/unit/MergeFileTests.cpp
+++ b/libminifi/test/unit/MergeFileTests.cpp
@@ -143,18 +143,18 @@ TEST_CASE("MergeFileDefragment", "[mergefiletest1]") {
     logAttributeProcessor->incrementActiveTasks();
     logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
 
-    core::ProcessorNode node(processor);
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
     std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider
= nullptr;
-    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
MERGE_FORMAT_CONCAT_VALUE);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
MERGE_STRATEGY_DEFRAGMENT);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
DELIMITER_STRATEGY_TEXT);
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
MERGE_FORMAT_CONCAT_VALUE);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
MERGE_STRATEGY_DEFRAGMENT);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
DELIMITER_STRATEGY_TEXT);
 
-    core::ProcessSession sessionGenFlowFile(&context);
+    core::ProcessSession sessionGenFlowFile(context);
     std::shared_ptr<core::FlowFile> record[6];
 
     // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
     std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
     for (int i = 0; i < 6; i++) {
       std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile
> (sessionGenFlowFile.create());
@@ -180,12 +180,12 @@ TEST_CASE("MergeFileDefragment", "[mergefiletest1]") {
     income_connection->put(record[3]);
 
     REQUIRE(processor->getName() == "mergecontent");
-    core::ProcessSessionFactory factory(&context);
-    processor->onSchedule(&context, &factory);
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
     for (int i = 0; i < 6; i++) {
-      core::ProcessSession session(&context);
-      processor->onTrigger(&context, &session);
-      session.commit();
+      auto session = std::make_shared<core::ProcessSession>(context);
+      processor->onTrigger(context, session);
+      session->commit();
     }
     // validate the merge content
     std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -315,21 +315,21 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
     logAttributeProcessor->incrementActiveTasks();
     logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
 
-    core::ProcessorNode node(processor);
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
     std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider
= nullptr;
-    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
MERGE_FORMAT_CONCAT_VALUE);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
MERGE_STRATEGY_DEFRAGMENT);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
DELIMITER_STRATEGY_FILENAME);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, "/tmp/minifi-mergecontent.header");
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, "/tmp/minifi-mergecontent.footer");
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator,
"/tmp/minifi-mergecontent.demarcator");
-
-    core::ProcessSession sessionGenFlowFile(&context);
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
MERGE_FORMAT_CONCAT_VALUE);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
MERGE_STRATEGY_DEFRAGMENT);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
DELIMITER_STRATEGY_FILENAME);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header,
"/tmp/minifi-mergecontent.header");
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer,
"/tmp/minifi-mergecontent.footer");
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator,
"/tmp/minifi-mergecontent.demarcator");
+
+    core::ProcessSession sessionGenFlowFile(context);
     std::shared_ptr<core::FlowFile> record[6];
 
     // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
     std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
     for (int i = 0; i < 6; i++) {
       std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile
> (sessionGenFlowFile.create());
@@ -355,12 +355,12 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
     income_connection->put(record[3]);
 
     REQUIRE(processor->getName() == "mergecontent");
-    core::ProcessSessionFactory factory(&context);
-    processor->onSchedule(&context, &factory);
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
     for (int i = 0; i < 6; i++) {
-      core::ProcessSession session(&context);
-      processor->onTrigger(&context, &session);
-      session.commit();
+      auto session = std::make_shared<core::ProcessSession>(context);
+      processor->onTrigger(context, session);
+      session->commit();
     }
     // validate the merge content
     std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -478,19 +478,19 @@ TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
     logAttributeProcessor->incrementActiveTasks();
     logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
 
-    core::ProcessorNode node(processor);
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
     std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider
= nullptr;
-    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
MERGE_FORMAT_CONCAT_VALUE);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
MERGE_STRATEGY_DEFRAGMENT);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
DELIMITER_STRATEGY_TEXT);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1
sec");
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
MERGE_FORMAT_CONCAT_VALUE);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
MERGE_STRATEGY_DEFRAGMENT);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
DELIMITER_STRATEGY_TEXT);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge,
"1 sec");
 
-    core::ProcessSession sessionGenFlowFile(&context);
+    core::ProcessSession sessionGenFlowFile(context);
     std::shared_ptr<core::FlowFile> record[6];
 
     // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
     std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
     for (int i = 0; i < 6; i++) {
       if (i == 4)
@@ -517,19 +517,19 @@ TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
     income_connection->put(record[3]);
 
     REQUIRE(processor->getName() == "mergecontent");
-    core::ProcessSessionFactory factory(&context);
-    processor->onSchedule(&context, &factory);
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
     for (int i = 0; i < 6; i++) {
       if (i == 4)
         continue;
-      core::ProcessSession session(&context);
-      processor->onTrigger(&context, &session);
-      session.commit();
+      auto session = std::make_shared<core::ProcessSession>(context);
+      processor->onTrigger(context, session);
+      session->commit();
     }
     std::this_thread::sleep_for(std::chrono::milliseconds(2000));
     {
-      core::ProcessSession session(&context);
-      processor->onTrigger(&context, &session);
+      auto session = std::make_shared<core::ProcessSession>(context);
+      processor->onTrigger(context, session);
     }
     // validate the merge content
     std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -643,20 +643,20 @@ TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
     logAttributeProcessor->incrementActiveTasks();
     logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
 
-    core::ProcessorNode node(processor);
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
     std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider
= nullptr;
-    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
MERGE_FORMAT_CONCAT_VALUE);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
MERGE_STRATEGY_BIN_PACK);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
DELIMITER_STRATEGY_TEXT);
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
"tag");
-
-    core::ProcessSession sessionGenFlowFile(&context);
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
MERGE_FORMAT_CONCAT_VALUE);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
MERGE_STRATEGY_BIN_PACK);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
DELIMITER_STRATEGY_TEXT);
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
"96");
+    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
"tag");
+
+    core::ProcessSession sessionGenFlowFile(context);
     std::shared_ptr<core::FlowFile> record[6];
 
     // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
     std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
     for (int i = 0; i < 6; i++) {
       std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile
> (sessionGenFlowFile.create());
@@ -673,12 +673,12 @@ TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
     income_connection->put(record[5]);
 
     REQUIRE(processor->getName() == "mergecontent");
-    core::ProcessSessionFactory factory(&context);
-    processor->onSchedule(&context, &factory);
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
     for (int i = 0; i < 6; i++) {
-      core::ProcessSession session(&context);
-      processor->onTrigger(&context, &session);
-      session.commit();
+      auto session = std::make_shared<core::ProcessSession>(context);
+      processor->onTrigger(context, session);
+      session->commit();
     }
     // validate the merge content
     std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;


Mime
View raw message