nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [1/5] nifi-minifi-cpp git commit: MINIFI-226: Add controller services capabilities along with unit tests Fix test failures Update Travis YML Update readme to link to MiNiFi licensing information
Date Thu, 18 May 2017 13:07:51 GMT
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 372f2d6f8 -> c9940e945


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/InvokeHTTPTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/InvokeHTTPTests.cpp b/libminifi/test/unit/InvokeHTTPTests.cpp
index 6b96549..0200710 100644
--- a/libminifi/test/unit/InvokeHTTPTests.cpp
+++ b/libminifi/test/unit/InvokeHTTPTests.cpp
@@ -18,13 +18,13 @@
 #include <uuid/uuid.h>
 #include <fstream>
 #include "FlowController.h"
-#include "ProvenanceTestHelper.h"
 #include "../TestBase.h"
 #include "core/logging/LogAppenders.h"
 #include "core/logging/BaseLogger.h"
 #include "processors/GetFile.h"
 #include "core/Core.h"
 #include "../../include/core/FlowFile.h"
+#include "../unit/ProvenanceTestHelper.h"
 #include "core/Processor.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -35,7 +35,8 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
   std::stringstream oss;
   std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
       logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,0));
+      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+                                                                         0));
   std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
   logger->updateLogger(std::move(outputLogger));
 
@@ -43,9 +44,7 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
 
   testController.enableDebug();
 
-
-  std::shared_ptr<TestRepository> repo = std::make_shared<
-      TestRepository>();
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
   std::shared_ptr<core::Processor> processor = std::make_shared<
       org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
@@ -58,26 +57,23 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
   uuid_t invokehttp_uuid;
   REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
 
-
   std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo,"getfileCreate2Connection");
+      minifi::Connection>(repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo,"listenhttp");
+      minifi::Connection>(repo, "listenhttp");
 
   connection2->setRelationship(core::Relationship("No Retry", "description"));
 
   // link the connections so that we can test results at the end for this
   connection->setSource(processor);
 
-
   // link the connections so that we can test results at the end for this
   connection->setDestination(invokehttp);
 
   connection2->setSource(invokehttp);
 
-
   connection2->setSourceUUID(invokehttp_uuid);
   connection->setSourceUUID(processoruuid);
   connection->setDestinationUUID(invokehttp_uuid);
@@ -86,21 +82,23 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
   invokehttp->addConnection(connection);
   invokehttp->addConnection(connection2);
 
-
   core::ProcessorNode node(processor);
   core::ProcessorNode node2(invokehttp);
 
-  core::ProcessContext context(node, repo);
-  core::ProcessContext context2(node2, repo);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider
=
+      nullptr;
+  core::ProcessContext context(node, controller_services_provider, repo);
+  core::ProcessContext context2(node2, controller_services_provider, repo);
   context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port,
                       "8685");
-  context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
-                        "/testytesttest");
+  context.setProperty(
+      org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
+      "/testytesttest");
 
-  context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method,
-                        "POST");
+  context2.setProperty(
+      org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
   context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL,
-                          "http://localhost:8685/testytesttest");
+                       "http://localhost:8685/testytesttest");
   core::ProcessSession session(&context);
   core::ProcessSession session2(&context2);
 
@@ -125,7 +123,6 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
   REQUIRE(record == nullptr);
   REQUIRE(records.size() == 0);
 
-
   processor->incrementActiveTasks();
   processor->setScheduledState(core::ScheduledState::RUNNING);
   processor->onTrigger(&context, &session);
@@ -142,25 +139,24 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
   session2.commit();
   records = reporter->getEvents();
 
-
-
   for (provenance::ProvenanceEventRecord *provEventRecord : records) {
     REQUIRE(provEventRecord->getComponentType() == processor->getName());
   }
   std::shared_ptr<core::FlowFile> ffr = session2.get();
   std::string log_attribute_output = oss.str();
