nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [2/2] nifi-minifi-cpp git commit: MINIFI-270: Move Base RPG capabilities back into RemoteProcessGroupPort
Date Thu, 18 May 2017 16:57:04 GMT
MINIFI-270: Move Base RPG capabilities back into RemoteProcessGroupPort

This allows us to extend capabilities in RPG into other classes. Since
the same basic functions are the same we can bring the code up from
Processor into RemoteProcessGroupPort.h instead of a separate class.

Updated LICENSE and added concurrentqueue to locking for the object pool.

This closes #99.

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/082c85a8
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/082c85a8
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/082c85a8

Branch: refs/heads/master
Commit: 082c85a866e355b08b1d47fcf3d2792983003cea
Parents: 1498ec8
Author: Marc Parisi <phrocker@apache.org>
Authored: Wed May 17 08:18:26 2017 -0400
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Thu May 18 12:56:06 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                                  |    3 +-
 LICENSE                                         |   27 +
 libminifi/CMakeLists.txt                        |    1 +
 libminifi/include/RemoteProcessorGroupPort.h    |   22 +-
 libminifi/include/core/Processor.h              |   10 +-
 .../SiteToSiteProvenanceReportingTask.h         |  112 +-
 libminifi/src/RemoteProcessorGroupPort.cpp      |  119 +-
 libminifi/src/core/Processor.cpp                |   47 -
 .../SiteToSiteProvenanceReportingTask.cpp       |   52 +-
 main/CMakeLists.txt                             |    2 +-
 thirdparty/concurrentqueue/concurrentqueue.h    | 3626 ++++++++++++++++++
 11 files changed, 3837 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index bf349a6..73222dd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -105,6 +105,7 @@ set(JSONCPP_LIB "${JSONCPP_LIB_DIR}/lib/${prefix}jsoncpp${suffix}")
 set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library")
 add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3)
 add_subdirectory(thirdparty/civetweb-1.9.1)
+include_directories(thirdparty/concurrentqueue)
 add_subdirectory(libminifi)
 add_subdirectory(main)
 
@@ -144,7 +145,7 @@ set(CPACK_COMPONENTS_ALL bin)
 include(CPack)
 
 include(BuildTests)
-       
+
 include(BuildDocs)
 
 include(DockerConfig)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 30be538..f67f624 100644
--- a/LICENSE
+++ b/LICENSE
@@ -401,6 +401,33 @@ ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 The views and conclusions contained in the software and documentation are those of the authors
and
 should not be interpreted as representing official policies, either expressed or implied,
of Dmitry Vyukov.
 
+
+This product bundles 'concurrentqueue' which is available a Boost Software License.
+
+Boost Software License - Version 1.0 - August 17th, 2003
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
 This product bundles 'Catch' which is available under a Boost Software License.
 
 Boost Software License - Version 1.0 - August 17th, 2003

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index a0ec75f..f674b68 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -59,6 +59,7 @@ include_directories(../include)
 include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)
 include_directories(../thirdparty/civetweb-1.9.1/include)
 include_directories(../thirdparty/jsoncpp/include)
+include_directories(../thirdparty/concurrentqueue/)
 include_directories(include)
 
 file(GLOB SOURCES  "src/core/logging/*.cpp"  "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp"
"src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp"
"src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/processors/*.cpp" "src/*.cpp")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index cc96a0b..e74342e 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -23,6 +23,7 @@
 #include <mutex>
 #include <memory>
 #include <stack>
+#include "concurrentqueue.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -40,13 +41,17 @@ class RemoteProcessorGroupPort : public core::Processor {
   /*!
    * Create a new processor
    */