-std::cout << log_attribute_output << std::endl;
-  REQUIRE( log_attribute_output.find("exiting because method is POST") != std::string::npos
);
+  REQUIRE(
+      log_attribute_output.find("exiting because method is POST")
+          != std::string::npos);
 
 }
 
-
 TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
 
   std::stringstream oss;
   std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
       logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,0));
+      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+                                                                         0));
   std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
   logger->updateLogger(std::move(outputLogger));
 
@@ -168,19 +164,16 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
 
   testController.enableDebug();
 
-
-
-  std::shared_ptr<TestRepository> repo = std::make_shared<
-      TestRepository>();
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
   std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<
-        org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
 
-    std::shared_ptr<core::Processor> logAttribute = std::make_shared<
-        org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared<
+      org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
 
-    char format[] = "/tmp/gt.XXXXXX";
-    char *dir = testController.createTempDirectory(format);
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
 
   std::shared_ptr<core::Processor> listenhttp = std::make_shared<
       org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
@@ -193,30 +186,26 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
   uuid_t invokehttp_uuid;
   REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
 
-
   std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<
-    minifi::Connection>(repo, "getfileCreate2Connection");
+      minifi::Connection>(repo, "getfileCreate2Connection");
   gcConnection->setRelationship(core::Relationship("success", "description"));
 
-std::shared_ptr<minifi::Connection> laConnection = std::make_shared<
-    minifi::Connection>(repo, "logattribute");
-laConnection->setRelationship(core::Relationship("success", "description"));
-
-
+  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<
+      minifi::Connection>(repo, "logattribute");
+  laConnection->setRelationship(core::Relationship("success", "description"));
 
   std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo,"getfileCreate2Connection");
+      minifi::Connection>(repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo,"listenhttp");
+      minifi::Connection>(repo, "listenhttp");
 
   connection2->setRelationship(core::Relationship("No Retry", "description"));
 
   // link the connections so that we can test results at the end for this
   connection->setSource(listenhttp);
 
-
   connection2->setSourceUUID(invokehttp_uuid);
   connection->setSourceUUID(processoruuid);
   connection->setDestinationUUID(invokehttp_uuid);
@@ -225,21 +214,22 @@ laConnection->setRelationship(core::Relationship("success", "description"));
   invokehttp->addConnection(connection);
   invokehttp->addConnection(connection2);
 
-
   core::ProcessorNode node(listenhttp);
   core::ProcessorNode node2(invokehttp);
-
-  core::ProcessContext context(node, repo);
-  core::ProcessContext context2(node2, repo);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider
=
+      nullptr;
+  core::ProcessContext context(node, controller_services_provider, repo);
+  core::ProcessContext context2(node2, controller_services_provider, repo);
   context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port,
                       "8686");
-  context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
-                        "/testytesttest");
+  context.setProperty(
+      org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
+      "/testytesttest");
 
-  context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method,
-                        "POST");
+  context2.setProperty(
+      org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
   context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL,
-                          "http://localhost:8686/testytesttest");
+                       "http://localhost:8686/testytesttest");
   core::ProcessSession session(&context);
   core::ProcessSession session2(&context2);
 
@@ -264,7 +254,6 @@ laConnection->setRelationship(core::Relationship("success", "description"));
   REQUIRE(record == nullptr);
   REQUIRE(records.size() == 0);
 
-
   listenhttp->incrementActiveTasks();
   listenhttp->setScheduledState(core::ScheduledState::RUNNING);
   listenhttp->onTrigger(&context, &session);
@@ -281,32 +270,28 @@ laConnection->setRelationship(core::Relationship("success", "description"));
   session2.commit();
   records = reporter->getEvents();
 
-
-
   for (provenance::ProvenanceEventRecord *provEventRecord : records) {
     REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
   }
   std::shared_ptr<core::FlowFile> ffr = session2.get();
   std::string log_attribute_output = oss.str();
-std::cout << log_attribute_output << std::endl;
-  REQUIRE( log_attribute_output.find("exiting because method is POST") != std::string::npos
);
+  REQUIRE(
+      log_attribute_output.find("exiting because method is POST")
+          != std::string::npos);
 
 }
 
-
-class CallBack : public minifi::OutputStreamCallback
-{
+class CallBack : public minifi::OutputStreamCallback {
  public:
-  CallBack()
-    {
+  CallBack() {
 
-    }
-  virtual ~CallBack(){
+  }
+  virtual ~CallBack() {
 
   }
-  virtual void process(std::ofstream *stream){
+  virtual void process(std::ofstream *stream) {
     std::string st = "we're gnna write some test stuff";
-    stream->write(st.c_str(),st.length());
+    stream->write(st.c_str(), st.length());
   }
 };
 
@@ -315,7 +300,8 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
   std::stringstream oss;
   std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
       logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,0));
+      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+                                                                         0));
   std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
   logger->updateLogger(std::move(outputLogger));
 
@@ -323,19 +309,16 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
 
   testController.enableDebug();
 
-
-
-  std::shared_ptr<TestRepository> repo = std::make_shared<
-      TestRepository>();
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
   std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<
-        org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
 
-    std::shared_ptr<core::Processor> logAttribute = std::make_shared<
-        org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared<
+      org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
 
-    char format[] = "/tmp/gt.XXXXXX";
-    char *dir = testController.createTempDirectory(format);
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
 
   std::shared_ptr<core::Processor> listenhttp = std::make_shared<
       org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
@@ -348,23 +331,20 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
   uuid_t invokehttp_uuid;
   REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
 
-
   std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<
-    minifi::Connection>(repo, "getfileCreate2Connection");
+      minifi::Connection>(repo, "getfileCreate2Connection");
   gcConnection->setRelationship(core::Relationship("success", "description"));
 
-std::shared_ptr<minifi::Connection> laConnection = std::make_shared<
-    minifi::Connection>(repo, "logattribute");
-laConnection->setRelationship(core::Relationship("success", "description"));
-
-
+  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<
+      minifi::Connection>(repo, "logattribute");
+  laConnection->setRelationship(core::Relationship("success", "description"));
 
   std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo,"getfileCreate2Connection");
+      minifi::Connection>(repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo,"listenhttp");
+      minifi::Connection>(repo, "listenhttp");
 
   connection2->setRelationship(core::Relationship("No Retry", "description"));
 
@@ -372,31 +352,31 @@ laConnection->setRelationship(core::Relationship("success", "description"));
   connection->setSource(listenhttp);
 
   connection->setSourceUUID(invokehttp_uuid);
-    connection->setDestinationUUID(processoruuid);
+  connection->setDestinationUUID(processoruuid);
 
   connection2->setSourceUUID(processoruuid);
   connection2->setSourceUUID(processoruuid);
 
-
   listenhttp->addConnection(connection);
   invokehttp->addConnection(connection);
   invokehttp->addConnection(connection2);
 
-
   core::ProcessorNode node(invokehttp);
   core::ProcessorNode node2(listenhttp);
-
-  core::ProcessContext context(node, repo);
-  core::ProcessContext context2(node2, repo);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider
=
+      nullptr;
+  core::ProcessContext context(node, controller_services_provider, repo);
+  core::ProcessContext context2(node2, controller_services_provider, repo);
   context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port,
                       "8680");
-  context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
-                        "/testytesttest");
+  context.setProperty(
+      org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
+      "/testytesttest");
 
-  context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method,
-                        "POST");
+  context2.setProperty(
+      org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
   context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL,
-                          "http://localhost:8680/testytesttest");
+                       "http://localhost:8680/testytesttest");
   core::ProcessSession session(&context);
   core::ProcessSession session2(&context2);
 
@@ -410,13 +390,14 @@ laConnection->setRelationship(core::Relationship("success", "description"));
 
   /*
    explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository,
-                          std::map<std::string, std::string> attributes,
-                          std::shared_ptr<ResourceClaim> claim = nullptr);
+   std::map<std::string, std::string> attributes,
+   std::shared_ptr<ResourceClaim> claim = nullptr);
    */