-  RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory,
std::string name, uuid_t uuid = NULL)
+  RemoteProcessorGroupPort(
+      const std::shared_ptr<io::StreamFactory> &stream_factory,
+      std::string name, uuid_t uuid = nullptr)
       : core::Processor(name, uuid),
         direction_(SEND),
         transmitting_(false) {
     stream_factory_ = stream_factory;
     logger_ = logging::Logger::getLogger();
-    uuid_copy(protocol_uuid_, uuid);
+    if (uuid != nullptr) {
+      uuid_copy(protocol_uuid_, uuid);
+    }
   }
   // Destructor
   virtual ~RemoteProcessorGroupPort() {
@@ -61,6 +66,8 @@ class RemoteProcessorGroupPort : public core::Processor {
   // Supported Relationships
   static core::Relationship relation;
  public:
+  void onSchedule(core::ProcessContext *context,
+                  core::ProcessSessionFactory *sessionFactory);
   // OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
   virtual void onTrigger(core::ProcessContext *context,
                          core::ProcessSession *session);
@@ -83,13 +90,12 @@ class RemoteProcessorGroupPort : public core::Processor {
 
  protected:
 
- private:
   std::shared_ptr<io::StreamFactory> stream_factory_;
-  std::unique_ptr<Site2SiteClientProtocol> getNextProtocol();
+  std::unique_ptr<Site2SiteClientProtocol> getNextProtocol(bool create);
   void returnProtocol(std::unique_ptr<Site2SiteClientProtocol> protocol);
 
-  std::stack<std::unique_ptr<Site2SiteClientProtocol>> available_protocols_;
-  std::mutex protocol_mutex_;
+  moodycamel::ConcurrentQueue<std::unique_ptr<Site2SiteClientProtocol>> available_protocols_;
+
   // Logger
   std::shared_ptr<logging::Logger> logger_;
   // Transaction Direction
@@ -101,6 +107,10 @@ class RemoteProcessorGroupPort : public core::Processor {
 
   uuid_t protocol_uuid_;
 
+  std::string host_;
+
+  uint16_t port_;
+
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 9f9b31a..1a18e77 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -243,21 +243,13 @@ class Processor : public Connectable, public ConfigurableComponent,
   // Trigger the Processor even if the incoming connection is empty
   std::atomic<bool> _triggerWhenEmpty;
 
-//! obtainSite2SiteProtocol for use
-  std::shared_ptr<Site2SiteClientProtocol> obtainSite2SiteProtocol(const std::shared_ptr<io::StreamFactory>
&stream_factory, std::string host, uint16_t sport, uuid_t portId);
-  //! returnSite2SiteProtocol after use
-  void returnSite2SiteProtocol(std::shared_ptr<Site2SiteClientProtocol> protocol);private:
+  private:
 
   // Mutex for protection
   std::mutex mutex_;
   // Yield Expiration
   std::atomic<uint64_t> yield_expiration_;
 
-  // Site2Site Protocols
-  std::stack<std::shared_ptr<Site2SiteClientProtocol>> available_protocols_;
-  std::atomic<bool> protocols_created_;
-
-
   // Check all incoming connections for work
   bool isWorkAvailable();
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
index 34921eb..f69690c 100644
--- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
+++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
@@ -26,6 +26,7 @@
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
+#include "RemoteProcessorGroupPort.h"
 #include "Site2SiteClientProtocol.h"
 #include "io/StreamFactory.h"
 
@@ -37,52 +38,59 @@ namespace core {
 namespace reporting {
 
 //! SiteToSiteProvenanceReportingTask Class
-class SiteToSiteProvenanceReportingTask: public core::Processor {
-public:
-	//! Constructor
-	/*!
-	 * Create a new processor
-	 */
-	SiteToSiteProvenanceReportingTask(const std::shared_ptr<io::StreamFactory> &stream_factory)
:
-			core::Processor(ReportTaskName) {
-		logger_ = logging::Logger::getLogger();
-		this->setTriggerWhenEmpty(true);
-		port_ = 0;
-		batch_size_ = 100;
-	}
-	//! Destructor
-	virtual ~SiteToSiteProvenanceReportingTask() {
+class SiteToSiteProvenanceReportingTask :
+    public minifi::RemoteProcessorGroupPort {
+ public:
+  //! Constructor
+  /*!
+   * Create a new processor
+   */
+  SiteToSiteProvenanceReportingTask(
+      const std::shared_ptr<io::StreamFactory> &stream_factory)
+      : minifi::RemoteProcessorGroupPort(stream_factory, ReportTaskName) {
+    logger_ = logging::Logger::getLogger();
+    this->setTriggerWhenEmpty(true);
+    port_ = 0;
+    batch_size_ = 100;
+  }
+  //! Destructor
+  virtual ~SiteToSiteProvenanceReportingTask() {
 
-	}
-	//! Report Task Name
-	static constexpr char const* ReportTaskName = "SiteToSiteProvenanceReportingTask";
-	static const char *ProvenanceAppStr;
+  }
+  //! Report Task Name
+  static constexpr char const* ReportTaskName =
+      "SiteToSiteProvenanceReportingTask";
+  static const char *ProvenanceAppStr;
 
-public:
-	//! Get provenance json report
-	void getJsonReport(core::ProcessContext *context,
-	    core::ProcessSession *session, std::vector < std::shared_ptr < provenance::ProvenanceEventRecord
>> &records,
-	    std::string &report);
-	//! OnTrigger method, implemented by NiFi SiteToSiteProvenanceReportingTask
-	virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
-	//! Initialize, over write by NiFi SiteToSiteProvenanceReportingTask
-	virtual void initialize(void);
-	//! Set Port UUID
-	void setPortUUID(uuid_t port_uuid) {
-	  uuid_copy(port_uuid_, port_uuid);
-	}
-	//! Set Host
-	void setHost(std::string host) {
-	  host_ = host;
-	}
-	//! Set Port
-	void setPort(uint16_t port) {
-	  port_ = port;
-	}
-	//! Set Batch Size
-	void setBatchSize(int size) {
-	  batch_size_ = size;
-	}
+ public:
+  //! Get provenance json report
+  void getJsonReport(
+      core::ProcessContext *context, core::ProcessSession *session,
+      std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
+      std::string &report);
+  void onSchedule(core::ProcessContext *context,
+                    core::ProcessSessionFactory *sessionFactory);
+  //! OnTrigger method, implemented by NiFi SiteToSiteProvenanceReportingTask
+  virtual void onTrigger(core::ProcessContext *context,
+                         core::ProcessSession *session);
+  //! Initialize, over write by NiFi SiteToSiteProvenanceReportingTask
+  virtual void initialize(void);
+  //! Set Port UUID
+  void setPortUUID(uuid_t port_uuid) {
+    uuid_copy(protocol_uuid_, port_uuid);
+  }
+  //! Set Host
+  void setHost(std::string host) {
+    host_ = host;
+  }
+  //! Set Port
+  void setPort(uint16_t port) {
+    port_ = port;
+  }
+  //! Set Batch Size
+  void setBatchSize(int size) {
+    batch_size_ = size;
+  }
   //! Get Host
   std::string getHost(void) {
     return (host_);
@@ -97,19 +105,15 @@ public:
   }
   //! Get Port UUID
   void getPortUUID(uuid_t port_uuid) {
-    uuid_copy(port_uuid, port_uuid_);
+    uuid_copy(port_uuid, protocol_uuid_);
   }
 
-protected:
+ protected:
 
-private:
-	uuid_t port_uuid_;
-	std::string host_;
-	uint16_t port_;
-	int batch_size_;
-	//! Logger
-	std::shared_ptr<logging::Logger> logger_;
-        std::shared_ptr<io::StreamFactory> stream_factory_;
+ private:
+  int batch_size_;
+  //! Logger
+  std::shared_ptr<logging::Logger> logger_;
 };
 
 // SiteToSiteProvenanceReportingTask 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 70771af..2e7e61a 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -17,24 +17,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "RemoteProcessorGroupPort.h"
-#include <sys/time.h>
-#include <string.h>
-#include <time.h>
-#include <vector>
-#include <queue>
-#include <map>
+
+#include "../include/RemoteProcessorGroupPort.h"
+
+#include <uuid/uuid.h>
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <deque>
+#include <iostream>
 #include <set>
 #include <string>
+#include <type_traits>
 #include <utility>
-#include <memory>
-#include <sstream>
-#include <iostream>
-#include "../include/io/StreamFactory.h"
-#include "io/ClientSocket.h"
-#include "utils/TimeUtil.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
+
+#include "../include/core/logging/Logger.h"
+#include "../include/core/ProcessContext.h"
+#include "../include/core/ProcessorNode.h"
+#include "../include/core/Property.h"
+#include "../include/core/Relationship.h"
+#include "../include/Site2SitePeer.h"
 
 namespace org {
 namespace apache {
@@ -47,42 +49,74 @@ core::Property RemoteProcessorGroupPort::hostName("Host Name",
                                                   "Remote Host Name.",
                                                   "localhost");
 core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
-core::Property RemoteProcessorGroupPort::portUUID("Port UUID",
-    "Specifies remote NiFi Port UUID.", "");
+core::Property RemoteProcessorGroupPort::portUUID(
+    "Port UUID", "Specifies remote NiFi Port UUID.", "");
 core::Relationship RemoteProcessorGroupPort::relation;
 
-std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol()
{
-  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
-  if (available_protocols_.empty())
-    return nullptr;
-
-  std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(
-      available_protocols_.top());
-  available_protocols_.pop();
-  return std::move(return_pointer);
+std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol(
+    bool create = true) {
+  std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr;
+  if (!available_protocols_.try_dequeue(nextProtocol)) {
+    if (create) {
+      // create
+      nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(
+          new Site2SiteClientProtocol(nullptr));
+      nextProtocol->setPortId(protocol_uuid_);
+      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
+          std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
+              stream_factory_->createSocket(host_, port_));
+      std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(
+          new Site2SitePeer(std::move(str), host_, port_));
+      nextProtocol->setPeer(std::move(peer_));
+    }
+  }
+  return std::move(nextProtocol);
 }
 
 void RemoteProcessorGroupPort::returnProtocol(
     std::unique_ptr<Site2SiteClientProtocol> return_protocol) {
-  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
-  available_protocols_.push(std::move(return_protocol));
+
+  if (available_protocols_.size_approx() >= max_concurrent_tasks_) {
+    // let the memory be freed
+    return;
+  }
+  available_protocols_.enqueue(std::move(return_protocol));
 }
 
 void RemoteProcessorGroupPort::initialize() {
-  // Set the supported properties
+// Set the supported properties
   std::set<core::Property> properties;
   properties.insert(hostName);
   properties.insert(port);
   properties.insert(portUUID);
   setSupportedProperties(properties);
-  // Set the supported relationships
+// Set the supported relationships
   std::set<core::Relationship> relationships;
   relationships.insert(relation);
   setSupportedRelationships(relationships);
 }
 
+void RemoteProcessorGroupPort::onSchedule(
+    core::ProcessContext *context,
+    core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+
+  int64_t lvalue;
+
+  if (context->getProperty(hostName.getName(), value)) {
+    host_ = value;
+  }
+  if (context->getProperty(port.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    port_ = (uint16_t) lvalue;
+  }
+  if (context->getProperty(portUUID.getName(), value)) {
+    uuid_parse(value.c_str(), protocol_uuid_);
+  }
+}
+
 void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
-    core::ProcessSession *session) {
+                                         core::ProcessSession *session) {
   if (!transmitting_)
     return;
 
@@ -90,9 +124,8 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
 
   int64_t lvalue;
 
-    std::string host = "";
-    uint16_t sport = 0;
-
+  std::string host = "";
+  uint16_t sport = 0;
 
   if (context->getProperty(hostName.getName(), value)) {
     host = value;
@@ -105,8 +138,7 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
     uuid_parse(value.c_str(), protocol_uuid_);
   }
 
-  std::shared_ptr<Site2SiteClientProtocol> protocol_ =
-      this->obtainSite2SiteProtocol(stream_factory_, host, sport, protocol_uuid_);
+  std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol();
 
   if (!protocol_) {
     context->yield();
@@ -116,20 +148,21 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needeed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor
-        > (context->getProcessorNode().getProcessor());
+    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
+        context->getProcessorNode().getProcessor());
     logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
-        processor->getYieldPeriodMsec());
-    returnSite2SiteProtocol(protocol_);
+                       processor->getYieldPeriodMsec());
+    returnProtocol(std::move(protocol_));
     return;
   }
 
-  if (direction_ == RECEIVE)
+  if (direction_ == RECEIVE) {
     protocol_->receiveFlowFiles(context, session);
-  else
+  } else {
     protocol_->transferFlowFiles(context, session);
+  }
 
-  returnSite2SiteProtocol(protocol_);
+  returnProtocol(std::move(protocol_));
 
   return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 7464af2..ac03fde 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -52,7 +52,6 @@ Processor::Processor(std::string name, uuid_t uuid)
   strategy_ = TIMER_DRIVEN;
   loss_tolerant_ = false;
   _triggerWhenEmpty = false;
-  protocols_created_ = false;
   scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
   run_durantion_nano_ = 0;
   yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
@@ -194,52 +193,6 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn)
{
   }
 }
 
-std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol(
-    const std::shared_ptr<io::StreamFactory> &stream_factory, std::string host,
uint16_t sport, uuid_t portId) {
-  std::lock_guard < std::mutex > lock(mutex_);
-
-  if (!protocols_created_) {
-    for (int i = 0; i < this->max_concurrent_tasks_; i++) {
-      // create the protocol pool based on max threads allowed
-      std::shared_ptr<Site2SiteClientProtocol> protocol = std::make_shared<Site2SiteClientProtocol>(nullptr);
-      protocols_created_ = true;
-      protocol->setPortId(portId);
-      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-          std::unique_ptr < org::apache::nifi::minifi::io::DataStream
-              > (stream_factory->createSocket(host, sport));
-      std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer
-          > (new Site2SitePeer(std::move(str), host, sport));
-      protocol->setPeer(std::move(peer_));
-      available_protocols_.push(protocol);
-    }
-  }
-  if (!available_protocols_.empty()) {
-    std::shared_ptr<Site2SiteClientProtocol> return_pointer =
-        available_protocols_.top();
-    available_protocols_.pop();
-    return return_pointer;
-  } else {
-    // create the protocol on demand if we exceed the pool
-    std::shared_ptr<Site2SiteClientProtocol> protocol = std::make_shared<Site2SiteClientProtocol>(nullptr);
-    protocol->setPortId(portId);
-    std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-        std::unique_ptr < org::apache::nifi::minifi::io::DataStream
-            > (stream_factory->createSocket(host, sport));
-    std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer
-        > (new Site2SitePeer(std::move(str), host, sport));
-    protocol->setPeer(std::move(peer_));
-    return protocol;
-  }
-}
-
-void Processor::returnSite2SiteProtocol(
-    std::shared_ptr<Site2SiteClientProtocol> protocol) {
-  std::lock_guard < std::mutex > lock(mutex_);
-  if (protocol && available_protocols_.size() < max_concurrent_tasks_) {
-    available_protocols_.push(protocol);
-  }
-}
-
 bool Processor::flowFilesQueued() {
   std::lock_guard<std::mutex> lock(mutex_);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index adffc88..3d21683 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -25,7 +25,7 @@
 #include <memory>
 #include <sstream>
 #include <iostream>
-
+#include <utility>
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 #include "../include/io/StreamFactory.h"
 #include "io/ClientSocket.h"
@@ -50,8 +50,8 @@ const char *SiteToSiteProvenanceReportingTask::ProvenanceAppStr = "MiNiFi
Flow";
 void SiteToSiteProvenanceReportingTask::initialize() {
 }
 
-void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context,
-    core::ProcessSession *session,
+void SiteToSiteProvenanceReportingTask::getJsonReport(
+    core::ProcessContext *context, core::ProcessSession *session,
     std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
     std::string &report) {
 
@@ -63,7 +63,8 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext
*cont
     Json::Value childUuidJson;
     recordJson["eventId"] = record->getEventId().c_str();
     recordJson["eventType"] =
-        provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
+        provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record
+            ->getEventType()];
     recordJson["timestampMillis"] = record->getEventTime();
     recordJson["durationMillis"] = record->getEventDuration();
     recordJson["lineageStart"] = record->getlineageStartDate();
@@ -90,10 +91,10 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext
*cont
     }
     recordJson["childIds"] = childUuidJson;
     recordJson["transitUri"] = record->getTransitUri().c_str();
-    recordJson["remoteIdentifier"] =
-        record->getSourceSystemFlowFileIdentifier().c_str();
-    recordJson["alternateIdentifier"] =
-        record->getAlternateIdentifierUri().c_str();
+    recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier()
+        .c_str();
+    recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri()
+        .c_str();
     recordJson["application"] = ProvenanceAppStr;
     array.append(recordJson);
   }
@@ -102,11 +103,15 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext
*cont
   report = writer.write(array);
 }
 
-void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context,
-    core::ProcessSession *session) {
+void SiteToSiteProvenanceReportingTask::onSchedule(
+    core::ProcessContext *context,
+    core::ProcessSessionFactory *sessionFactory) {
+}
+
+void SiteToSiteProvenanceReportingTask::onTrigger(
+    core::ProcessContext *context, core::ProcessSession *session) {
 
-  std::shared_ptr<Site2SiteClientProtocol> protocol_ =
-      this->obtainSite2SiteProtocol(stream_factory_, host_, port_, port_uuid_);
+  std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol(true);
 
   if (!protocol_) {
     context->yield();
@@ -116,32 +121,33 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext
*context,
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needeed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor
-        > (context->getProcessorNode().getProcessor());
+    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
+        context->getProcessorNode().getProcessor());
     logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
-        processor->getYieldPeriodMsec());
-    returnSite2SiteProtocol(protocol_);
+                       processor->getYieldPeriodMsec());
+    returnProtocol(std::move(protocol_));
     return;
   }
 
-  std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> records;
-  std::shared_ptr<provenance::ProvenanceRepository> repo = std::static_pointer_cast
-      < provenance::ProvenanceRepository > (context->getProvenanceRepository());
+  std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> records;
+  std::shared_ptr<provenance::ProvenanceRepository> repo =
+      std::static_pointer_cast<provenance::ProvenanceRepository>(
+          context->getProvenanceRepository());
   repo->getProvenanceRecord(records, batch_size_);
   if (records.size() <= 0) {
-    returnSite2SiteProtocol(protocol_);
+    returnProtocol(std::move(protocol_));
     return;
   }
 
   std::string jsonStr;
   this->getJsonReport(context, session, records, jsonStr);
   if (jsonStr.length() <= 0) {
-    returnSite2SiteProtocol(protocol_);
+    returnProtocol(std::move(protocol_));
     return;
   }
 
   try {
-    std::map < std::string, std::string > attributes;
+    std::map<std::string, std::string> attributes;
     protocol_->transferString(context, session, jsonStr, attributes);
   } catch (...) {
     // if transfer bytes failed, return instead of purge the provenance records
@@ -150,7 +156,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext
*context,
 
   // we transfer the record, purge the record from DB
   repo->purgeProvenanceRecord(records);
-  returnSite2SiteProtocol(protocol_);
+  returnProtocol(std::move(protocol_));
 }
 
 } /* namespace reporting */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/082c85a8/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index 87506fa..f7f16e7 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
   CMAKE_POLICY(SET CMP0048 OLD)
 ENDIF(POLICY CMP0048)
 
-include_directories(../include ../libminifi/include  ../libminifi/include/processors/ ../libminifi/include/core/yaml
 ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include
../thirdparty/leveldb-1.18/include ../thirdparty/)
+include_directories(../include ../libminifi/include  ../libminifi/include/core/yaml  ../libminifi/include/core
../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include
../thirdparty/jsoncpp/include ../thirdparty/leveldb-1.18/include ../thirdparty/)
 
 find_package(Boost REQUIRED)
 include_directories(${Boost_INCLUDE_DIRS})


Mime
View raw message