-  std::map<std::string,std::string> attributes;
+  std::map<std::string, std::string> attributes;
   attributes["testy"] = "test";
-  std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo,attributes);
-  session2.write(flow,&callback);
+  std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<
+      minifi::FlowFileRecord>(repo, attributes);
+  session2.write(flow, &callback);
 
   invokehttp->incrementActiveTasks();
   invokehttp->setScheduledState(core::ScheduledState::RUNNING);
@@ -429,15 +410,12 @@ laConnection->setRelationship(core::Relationship("success", "description"));
   listenhttp->onSchedule(&context, &factory);
   listenhttp->onTrigger(&context, &session);
 
-
-
   provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
   std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
   record = session.get();
   REQUIRE(record == nullptr);
   REQUIRE(records.size() == 0);
 
-
   listenhttp->incrementActiveTasks();
   listenhttp->setScheduledState(core::ScheduledState::RUNNING);
   listenhttp->onTrigger(&context, &session);
@@ -454,18 +432,14 @@ laConnection->setRelationship(core::Relationship("success", "description"));
   session2.commit();
   records = reporter->getEvents();
 
-
-
   for (provenance::ProvenanceEventRecord *provEventRecord : records) {
     REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
   }
   std::shared_ptr<core::FlowFile> ffr = session2.get();
   std::string log_attribute_output = oss.str();
-std::cout << log_attribute_output << std::endl;
-  REQUIRE( log_attribute_output.find("exiting because method is POST") != std::string::npos
);
+  REQUIRE(
+      log_attribute_output.find("exiting because method is POST")
+          != std::string::npos);
 
 }
 
-
-
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/MockClasses.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/MockClasses.h b/libminifi/test/unit/MockClasses.h
new file mode 100644
index 0000000..d32184b
--- /dev/null
+++ b/libminifi/test/unit/MockClasses.h
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_TEST_UNIT_MOCKCLASSES_H_
+#define LIBMINIFI_TEST_UNIT_MOCKCLASSES_H_
+
+#include "core/controller/ControllerService.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+std::atomic<bool> disabled;
+
+class MockControllerService : public core::controller::ControllerService {
+ public:
+  explicit MockControllerService(const std::string &name, const std::string &id)
+      : ControllerService(name, id) {
+
+  }
+
+  explicit MockControllerService(const std::string &name, uuid_t uuid)
+      : ControllerService(name, uuid) {
+
+  }
+
+  explicit MockControllerService(const std::string &name)
+      : ControllerService(name, 0) {
+
+  }
+  MockControllerService() {
+
+  }
+
+  ~MockControllerService() {
+
+  }
+
+  virtual void initialize() {
+    core::controller::ControllerService::initialize();
+    enable();
+  }
+
+  std::string doSomething() {
+    return str;
+  }
+
+  virtual void enable() {
+    str = "pushitrealgood";
+  }
+
+  void yield() {
+
+  }
+
+  bool isRunning() {
+    return true;
+  }
+
+  bool isWorkAvailable() {
+    return true;
+  }
+ protected:
+  std::string str;
+};
+
+class MockProcessor : public core::Processor {
+ public:
+
+  explicit MockProcessor(const std::string &name, uuid_t uuid)
+      : Processor(name, uuid) {
+    setTriggerWhenEmpty(true);
+  }
+
+  explicit MockProcessor(const std::string &name)
+      : Processor(name, 0) {
+    setTriggerWhenEmpty(true);
+  }
+
+  ~MockProcessor() {
+
+  }
+
+  virtual void initialize() {
+    core::Property property("linkedService", "Linked service");
+    std::set<core::Property> properties;
+    properties.insert(property);
+    setSupportedProperties(properties);
+
+  }
+
+  // OnTrigger method, implemented by NiFi Processor Designer
+  virtual void onTrigger(core::ProcessContext *context,
+                         core::ProcessSession *session) {
+
+    std::string linked_service = "";
+    getProperty("linkedService", linked_service);
+    if (!IsNullOrEmpty(linked_service)) {
+
+      std::shared_ptr<core::controller::ControllerService> service = context
+          ->getControllerService(linked_service);
+
+      if (!disabled.load()) {
+        assert(true == context->isControllerServiceEnabled(linked_service));
+        assert(nullptr != service);
+        assert(
+            "pushitrealgood"
+                == std::static_pointer_cast<MockControllerService>(service)
+                    ->doSomething());
+      } else {
+        assert(false == context->isControllerServiceEnabled(linked_service));
+      }
+
+      //verify we have access to the controller service
+      // and verify that we can execute it.
+
+    }
+  }
+
+  bool isYield() {
+    return false;
+  }
+
+};
+
+#endif /* LIBMINIFI_TEST_UNIT_MOCKCLASSES_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 58ae870..67b5c65 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -93,7 +93,7 @@ class TestRepository : public core::Repository {
 class TestFlowRepository : public core::repository::FlowFileRepository {
  public:
   TestFlowRepository()
-      : core::repository::FlowFileRepository("./", 1000, 100, 0) {
+      : core::repository::FlowFileRepository("./dir", 1000, 100, 0) {
   }
   // initialize
   bool initialize() {
@@ -154,9 +154,7 @@ class TestFlowRepository : public core::repository::FlowFileRepository
{
   std::map<std::string, std::string> repositoryResults;
 };
 
-
-
-class TestFlowController : public minifi::FlowController {
+class TestFlowController : public minifi::FlowController{
 
 public:
   TestFlowController(std::shared_ptr<core::Repository> repo,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/ProvenanceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp
index 2e41cc8..6a134ed 100644
--- a/libminifi/test/unit/ProvenanceTests.cpp
+++ b/libminifi/test/unit/ProvenanceTests.cpp
@@ -17,8 +17,8 @@
  */
 
 #include "../TestBase.h"
+#include "../unit/ProvenanceTestHelper.h"
 
-#include "ProvenanceTestHelper.h"
 #include "provenance/Provenance.h"
 #include "FlowFileRecord.h"
 #include "core/Core.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp
index de51ead..c8deb89 100644
--- a/libminifi/test/unit/RepoTests.cpp
+++ b/libminifi/test/unit/RepoTests.cpp
@@ -17,8 +17,8 @@
  */
 
 #include "../TestBase.h"
+#include "../unit/ProvenanceTestHelper.h"
 
-#include "ProvenanceTestHelper.h"
 #include "provenance/Provenance.h"
 #include "FlowFileRecord.h"
 #include "core/Core.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/SerializationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SerializationTests.cpp b/libminifi/test/unit/SerializationTests.cpp
index 0841fbd..039ed57 100644
--- a/libminifi/test/unit/SerializationTests.cpp
+++ b/libminifi/test/unit/SerializationTests.cpp
@@ -22,11 +22,12 @@
 #include <uuid/uuid.h>
 #include "core/logging/LogAppenders.h"
 #include "core/logging/BaseLogger.h"
-#include "SiteToSiteHelper.h"
 #include <algorithm>
 #include <string>
 #include <memory>
+
 #include "../TestBase.h"
+#include "../unit/SiteToSiteHelper.h"
 #define FMT_DEFAULT fmt_lower
 
 using namespace org::apache::nifi::minifi::io;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/Site2SiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp
index d82a3cf..5af86d5 100644
--- a/libminifi/test/unit/Site2SiteTests.cpp
+++ b/libminifi/test/unit/Site2SiteTests.cpp
@@ -22,11 +22,12 @@
 #include <uuid/uuid.h>
 #include "core/logging/LogAppenders.h"
 #include "core/logging/BaseLogger.h"
-#include "SiteToSiteHelper.h"
 #include <algorithm>
 #include <string>
 #include <memory>
+
 #include "../TestBase.h"
+#include "../unit/SiteToSiteHelper.h"
 #define FMT_DEFAULT fmt_lower
 
 using namespace org::apache::nifi::minifi::io;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/ThreadPoolTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
new file mode 100644
index 0000000..5c85e19
--- /dev/null
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <future>
+#include "../TestBase.h"
+#include "utils/ThreadPool.h"
+
+bool function() {
+  return true;
+}
+
+TEST_CASE("ThreadPoolTest1", "[TPT1]") {
+  utils::ThreadPool<bool> pool(5);
+  std::function<bool()> f_ex = function;
+  utils::Worker<bool> functor(f_ex);
+  pool.start();
+  std::future<bool> fut = pool.execute(std::move(functor));
+
+  fut.wait();
+
+  REQUIRE(true == fut.get());
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/YamlCongifurationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/YamlCongifurationTests.cpp b/libminifi/test/unit/YamlCongifurationTests.cpp
index 3f804ab..0c229d0 100644
--- a/libminifi/test/unit/YamlCongifurationTests.cpp
+++ b/libminifi/test/unit/YamlCongifurationTests.cpp
@@ -127,7 +127,7 @@ TEST_CASE("Test YAML Config 1", "[testyamlconfig1]") {
       "    timeout: 30 secs\n"
       "    batch size: 1000";
 
-  core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO,
std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()));
+  core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO,
std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()),std::make_shared<minifi::Configure>());
   std::istringstream yamlstream(TEST_YAML_WITHOUT_IDS);
   std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig->getRoot(yamlstream);
 
@@ -178,7 +178,7 @@ TEST_CASE("Test YAML Config Missing Required Fields", "[testyamlconfig2]")
{
       "            use compression: false\n"
       "\n";
 
-  core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO,
std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()));
+  core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO,
std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()),std::make_shared<minifi::Configure>());
   std::istringstream yamlstream(TEST_YAML_NO_RPG_PORT_ID);
 
   REQUIRE_THROWS_AS(yamlConfig->getRoot(yamlstream), std::invalid_argument);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/resource/TestHTTPGet.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/resource/TestHTTPGet.yml b/libminifi/test/unit/resource/TestHTTPGet.yml
deleted file mode 100644
index 0783b8e..0000000
--- a/libminifi/test/unit/resource/TestHTTPGet.yml
+++ /dev/null
@@ -1,73 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-Flow Controller:
-    name: MiNiFi Flow
-    id: 2438e3c8-015a-1000-79ca-83af40ec1990
-Processors:
-    - name: invoke
-      id: 2438e3c8-015a-1000-79ca-83af40ec1991
-      class: org.apache.nifi.processors.standard.InvokeHTTP
-      max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
-      penalization period: 30 sec
-      yield period: 1 sec
-      run duration nanos: 0
-      auto-terminated relationships list:
-      Properties:
-          HTTP Method: GET
-          Remote URL: https://curl.haxx.se/libcurl/c/httpput.html
-    - name: OhJeez
-      id: 2438e3c8-015a-1000-79ca-83af40ec1992
-      class: org.apache.nifi.processors.standard.LogAttribute
-      max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
-      penalization period: 30 sec
-      yield period: 1 sec
-      run duration nanos: 0
-      auto-terminated relationships list: response
-      Properties:
-        Log Level: info
-        Log Payload: true
-
-Connections:
-    - name: TransferFilesToRPG
-      id: 2438e3c8-015a-1000-79ca-83af40ec1997
-      source name: invoke
-      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
-      source relationship name: success
-      destination name: OhJeez
-      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
-      max work queue size: 0
-      max work queue data size: 1 MB
-      flowfile expiration: 60 sec
-    - name: TransferFilesToRPG2
-      id: 2438e3c8-015a-1000-79ca-83af40ec1917
-      source name: OhJeez
-      source id: 2438e3c8-015a-1000-79ca-83af40ec1992
-      destination name: OhJeez
-      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992  
-      source relationship name: success
-      max work queue size: 0
-      max work queue data size: 1 MB
-      flowfile expiration: 60 sec
-
-Remote Processing Groups:
-    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/resource/TestHTTPPost.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/resource/TestHTTPPost.yml b/libminifi/test/unit/resource/TestHTTPPost.yml
deleted file mode 100644
index 837194d..0000000
--- a/libminifi/test/unit/resource/TestHTTPPost.yml
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-Flow Controller:
-    name: MiNiFi Flow
-    id: 2438e3c8-015a-1000-79ca-83af40ec1990
-Processors:
-    - name: invoke
-      id: 2438e3c8-015a-1000-79ca-83af40ec1991
-      class: org.apache.nifi.processors.standard.GetFile
-      max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
-      penalization period: 30 sec
-      yield period: 1 sec
-      run duration nanos: 0
-      auto-terminated relationships list:
-      Properties:
-          Input Directory: /tmp/aljr39
-          Keep Source File: false
-
-    - name: OhJeez
-      id: 2438e3c8-015a-1000-79ca-83af40ec1992
-      class: org.apache.nifi.processors.standard.InvokeHTTP
-      max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
-      penalization period: 30 sec
-      yield period: 1 sec
-      run duration nanos: 0
-      auto-terminated relationships list: response
-      Properties:
-          HTTP Method: POST
-          Remote URL: http://requestb.in/u8ax9uu8
-          
-    - name: Loggit
-      id: 2438e3c8-015a-1000-79ca-83af40ec1993
-      class: org.apache.nifi.processors.standard.LogAttribute
-      max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
-      penalization period: 30 sec
-      yield period: 1 sec
-      run duration nanos: 0
-      auto-terminated relationships list: response
-      Properties:
-          LogLevel: info
-
-Connections:
-    - name: TransferFilesToRPG
-      id: 2438e3c8-015a-1000-79ca-83af40ec1997
-      source name: invoke
-      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
-      source relationship name: success
-      destination name: OhJeez
-      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
-      max work queue size: 0
-      max work queue data size: 1 MB
-      flowfile expiration: 60 sec
-    - name: TransferFilesToRPG2
-      id: 2438e3c8-015a-1000-79ca-83af40ec1917
-      source name: OhJeez
-      source id: 2438e3c8-015a-1000-79ca-83af40ec1992
-      destination name: OhJeez
-      destination id: 2438e3c8-015a-1000-79ca-83af40ec1993
-      source relationship name: success
-      max work queue size: 0
-      max work queue data size: 1 MB
-      flowfile expiration: 60 sec
-
-Remote Processing Groups:
-    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index a0e43b8..87506fa 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
   CMAKE_POLICY(SET CMP0048 OLD)
 ENDIF(POLICY CMP0048)
 
-include_directories(../include ../libminifi/include  ../libminifi/include/core/yaml  ../libminifi/include/core
../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include
../thirdparty/leveldb-1.18/include ../thirdparty/)
+include_directories(../include ../libminifi/include  ../libminifi/include/processors/ ../libminifi/include/core/yaml
 ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include
../thirdparty/leveldb-1.18/include ../thirdparty/)
 
 find_package(Boost REQUIRED)
 include_directories(${Boost_INCLUDE_DIRS})

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index daf4a8f..3eb16ae 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -60,7 +60,6 @@
 // Variables that allow us to avoid a timed wait.
 sem_t *running;
 //! Flow Controller
-static std::unique_ptr<minifi::FlowController> controller = nullptr;
 
 /**
  * Removed the stop command from the signal handler so that we could trigger
@@ -178,10 +177,11 @@ int main(int argc, char **argv) {
       core::createFlowConfiguration(prov_repo, flow_repo, configure, stream_factory,
                                    nifi_configuration_class_name));
 
-  controller = std::unique_ptr<minifi::FlowController>(
+  std::shared_ptr<minifi::FlowController> controller = std::unique_ptr<minifi::FlowController>(
       new minifi::FlowController(prov_repo, flow_repo, configure,
                                  std::move(flow_configuration)));
 
+  logger->log_info("Loading FlowController");
   // Load flow from specified configuration file
   controller->load();
   // Start Processing the flow


Mime
View raw message