nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [12/12] nifi-minifi-cpp git commit: MINIFI-227: Provenance Report
Date Tue, 11 Apr 2017 17:38:59 GMT
MINIFI-227: Provenance Report


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/33b3759c
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/33b3759c
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/33b3759c

Branch: refs/heads/MINIFI-227-merge
Commit: 33b3759c0d65757bd315e7834c80f05f85953f45
Parents: 4a4602a
Author: Bin Qiu <benqiu2016@gmail.com>
Authored: Tue Apr 11 10:38:28 2017 -0700
Committer: Bin Qiu <benqiu2016@gmail.com>
Committed: Tue Apr 11 10:38:28 2017 -0700

----------------------------------------------------------------------
 CMakeLists.txt                                  |    4 +-
 LICENSE                                         |   53 +-
 README.md                                       |   21 +
 libminifi/CMakeLists.txt                        |    1 +
 libminifi/include/RemoteProcessorGroupPort.h    |    1 +
 libminifi/include/Site2SiteClientProtocol.h     |   13 +-
 libminifi/include/core/FlowConfiguration.h      |    1 +
 libminifi/include/core/Processor.h              |   14 +-
 libminifi/include/provenance/Provenance.h       |    1 +
 .../include/provenance/ProvenanceRepository.h   |   31 +
 .../include/provenance/ProvenanceTaskReport.h   |   91 +
 libminifi/src/FlowFileRecord.cpp                |    2 +-
 libminifi/src/RemoteProcessorGroupPort.cpp      |   81 +-
 libminifi/src/Site2SiteClientProtocol.cpp       |  406 +--
 libminifi/src/core/FlowConfiguration.cpp        |    4 +
 libminifi/src/core/Processor.cpp                |  100 +-
 libminifi/src/provenance/Provenance.cpp         |    5 +
 .../src/provenance/ProvenanceTaskReport.cpp     |  198 ++
 libminifi/test/unit/ProcessorTests.cpp          |   96 +-
 libminifi/test/unit/ProvenanceTestHelper.h      |   44 +-
 main/CMakeLists.txt                             |    6 +-
 thirdparty/jsoncpp/AUTHORS                      |    1 +
 thirdparty/jsoncpp/CMakeLists.txt               |  156 ++
 thirdparty/jsoncpp/LICENSE                      |   55 +
 thirdparty/jsoncpp/NEWS.txt                     |  175 ++
 thirdparty/jsoncpp/README.md                    |  225 ++
 thirdparty/jsoncpp/SConstruct                   |  248 ++
 thirdparty/jsoncpp/amalgamate.py                |  155 ++
 thirdparty/jsoncpp/appveyor.yml                 |   35 +
 thirdparty/jsoncpp/dev.makefile                 |   35 +
 thirdparty/jsoncpp/doc/doxyfile.in              | 2301 ++++++++++++++++
 thirdparty/jsoncpp/doc/footer.html              |    3 +
 thirdparty/jsoncpp/doc/header.html              |   24 +
 thirdparty/jsoncpp/doc/jsoncpp.dox              |  164 ++
 thirdparty/jsoncpp/doc/readme.txt               |    1 +
 thirdparty/jsoncpp/doc/roadmap.dox              |    3 +
 thirdparty/jsoncpp/doc/web_doxyfile.in          | 2301 ++++++++++++++++
 thirdparty/jsoncpp/doxybuild.py                 |  189 ++
 thirdparty/jsoncpp/include/CMakeLists.txt       |    2 +
 thirdparty/jsoncpp/include/json/allocator.h     |   98 +
 thirdparty/jsoncpp/include/json/assertions.h    |   54 +
 thirdparty/jsoncpp/include/json/autolink.h      |   25 +
 thirdparty/jsoncpp/include/json/config.h        |  184 ++
 thirdparty/jsoncpp/include/json/features.h      |   61 +
 thirdparty/jsoncpp/include/json/forwards.h      |   37 +
 thirdparty/jsoncpp/include/json/json.h          |   15 +
 thirdparty/jsoncpp/include/json/reader.h        |  408 +++
 thirdparty/jsoncpp/include/json/value.h         |  870 ++++++
 thirdparty/jsoncpp/include/json/version.h       |   20 +
 thirdparty/jsoncpp/include/json/writer.h        |  335 +++
 .../jsoncpp/makefiles/msvc2010/jsoncpp.sln      |   42 +
 .../jsoncpp/makefiles/msvc2010/jsontest.vcxproj |   96 +
 .../makefiles/msvc2010/jsontest.vcxproj.filters |   13 +
 .../jsoncpp/makefiles/msvc2010/lib_json.vcxproj |  143 +
 .../makefiles/msvc2010/lib_json.vcxproj.filters |   33 +
 .../makefiles/msvc2010/test_lib_json.vcxproj    |  109 +
 .../msvc2010/test_lib_json.vcxproj.filters      |   24 +
 thirdparty/jsoncpp/makefiles/vs71/jsoncpp.sln   |   46 +
 .../jsoncpp/makefiles/vs71/jsontest.vcproj      |  119 +
 .../jsoncpp/makefiles/vs71/lib_json.vcproj      |  205 ++
 .../jsoncpp/makefiles/vs71/test_lib_json.vcproj |  130 +
 thirdparty/jsoncpp/makerelease.py               |  390 +++
 thirdparty/jsoncpp/pkg-config/jsoncpp.pc.in     |    9 +
 thirdparty/jsoncpp/scons-tools/globtool.py      |   58 +
 thirdparty/jsoncpp/scons-tools/srcdist.py       |  183 ++
 thirdparty/jsoncpp/scons-tools/substinfile.py   |   85 +
 thirdparty/jsoncpp/scons-tools/targz.py         |   87 +
 thirdparty/jsoncpp/src/CMakeLists.txt           |    5 +
 .../jsoncpp/src/jsontestrunner/CMakeLists.txt   |   25 +
 thirdparty/jsoncpp/src/jsontestrunner/main.cpp  |  326 +++
 .../jsoncpp/src/jsontestrunner/sconscript       |    9 +
 thirdparty/jsoncpp/src/lib_json/CMakeLists.txt  |  113 +
 thirdparty/jsoncpp/src/lib_json/json_reader.cpp | 2036 ++++++++++++++
 thirdparty/jsoncpp/src/lib_json/json_tool.h     |  117 +
 thirdparty/jsoncpp/src/lib_json/json_value.cpp  | 1617 +++++++++++
 .../jsoncpp/src/lib_json/json_valueiterator.inl |  167 ++
 thirdparty/jsoncpp/src/lib_json/json_writer.cpp | 1224 +++++++++
 thirdparty/jsoncpp/src/lib_json/sconscript      |    8 +
 thirdparty/jsoncpp/src/lib_json/version.h.in    |   20 +
 .../jsoncpp/src/test_lib_json/CMakeLists.txt    |   38 +
 .../jsoncpp/src/test_lib_json/jsontest.cpp      |  457 ++++
 thirdparty/jsoncpp/src/test_lib_json/jsontest.h |  286 ++
 thirdparty/jsoncpp/src/test_lib_json/main.cpp   | 2589 ++++++++++++++++++
 thirdparty/jsoncpp/src/test_lib_json/sconscript |   10 +
 thirdparty/jsoncpp/travis.sh                    |   31 +
 thirdparty/jsoncpp/version                      |    1 +
 thirdparty/jsoncpp/version.in                   |    1 +
 87 files changed, 19910 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b84706d..2ae7332 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -81,6 +81,7 @@ find_package(UUID REQUIRED)
 file(GLOB SPD_SOURCES "include/spdlog/*")
 
 add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3)
+add_subdirectory(thirdparty/jsoncpp)
 set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library")
 add_subdirectory(thirdparty/civetweb-1.9.1)
 add_subdirectory(libminifi)
@@ -125,6 +126,7 @@ enable_testing(test)
     add_executable(tests ${LIBMINIFI_TEST_SOURCES} ${SPD_SOURCES})
     target_include_directories(tests PRIVATE BEFORE "thirdparty/catch")
     target_include_directories(tests PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
+    target_include_directories(tests PRIVATE BEFORE "thirdparty/jsoncpp/include")
     target_include_directories(tests PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
     target_include_directories(tests PRIVATE BEFORE "include")
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/")
@@ -134,7 +136,7 @@ enable_testing(test)
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/utils")
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/processors")
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/provenance")
-    target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp)
+    target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp jsoncpp_lib_static)
     add_test(NAME LibMinifiTests COMMAND tests)
     
     file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS "libminifi/test/TestExecuteProcess.cpp")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 8a0f32e..0439ead 100644
--- a/LICENSE
+++ b/LICENSE
@@ -508,4 +508,55 @@ The source is available under a 3-Clause BSD License.
 	LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 	(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 	USE OF THIS SOFTWARE, EVEN IF NOT ADVISED OF THE POSSIBILITY OF SUCH
-	DAMAGE.
\ No newline at end of file
+	DAMAGE.
+
+This product bundles 'JsonCpp' which is available under an MIT license.
+
+The JsonCpp library's source code, including accompanying documentation,
+tests and demonstration applications, are licensed under the following
+conditions...
+
+The author (Baptiste Lepilleur) explicitly disclaims copyright in all
+jurisdictions which recognize such a disclaimer. In such jurisdictions,
+this software is released into the Public Domain.
+
+In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
+2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur, and is
+released under the terms of the MIT License (see below).
+
+In jurisdictions which recognize Public Domain property, the user of this
+software may choose to accept it either as 1) Public Domain, 2) under the
+conditions of the MIT License (see below), or 3) under the terms of dual
+Public Domain/MIT License conditions described here, as they choose.
+
+The MIT License is about as close to Public Domain as a license can get, and is
+described in clear, concise terms at:
+
+   http://en.wikipedia.org/wiki/MIT_License
+
+The full text of the MIT License follows:
+
+========================================================================
+Copyright (c) 2007-2010 Baptiste Lepilleur
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of this software and associated documentation
+files (the "Software"), to deal in the Software without
+restriction, including without limitation the rights to use, copy,
+modify, merge, publish, distribute, sublicense, and/or sell copies
+of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+========================================================================
+(END LICENSE TEXT)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 484c97a..74d0afc 100644
--- a/README.md
+++ b/README.md
@@ -53,6 +53,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
   * ListenSyslog
   * ExecuteProcess
   * AppendHostInfo
+  * ProvenanceTaskReport
 * Provenance events generation is supported and are persisted using levelDB.
 
 ## System Requirements
@@ -286,6 +287,26 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
     if you do not want to enable client certificate base authorization
     nifi.security.need.ClientAuth=false
 
+### Provenance Report
+
+    Add a ProvenanceTaskReport processor to config.yml.
+    Set its Port UUID property to the UUID of the remote NiFi input port.
+ 
+    - name: ProvenanceTaskReport
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a209
+      class: org.apache.nifi.processors.standard.ProvenanceTaskReport
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Port: 10001
+          Host Name: localhost
+          Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+
 ### Running
 After completing a [build](#building), the application can be run by issuing the following from :
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 4c71cc1..5419667 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -58,6 +58,7 @@ endif()
 include_directories(../include)
 include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)
 include_directories(../thirdparty/civetweb-1.9.1/include)
+include_directories(../thirdparty/jsoncpp/include)
 include_directories(include)
 
 file(GLOB SOURCES  "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/*.cpp")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 8667519..f8aac38 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -55,6 +55,7 @@ class RemoteProcessorGroupPort : public core::Processor {
   // Supported Properties
   static core::Property hostName;
   static core::Property port;
+  static core::Property portUUID;
   // Supported Relationships
   static core::Relationship relation;
  public:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h
index 78673d8..67fd444 100644
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -376,7 +376,8 @@ class Transaction {
 class DataPacket {
  public:
   DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
-             std::map<std::string, std::string> attributes) {
+             std::map<std::string, std::string> attributes, std::string &payload) :
+             payload_ (payload) {
     _protocol = protocol;
     _size = 0;
     _transaction = transaction;
@@ -386,6 +387,8 @@ class DataPacket {
   uint64_t _size;
   Site2SiteClientProtocol *_protocol;
   Transaction *_transaction;
+  std::string & payload_;
+
 };
 
 // Site2SiteClientProtocol Class
@@ -532,8 +535,12 @@ class Site2SiteClientProtocol {
   void receiveFlowFiles(core::ProcessContext *context,
                         core::ProcessSession *session);
   // Transfer flow files for the process session
-  void transferFlowFiles(core::ProcessContext *context,
-                         core::ProcessSession *session);
+  void transferFlowFiles(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  //! Transfer a string payload and its attributes for the process session
+  void transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload,
+      std::map<std::string, std::string> attributes);
   // deleteTransaction
   void deleteTransaction(std::string transactionID);
   // Nest Callback Class for write stream

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index de8ceb4..35aa6d9 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -22,6 +22,7 @@
 #include "Connection.h"
 #include "RemoteProcessorGroupPort.h"
 #include "provenance/Provenance.h"
+#include "provenance/ProvenanceTaskReport.h"
 #include "processors/GetFile.h"
 #include "processors/PutFile.h"
 #include "processors/TailFile.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 2b540ec..e945fa4 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -42,6 +42,9 @@
 #include "ProcessSessionFactory.h"
 #include "Scheduling.h"
 
+#include <stack>
+#include "Site2SiteClientProtocol.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -239,12 +242,21 @@ class Processor : public Connectable, public ConfigurableComponent,
   // Trigger the Processor even if the incoming connection is empty
   std::atomic<bool> _triggerWhenEmpty;
 
- private:
+  //! Obtain a Site2SiteClientProtocol for the given host, port and port UUID
+  std::shared_ptr<Site2SiteClientProtocol> obtainSite2SiteProtocol(std::string host, uint16_t sport, uuid_t portId);
+  //! Hand a Site2SiteClientProtocol back after use
+  void returnSite2SiteProtocol(std::shared_ptr<Site2SiteClientProtocol> protocol);
+
+private:
 
   // Mutex for protection
   std::mutex mutex_;
   // Yield Expiration
   std::atomic<uint64_t> yield_expiration_;
+  
+  // Site2Site Protocols
+  std::stack<std::shared_ptr<Site2SiteClientProtocol>> available_protocols_;
+  std::atomic<bool> protocols_created_;
 
   // Check all incoming connections for work
   bool isWorkAvailable();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 82754c4..2977f28 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -155,6 +155,7 @@ class ProvenanceEventRecord :
      */
     REPLAY
   };
+  static const char *ProvenanceEventTypeStr[REPLAY+1];
  public:
   // Constructor
   /*!

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h
index 2b71fd9..b96021c 100644
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -113,6 +113,9 @@ class ProvenanceRepository : public core::Repository,
   // Put
   virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
 
+	if (repo_full_)
+		return false;
+
     // persistent to the DB
     leveldb::Slice value((const char *) buf, bufLen);
     leveldb::Status status;
@@ -149,6 +152,34 @@ class ProvenanceRepository : public core::Repository,
   void removeEvent(ProvenanceEventRecord *event) {
     Delete(event->getEventId());
   }
+  //! Append provenance records read from the DB to records until it holds maxSize entries (holds mutex_ throughout)
+  void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize)
+  {
+	std::lock_guard<std::mutex> lock(mutex_);
+	leveldb::Iterator* it = db_->NewIterator(
+				leveldb::ReadOptions());
+	for (it->SeekToFirst(); it->Valid(); it->Next()) {
+			std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
+			std::string key = it->key().ToString(); // NOTE(review): key is not used below
+			if (records.size() >= maxSize) // NOTE(review): size_t vs int comparison; a negative maxSize converts to a huge unsigned limit
+				break;
+			if (eventRead->DeSerialize((uint8_t *) it->value().data(),
+					(int) it->value().size()))
+			{
+				records.push_back(eventRead); // entries that fail to deserialize are skipped, not reported
+			}
+	}
+	delete it; // release the iterator obtained from NewIterator above
+  }
+  //! Delete every record in records from the repository, keyed by its event id (holds mutex_ throughout)
+  void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records)
+  {
+	std::lock_guard<std::mutex> lock(mutex_); // NOTE(review): confirm Delete() does not lock mutex_ itself; std::mutex is not recursive
+	for (auto record : records) // iterates by value, so each shared_ptr is copied per iteration
+	{
+		Delete(record->getEventId());
+	}
+  }
   // destroy
   void destroy() {
     if (db_) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/include/provenance/ProvenanceTaskReport.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceTaskReport.h b/libminifi/include/provenance/ProvenanceTaskReport.h
new file mode 100644
index 0000000..932ed39
--- /dev/null
+++ b/libminifi/include/provenance/ProvenanceTaskReport.h
@@ -0,0 +1,91 @@
+/**
+ * @file ProvenanceTaskReport.h
+ * ProvenanceTaskReport class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROVENANCE_TASK_REPORT_H__
+#define __PROVENANCE_TASK_REPORT_H__
+
+#include <mutex>
+#include <memory>
+#include <stack>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "Site2SiteClientProtocol.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+
+//! ProvenanceTaskReport Class: a core::Processor that is triggered even when it has no incoming work
+class ProvenanceTaskReport: public core::Processor {
+public:
+	//! Constructor
+	/*!
+	 * Create a new processor; protocol_uuid_ is copied from uuid only when uuid is non-null
+	 */
+	ProvenanceTaskReport(std::string name, uuid_t uuid = NULL) :
+			core::Processor(name, uuid) {
+		logger_ = logging::Logger::getLogger();
+		if (uuid) // NOTE(review): protocol_uuid_ is left uninitialized when uuid is NULL
+		  uuid_copy(protocol_uuid_,uuid);
+		this->setTriggerWhenEmpty(true); // run onTrigger even when there is no input
+	}
+	//! Destructor
+	virtual ~ProvenanceTaskReport() {
+
+	}
+	//! Processor Name
+	static constexpr char const* ProcessorName = "ProvenanceTaskReport";
+	//! Supported Properties
+	static core::Property hostName;
+	static core::Property port;
+	static core::Property batchSize;
+	static core::Property portUUID;
+	//! Supported Relationships
+	static core::Relationship relation;
+	static const char *ProvenanceAppStr;
+public:
+	//! Get provenance JSON report for records, written to report ("Jason" in the name is a misspelling of JSON)
+	void getJasonReport(core::ProcessContext *context,
+	    core::ProcessSession *session, std::vector < std::shared_ptr < ProvenanceEventRecord >> &records,
+	    std::string &report);
+	//! OnTrigger method, implemented by ProvenanceTaskReport
+	virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+	//! Initialize, overridden by ProvenanceTaskReport
+	virtual void initialize(void);
+
+protected:
+
+private:
+	uuid_t protocol_uuid_; // copied from the constructor's uuid argument when one is supplied
+	//! Logger
+	std::shared_ptr<logging::Logger> logger_;
+};
+
+// Provenance Task Report
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index de682b0..31f5d58 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -265,7 +265,7 @@ bool FlowFileRecord::Serialize() {
                        uuid_str_.c_str(), outStream.getSize());
     return true;
   } else {
-    logger_->log_error("NiFi FlowFile Store event %s size %d fail",
+    logger_->log_debug("NiFi FlowFile Store event %s size %d fail",
                        uuid_str_.c_str(), outStream.getSize());
     return false;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 33f0cb2..8ee3680 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -44,33 +44,18 @@ namespace minifi {
 const std::string RemoteProcessorGroupPort::ProcessorName(
     "RemoteProcessorGroupPort");
 core::Property RemoteProcessorGroupPort::hostName("Host Name",
-                                                  "Remote Host Name.",
-                                                  "localhost");
+    "Remote Host Name.", "localhost");
 core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
+core::Property RemoteProcessorGroupPort::portUUID("Port UUID",
+    "Specifies remote NiFi Port UUID.", "");
 core::Relationship RemoteProcessorGroupPort::relation;
 
-std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol() {
-  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
-  if (available_protocols_.empty())
-    return nullptr;
-
-  std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(
-      available_protocols_.top());
-  available_protocols_.pop();
-  return std::move(return_pointer);
-}
-
-void RemoteProcessorGroupPort::returnProtocol(
-    std::unique_ptr<Site2SiteClientProtocol> return_protocol) {
-  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
-  available_protocols_.push(std::move(return_protocol));
-}
-
 void RemoteProcessorGroupPort::initialize() {
   // Set the supported properties
   std::set<core::Property> properties;
   properties.insert(hostName);
   properties.insert(port);
+  properties.insert(portUUID);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -79,50 +64,42 @@ void RemoteProcessorGroupPort::initialize() {
 }
 
 void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
-                                         core::ProcessSession *session) {
-  std::string value;
-
+    core::ProcessSession *session) {
   if (!transmitting_)
     return;
 
-  std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol();
-
-  // Peer Connection
-  if (protocol_ == nullptr) {
-    protocol_ = std::unique_ptr<Site2SiteClientProtocol>(
-        new Site2SiteClientProtocol(0));
-    protocol_->setPortId(protocol_uuid_);
-    protocol_->setTimeOut(timeout_);
-
-    std::string host = "";
-    uint16_t sport = 0;
-    int64_t lvalue;
+  std::string value;
+  int64_t lvalue;
+  std::string host = "";
+  uint16_t sport = 0;
 
-    if (context->getProperty(hostName.getName(), value)) {
-      host = value;
-    }
-    if (context->getProperty(port.getName(), value)
-        && core::Property::StringToInt(value, lvalue)) {
-      sport = (uint16_t) lvalue;
-    }
-    std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-        std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
-            org::apache::nifi::minifi::io::StreamFactory::getInstance()
-                ->createSocket(host, sport));
+  if (context->getProperty(hostName.getName(), value)) {
+    host = value;
+  }
+  if (context->getProperty(port.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    sport = (uint16_t) lvalue;
+  }
+  if (context->getProperty(portUUID.getName(), value)) {
+    uuid_parse(value.c_str(), protocol_uuid_);
+  }
 
-    std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(
-        new Site2SitePeer(std::move(str), host, sport));
+  std::shared_ptr<Site2SiteClientProtocol> protocol_ =
+      this->obtainSite2SiteProtocol(host, sport, protocol_uuid_);
 
-    protocol_->setPeer(std::move(peer_));
+  if (!protocol_) {
+    context->yield();
+    return;
   }
 
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needeed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
-        context->getProcessorNode().getProcessor());
+    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor
+        > (context->getProcessorNode().getProcessor());
     logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
-                       processor->getYieldPeriodMsec());
+        processor->getYieldPeriodMsec());
+    returnSite2SiteProtocol(protocol_);
     return;
   }
 
@@ -131,7 +108,7 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
   else
     protocol_->transferFlowFiles(context, session);
 
-  returnProtocol(std::move(protocol_));
+  returnSite2SiteProtocol(protocol_);
 
   return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp
index 52a0a02..fbde8a0 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -106,39 +106,39 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() {
   }
   logger_->log_info("status code is %i", statusCode);
   switch (statusCode) {
-    case RESOURCE_OK:
-      logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
-      return true;
-    case DIFFERENT_RESOURCE_VERSION:
-      uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        // tearDown();
-        return false;
-      }
-      logger_->log_info(
-          "Site2Site Server Response asked for a different protocol version %d",
-          serverVersion);
-      for (unsigned int i = (_currentVersionIndex + 1);
-          i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
-        if (serverVersion >= _supportedVersion[i]) {
-          _currentVersion = _supportedVersion[i];
-          _currentVersionIndex = i;
-          return initiateResourceNegotiation();
-        }
-      }
-      ret = -1;
-      // tearDown();
-      return false;
-    case NEGOTIATED_ABORT:
-      logger_->log_info("Site2Site Negotiate protocol response ABORT");
-      ret = -1;
+  case RESOURCE_OK:
+    logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+    return true;
+  case DIFFERENT_RESOURCE_VERSION:
+    uint32_t serverVersion;
+    ret = peer_->read(serverVersion);
+    if (ret <= 0) {
       // tearDown();
       return false;
-    default:
-      logger_->log_info("Negotiate protocol response unknown code %d",
-                        statusCode);
-      return true;
+    }
+    logger_->log_info(
+        "Site2Site Server Response asked for a different protocol version %d",
+        serverVersion);
+    for (unsigned int i = (_currentVersionIndex + 1);
+        i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
+      if (serverVersion >= _supportedVersion[i]) {
+        _currentVersion = _supportedVersion[i];
+        _currentVersionIndex = i;
+        return initiateResourceNegotiation();
+      }
+    }
+    ret = -1;
+    // tearDown();
+    return false;
+  case NEGOTIATED_ABORT:
+    logger_->log_info("Site2Site Negotiate protocol response ABORT");
+    ret = -1;
+    // tearDown();
+    return false;
+  default:
+    logger_->log_info("Negotiate protocol response unknown code %d",
+        statusCode);
+    return true;
   }
 
   return true;
@@ -181,38 +181,38 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
   }
 
   switch (statusCode) {
-    case RESOURCE_OK:
-      logger_->log_info("Site2Site Codec Negotiate version OK");
-      return true;
-    case DIFFERENT_RESOURCE_VERSION:
-      uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        // tearDown();
-        return false;
-      }
-      logger_->log_info(
-          "Site2Site Server Response asked for a different codec version %d",
-          serverVersion);
-      for (unsigned int i = (_currentCodecVersionIndex + 1);
-          i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
-        if (serverVersion >= _supportedCodecVersion[i]) {
-          _currentCodecVersion = _supportedCodecVersion[i];
-          _currentCodecVersionIndex = i;
-          return initiateCodecResourceNegotiation();
-        }
-      }
-      ret = -1;
-      // tearDown();
-      return false;
-    case NEGOTIATED_ABORT:
-      logger_->log_info("Site2Site Codec Negotiate response ABORT");
-      ret = -1;
+  case RESOURCE_OK:
+    logger_->log_info("Site2Site Codec Negotiate version OK");
+    return true;
+  case DIFFERENT_RESOURCE_VERSION:
+    uint32_t serverVersion;
+    ret = peer_->read(serverVersion);
+    if (ret <= 0) {
       // tearDown();
       return false;
-    default:
-      logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
-      return true;
+    }
+    logger_->log_info(
+        "Site2Site Server Response asked for a different codec version %d",
+        serverVersion);
+    for (unsigned int i = (_currentCodecVersionIndex + 1);
+        i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
+      if (serverVersion >= _supportedCodecVersion[i]) {
+        _currentCodecVersion = _supportedCodecVersion[i];
+        _currentCodecVersionIndex = i;
+        return initiateCodecResourceNegotiation();
+      }
+    }
+    ret = -1;
+    // tearDown();
+    return false;
+  case NEGOTIATED_ABORT:
+    logger_->log_info("Site2Site Codec Negotiate response ABORT");
+    ret = -1;
+    // tearDown();
+    return false;
+  default:
+    logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
+    return true;
   }
 
   return true;
@@ -241,7 +241,7 @@ bool Site2SiteClientProtocol::handShake() {
     return false;
   }
 
-  std::map<std::string, std::string> properties;
+  std::map < std::string, std::string > properties;
   properties[HandShakePropertyStr[GZIP]] = "false";
   properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
   properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(
@@ -286,7 +286,7 @@ bool Site2SiteClientProtocol::handShake() {
       return false;
     }
     logger_->log_info("Site2Site Protocol Send handshake properties %s %s",
-                      it->first.c_str(), it->second.c_str());
+        it->first.c_str(), it->second.c_str());
   }
 
   RespondCode code;
@@ -300,28 +300,28 @@ bool Site2SiteClientProtocol::handShake() {
   }
 
   switch (code) {
-    case PROPERTIES_OK:
-      logger_->log_info("Site2Site HandShake Completed");
-      _peerState = HANDSHAKED;
-      return true;
-    case PORT_NOT_IN_VALID_STATE:
-    case UNKNOWN_PORT:
-    case PORTS_DESTINATION_FULL:
-      logger_->log_error(
-          "Site2Site HandShake Failed because destination port is either invalid or full");
-      ret = -1;
-      /*
-       peer_->yield();
-       tearDown(); */
-      return false;
-    default:
-      logger_->log_info("HandShake Failed because of unknown respond code %d",
-                        code);
-      ret = -1;
-      /*
-       peer_->yield();
-       tearDown(); */
-      return false;
+  case PROPERTIES_OK:
+    logger_->log_info("Site2Site HandShake Completed");
+    _peerState = HANDSHAKED;
+    return true;
+  case PORT_NOT_IN_VALID_STATE:
+  case UNKNOWN_PORT:
+  case PORTS_DESTINATION_FULL:
+    logger_->log_error(
+        "Site2Site HandShake Failed because destination port is either invalid or full");
+    ret = -1;
+    /*
+     peer_->yield();
+     tearDown(); */
+    return false;
+  default:
+    logger_->log_info("HandShake Failed because of unknown respond code %d",
+        code);
+    ret = -1;
+    /*
+     peer_->yield();
+     tearDown(); */
+    return false;
   }
 
   return false;
@@ -369,7 +369,7 @@ int Site2SiteClientProtocol::readRequestType(RequestType &type) {
 }
 
 int Site2SiteClientProtocol::readRespond(RespondCode &code,
-                                         std::string &message) {
+    std::string &message) {
   uint8_t firstByte;
 
   int ret = peer_->read(firstByte);
@@ -408,7 +408,7 @@ int Site2SiteClientProtocol::readRespond(RespondCode &code,
 }
 
 int Site2SiteClientProtocol::writeRespond(RespondCode code,
-                                          std::string message) {
+    std::string message) {
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
 
   if (resCode == NULL) {
@@ -525,31 +525,31 @@ Transaction* Site2SiteClientProtocol::createTransaction(
     org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(
         peer_.get());
     switch (code) {
-      case MORE_DATA:
-        dataAvailable = true;
-        logger_->log_info("Site2Site peer indicates that data is available");
-        transaction = new Transaction(direction, crcstream);
-        _transactionMap[transaction->getUUIDStr()] = transaction;
-        transactionID = transaction->getUUIDStr();
-        transaction->setDataAvailable(dataAvailable);
-        logger_->log_info("Site2Site create transaction %s",
-                          transaction->getUUIDStr().c_str());
-        return transaction;
-      case NO_MORE_DATA:
-        dataAvailable = false;
-        logger_->log_info("Site2Site peer indicates that no data is available");
-        transaction = new Transaction(direction, crcstream);
-        _transactionMap[transaction->getUUIDStr()] = transaction;
-        transactionID = transaction->getUUIDStr();
-        transaction->setDataAvailable(dataAvailable);
-        logger_->log_info("Site2Site create transaction %s",
-                          transaction->getUUIDStr().c_str());
-        return transaction;
-      default:
-        logger_->log_info(
-            "Site2Site got unexpected response %d when asking for data", code);
-        // tearDown();
-        return NULL;
+    case MORE_DATA:
+      dataAvailable = true;
+      logger_->log_info("Site2Site peer indicates that data is available");
+      transaction = new Transaction(direction, crcstream);
+      _transactionMap[transaction->getUUIDStr()] = transaction;
+      transactionID = transaction->getUUIDStr();
+      transaction->setDataAvailable(dataAvailable);
+      logger_->log_info("Site2Site create transaction %s",
+          transaction->getUUIDStr().c_str());
+      return transaction;
+    case NO_MORE_DATA:
+      dataAvailable = false;
+      logger_->log_info("Site2Site peer indicates that no data is available");
+      transaction = new Transaction(direction, crcstream);
+      _transactionMap[transaction->getUUIDStr()] = transaction;
+      transactionID = transaction->getUUIDStr();
+      transaction->setDataAvailable(dataAvailable);
+      logger_->log_info("Site2Site create transaction %s",
+          transaction->getUUIDStr().c_str());
+      return transaction;
+    default:
+      logger_->log_info(
+          "Site2Site got unexpected response %d when asking for data", code);
+      // tearDown();
+      return NULL;
     }
   } else {
     ret = writeRequestType(SEND_FLOWFILES);
@@ -564,14 +564,14 @@ Transaction* Site2SiteClientProtocol::createTransaction(
       _transactionMap[transaction->getUUIDStr()] = transaction;
       transactionID = transaction->getUUIDStr();
       logger_->log_info("Site2Site create transaction %s",
-                        transaction->getUUIDStr().c_str());
+          transaction->getUUIDStr().c_str());
       return transaction;
     }
   }
 }
 
 bool Site2SiteClientProtocol::receive(std::string transactionID,
-                                      DataPacket *packet, bool &eof) {
+    DataPacket *packet, bool &eof) {
   int ret;
   Transaction *transaction = NULL;
 
@@ -602,7 +602,7 @@ bool Site2SiteClientProtocol::receive(std::string transactionID,
 
   if (transaction->getDirection() != RECEIVE) {
     logger_->log_info("Site2Site transaction %s direction is wrong",
-                      transactionID.c_str());
+        transactionID.c_str());
     return false;
   }
 
@@ -687,9 +687,8 @@ bool Site2SiteClientProtocol::receive(std::string transactionID,
 }
 
 bool Site2SiteClientProtocol::send(std::string transactionID,
-                                   DataPacket *packet,
-                                   std::shared_ptr<FlowFileRecord> flowFile,
-                                   core::ProcessSession *session) {
+    DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile,
+    core::ProcessSession *session) {
   int ret;
   Transaction *transaction = NULL;
 
@@ -720,7 +719,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID,
 
   if (transaction->getDirection() != SEND) {
     logger_->log_info("Site2Site transaction %s direction is wrong",
-                      transactionID.c_str());
+        transactionID.c_str());
     return false;
   }
 
@@ -751,22 +750,38 @@ bool Site2SiteClientProtocol::send(std::string transactionID,
       return false;
     }
     logger_->log_info("Site2Site transaction %s send attribute key %s value %s",
-                      transactionID.c_str(), itAttribute->first.c_str(),
-                      itAttribute->second.c_str());
+        transactionID.c_str(), itAttribute->first.c_str(),
+        itAttribute->second.c_str());
   }
 
-  uint64_t len = flowFile->getSize();
-  ret = transaction->getStream().write(len);
-  if (ret != 8) {
-    return false;
-  }
+  uint64_t len = 0;
+  if (flowFile) {
+    len = flowFile->getSize();
+    ret = transaction->getStream().write(len);
+    if (ret != 8) {
+      return false;
+    }
+    if (flowFile->getSize()) {
+      Site2SiteClientProtocol::ReadCallback callback(packet);
+      session->read(flowFile, &callback);
+      if (flowFile->getSize() != packet->_size) {
+        return false;
+      }
+    }
+  } else if (packet->payload_.length() > 0) {
+    len = packet->payload_.length();
+
+    ret = transaction->getStream().write(len);
+    if (ret != 8) {
+      return false;
+    }
 
-  if (flowFile->getSize()) {
-    Site2SiteClientProtocol::ReadCallback callback(packet);
-    session->read(flowFile, &callback);
-    if (flowFile->getSize() != packet->_size) {
+    ret = transaction->getStream().writeData(
+        reinterpret_cast<uint8_t *> (const_cast<char*> (packet->payload_.c_str())), len);
+    if (ret != len) {
       return false;
     }
+    packet->_size += len;
   }
 
   transaction->_transfers++;
@@ -780,7 +795,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID,
 }
 
 void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
-                                               core::ProcessSession *session) {
+    core::ProcessSession *session) {
   uint64_t bytes = 0;
   int transfers = 0;
   Transaction *transaction = NULL;
@@ -793,7 +808,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION,
-                    "Can not establish handshake with peer");
+        "Can not establish handshake with peer");
     return;
   }
 
@@ -810,9 +825,10 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
 
   try {
     while (true) {
-      std::map<std::string, std::string> empty;
+      std::map < std::string, std::string > empty;
       uint64_t startTime = getTimeMillis();
-      DataPacket packet(this, transaction, empty);
+      std::string payload;
+      DataPacket packet(this, transaction, empty, payload);
       bool eof = false;
 
       if (!receive(transactionID, &packet, eof)) {
@@ -823,8 +839,8 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
         // transaction done
         break;
       }
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-          FlowFileRecord>(session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast
+          < FlowFileRecord > (session->create());
 
       if (!flowFile) {
         throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
@@ -853,8 +869,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
       std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host="
           + peer_->getHostName();
       session->getProvenanceReporter()->receive(flowFile, transitUri,
-                                                sourceIdentifier, details,
-                                                endTime - startTime);
+          sourceIdentifier, details, endTime - startTime);
       session->transfer(flowFile, relation);
       // receive the transfer for the flow record
       bytes += packet._size;
@@ -941,7 +956,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
     int64_t crcValue = transaction->getCRC();
     std::string crc = std::to_string(crcValue);
     logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s",
-                      transaction->getCRC(), transactionID.c_str());
+        transaction->getCRC(), transactionID.c_str());
     ret = writeRespond(CONFIRM_TRANSACTION, crc);
     if (ret <= 0)
       return false;
@@ -953,24 +968,24 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
 
     if (code == CONFIRM_TRANSACTION) {
       logger_->log_info("Site2Site transaction %s peer confirm transaction",
-                        transactionID.c_str());
+          transactionID.c_str());
       transaction->_state = TRANSACTION_CONFIRMED;
       return true;
     } else if (code == BAD_CHECKSUM) {
       logger_->log_info("Site2Site transaction %s peer indicate bad checksum",
-                        transactionID.c_str());
+          transactionID.c_str());
       /*
        transaction->_state = TRANSACTION_CONFIRMED;
        return true; */
       return false;
     } else {
       logger_->log_info("Site2Site transaction %s peer unknown respond code %d",
-                        transactionID.c_str(), code);
+          transactionID.c_str(), code);
       return false;
     }
   } else {
     logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s",
-                      transactionID.c_str());
+        transactionID.c_str());
     ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
     if (ret <= 0)
       return false;
@@ -990,7 +1005,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
         std::string crc = std::to_string(crcValue);
         if (message == crc) {
           logger_->log_info("Site2Site transaction %s CRC matched",
-                            transactionID.c_str());
+              transactionID.c_str());
           ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
           if (ret <= 0)
             return false;
@@ -998,7 +1013,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
           return true;
         } else {
           logger_->log_info("Site2Site transaction %s CRC not matched %s",
-                            transactionID.c_str(), crc.c_str());
+              transactionID.c_str(), crc.c_str());
           ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
           /*
            ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
@@ -1016,7 +1031,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
       return true;
     } else {
       logger_->log_info("Site2Site transaction %s peer unknown respond code %d",
-                        transactionID.c_str(), code);
+          transactionID.c_str(), code);
       return false;
     }
     return false;
@@ -1065,7 +1080,7 @@ void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) {
   }
 
   logger_->log_info("Site2Site delete transaction %s",
-                    transaction->getUUIDStr().c_str());
+      transaction->getUUIDStr().c_str());
   delete transaction;
   _transactionMap.erase(transactionID);
 }
@@ -1119,7 +1134,7 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) {
       return true;
     } else {
       logger_->log_info("Site2Site transaction %s send finished",
-                        transactionID.c_str());
+          transactionID.c_str());
       ret = this->writeRespond(TRANSACTION_FINISHED, "Finished");
       if (ret <= 0) {
         return false;
@@ -1140,21 +1155,21 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) {
 
     if (code == TRANSACTION_FINISHED) {
       logger_->log_info("Site2Site transaction %s peer finished transaction",
-                        transactionID.c_str());
+          transactionID.c_str());
       transaction->_state = TRANSACTION_COMPLETED;
       return true;
     } else {
       logger_->log_info("Site2Site transaction %s peer unknown respond code %d",
-                        transactionID.c_str(), code);
+          transactionID.c_str(), code);
       return false;
     }
   }
 }
 
 void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
-                                                core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flow =
-      std::static_pointer_cast<FlowFileRecord>(session->get());
+    core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast
+      < FlowFileRecord > (session->get());
 
   Transaction *transaction = NULL;
 
@@ -1169,7 +1184,7 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
     context->yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION,
-                    "Can not establish handshake with peer");
+        "Can not establish handshake with peer");
     return;
   }
 
@@ -1190,27 +1205,28 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
   try {
     while (continueTransaction) {
       uint64_t startTime = getTimeMillis();
-      DataPacket packet(this, transaction, flow->getAttributes());
+      std::string payload;
+      DataPacket packet(this, transaction, flow->getAttributes(), payload);
 
       if (!send(transactionID, &packet, flow, session)) {
         throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
         return;
       }
       logger_->log_info("Site2Site transaction %s send flow record %s",
-                        transactionID.c_str(), flow->getUUIDStr().c_str());
+          transactionID.c_str(), flow->getUUIDStr().c_str());
       uint64_t endTime = getTimeMillis();
       std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
       std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host="
           + peer_->getHostName();
       session->getProvenanceReporter()->send(flow, transitUri, details,
-                                             endTime - startTime, false);
+          endTime - startTime, false);
       session->remove(flow);
 
       uint64_t transferNanos = getTimeNano() - startSendingNanos;
       if (transferNanos > _batchSendNanos)
         break;
 
-      flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+      flow = std::static_pointer_cast < FlowFileRecord > (session->get());
 
       if (!flow) {
         continueTransaction = false;
@@ -1250,6 +1266,80 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
   return;
 }
 
+/**
+ * Sends a string payload to the peer as a single Site2Site SEND transaction
+ * (create, send, confirm, complete). Any failure yields the context, tears
+ * down the connection and throws.
+ * @param context    yielded when the peer is not ready or the transfer fails
+ * @param session    passed through to send()
+ * @param payload    content to transmit; an empty payload makes this a no-op
+ * @param attributes attributes handed to the data packet
+ */
+void Site2SiteClientProtocol::transferString(core::ProcessContext *context,
+    core::ProcessSession *session, std::string &payload,
+    std::map<std::string, std::string> attributes) {
+  if (payload.empty())
+    return;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION,
+        "Can not establish handshake with peer");
+  }
+
+  // Create the transaction
+  std::string transactionID;
+  Transaction *transaction = createTransaction(transactionID, SEND);
+
+  if (transaction == NULL) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+  }
+
+  try {
+    DataPacket packet(this, transaction, attributes, payload);
+
+    // No flow file here: send() falls back to packet->payload_ when it is null.
+    if (!send(transactionID, &packet, nullptr, session)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+    }
+    logger_->log_info("Site2Site transaction %s send bytes length %zu",
+        transactionID.c_str(), payload.length());
+
+    if (!confirm(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+    }
+    if (!complete(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+    }
+    logger_->log_info(
+        "Site2Site transaction %s successfully send flow record %d, content bytes %d",
+        transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+  } catch (std::exception &exception) {
+    // transaction is known to be non-NULL here (checked before the try block)
+    deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+    logger_->log_debug(
+        "Caught Exception during Site2SiteClientProtocol::transferString");
+    throw;
+  }
+
+  deleteTransaction(transactionID);
+}
+
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 68aaf5c..33f9b43 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -71,6 +71,10 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
       == org::apache::nifi::minifi::processors::AppendHostInfo::ProcessorName) {
     processor = std::make_shared<
         org::apache::nifi::minifi::processors::AppendHostInfo>(name, uuid);
+  } else if (name
+      == org::apache::nifi::minifi::provenance::ProvenanceTaskReport::ProcessorName) {
+    processor = std::make_shared<
+        org::apache::nifi::minifi::provenance::ProvenanceTaskReport>(name, uuid);
   } else {
     logger_->log_error("No Processor defined for %s", name.c_str());
     return nullptr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 9a0898a..9d44d39 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -29,11 +29,13 @@
 #include <thread>
 #include <memory>
 #include <functional>
+#include <utility>
 #include "Connection.h"
 #include "core/Connectable.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
+#include "../include/io/StreamFactory.h"
 
 namespace org {
 namespace apache {
@@ -41,15 +43,15 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-Processor::Processor(std::string name, uuid_t uuid)
-    : Connectable(name, uuid),
-      ConfigurableComponent(logging::Logger::getLogger()) {
+Processor::Processor(std::string name, uuid_t uuid) :
+    Connectable(name, uuid), ConfigurableComponent(logging::Logger::getLogger()) {
   has_work_.store(false);
   // Setup the default values
   state_ = DISABLED;
   strategy_ = TIMER_DRIVEN;
   loss_tolerant_ = false;
   _triggerWhenEmpty = false;
+  protocols_created_ = false;
   scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
   run_durantion_nano_ = 0;
   yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
@@ -60,7 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid)
   incoming_connections_Iter = this->_incomingConnections.begin();
   logger_ = logging::Logger::getLogger();
   logger_->log_info("Processor %s created UUID %s", name_.c_str(),
-                    uuidStr_.c_str());
+      uuidStr_.c_str());
 }
 
 bool Processor::isRunning() {
@@ -76,12 +78,12 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
 
   if (isRunning()) {
     logger_->log_info("Can not add connection while the process %s is running",
-                      name_.c_str());
+        name_.c_str());
     return false;
   }
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
-      conn);
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection
+      > (conn);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
@@ -114,7 +116,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
     auto &&it = _outGoingConnections.find(relationship);
     if (it != _outGoingConnections.end()) {
       // We already has connection for this relationship
-      std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
+      std::set < std::shared_ptr < Connectable >> existedConnection =
+          it->second;
       if (existedConnection.find(connection) == existedConnection.end()) {
         // We do not have the same connection for this relationship yet
         existedConnection.insert(connection);
@@ -127,7 +130,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
       }
     } else {
       // We do not have any outgoing connection for this relationship yet
-      std::set<std::shared_ptr<Connectable>> newConnection;
+      std::set < std::shared_ptr < Connectable >> newConnection;
       newConnection.insert(connection);
       connection->setSource(shared_from_this());
       _outGoingConnections[relationship] = newConnection;
@@ -149,13 +152,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
     return;
   }
 
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
 
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
-      conn);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection
+      > (conn);
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
@@ -191,15 +194,63 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
   }
 }
 
+/**
+ * Hands out a Site2Site client protocol for the given remote endpoint.
+ * On first use, a pool of max_concurrent_tasks_ protocols is built; later
+ * calls pop from that pool, or build a fresh protocol when it is empty.
+ * The whole operation runs while holding mutex_.
+ */
+std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol(
+    std::string host, uint16_t sport, uuid_t portId) {
+  std::lock_guard<std::mutex> guard(mutex_);
+
+  // Builds one protocol bound to portId with a new socket peer for host:sport.
+  auto buildProtocol = [&]() -> std::shared_ptr<Site2SiteClientProtocol> {
+    std::shared_ptr<Site2SiteClientProtocol> created =
+        std::make_shared<Site2SiteClientProtocol>(nullptr);
+    created->setPortId(portId);
+    std::unique_ptr<org::apache::nifi::minifi::io::DataStream> socket(
+        org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket(
+            host, sport));
+    created->setPeer(std::unique_ptr<Site2SitePeer>(
+        new Site2SitePeer(std::move(socket), host, sport)));
+    return created;
+  };
+
+  if (!protocols_created_) {
+    // Fill the pool with one protocol per allowed concurrent task.
+    for (int i = 0; i < this->max_concurrent_tasks_; i++) {
+      protocols_created_ = true;
+      available_protocols_.push(buildProtocol());
+    }
+  }
+
+  if (available_protocols_.empty()) {
+    // Pool exhausted (or never filled): create a protocol on demand.
+    return buildProtocol();
+  }
+  std::shared_ptr<Site2SiteClientProtocol> pooled = available_protocols_.top();
+  available_protocols_.pop();
+  return pooled;
+}
+
+// Returns a protocol to the pool; null protocols and a full pool are ignored.
+void Processor::returnSite2SiteProtocol(
+    std::shared_ptr<Site2SiteClientProtocol> protocol) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  if (!protocol || available_protocols_.size() >= max_concurrent_tasks_) return;
+  available_protocols_.push(protocol);
+}
+
 bool Processor::flowFilesQueued() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   if (_incomingConnections.size() == 0)
     return false;
 
   for (auto &&conn : _incomingConnections) {
-    std::shared_ptr<Connection> connection =
-        std::static_pointer_cast<Connection>(conn);
+    std::shared_ptr<Connection> connection = std::static_pointer_cast
+        < Connection > (conn);
     if (connection->getQueueSize() > 0)
       return true;
   }
@@ -208,14 +259,15 @@ bool Processor::flowFilesQueued() {
 }
 
 bool Processor::flowFilesOutGoingFull() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   for (auto &&connection : _outGoingConnections) {
     // We already has connection for this relationship
-    std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
+    std::set < std::shared_ptr < Connectable >> existedConnection =
+        connection.second;
     for (const auto conn : existedConnection) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<
-          Connection>(conn);
+      std::shared_ptr < Connection > connection = std::static_pointer_cast
+          < Connection > (conn);
       if (connection->isFull())
         return true;
     }
@@ -225,7 +277,7 @@ bool Processor::flowFilesOutGoingFull() {
 }
 
 void Processor::onTrigger(ProcessContext *context,
-                          ProcessSessionFactory *sessionFactory) {
+    ProcessSessionFactory *sessionFactory) {
   auto session = sessionFactory->createSession();
 
   try {
@@ -249,8 +301,8 @@ bool Processor::isWorkAvailable() {
 
   try {
     for (const auto &conn : _incomingConnections) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<
-          Connection>(conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast
+          < Connection > (conn);
       if (connection->getQueueSize() > 0) {
         hasWork = true;
         break;
@@ -259,7 +311,7 @@ bool Processor::isWorkAvailable() {
   } catch (...) {
     logger_->log_error(
         "Caught an exception while checking if work is available;"
-        " unless it was positively determined that work is available, assuming NO work is available!");
+            " unless it was positively determined that work is available, assuming NO work is available!");
   }
 
   return hasWork;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index b1db9a8..083d0b2 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -34,6 +34,11 @@ namespace nifi {
 namespace minifi {
 namespace provenance {
 
+const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY+1] =
+{ "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK",
+                "JOIN", "CLONE", "CONTENT_MODIFIED", "ATTRIBUTES_MODIFIED", "ROUTE",
+                "ADDINFO", "REPLAY"};
+
 // DeSerialize
 bool ProvenanceEventRecord::DeSerialize(
     const std::shared_ptr<core::Repository> &repo, std::string key) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/src/provenance/ProvenanceTaskReport.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceTaskReport.cpp b/libminifi/src/provenance/ProvenanceTaskReport.cpp
new file mode 100644
index 0000000..dfe81e8
--- /dev/null
+++ b/libminifi/src/provenance/ProvenanceTaskReport.cpp
@@ -0,0 +1,198 @@
+/**
+ * @file ProvenanceTaskReport.cpp
+ * ProvenanceTaskReport class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <string>
+#include <memory>
+#include <sstream>
+#include <iostream>
+
+#include "provenance/ProvenanceTaskReport.h"
+#include "../include/io/StreamFactory.h"
+#include "io/ClientSocket.h"
+#include "utils/TimeUtil.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "provenance/Provenance.h"
+#include "FlowController.h"
+
+#include "json/json.h"
+#include "json/writer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+
+core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.",
+    "localhost");
+core::Property ProvenanceTaskReport::port("Port", "Remote Port", "9999");
+core::Property ProvenanceTaskReport::batchSize("Batch Size",
+    "Specifies how many records to send in a single batch, at most.", "100");
+core::Property ProvenanceTaskReport::portUUID("Port UUID",
+    "Specifies remote NiFi Port UUID.", "");
+core::Relationship ProvenanceTaskReport::relation;
+const char *ProvenanceTaskReport::ProvenanceAppStr = "MiNiFi Flow";
+
+void ProvenanceTaskReport::initialize() {
+  //! Register the supported properties: host name, port, batch size and port UUID
+  std::set<core::Property> properties;
+  properties.insert(hostName);
+  properties.insert(port);
+  properties.insert(batchSize);
+  properties.insert(portUUID);
+  setSupportedProperties(properties);
+  //! Register the single supported relationship (the default-constructed `relation`)
+  std::set<core::Relationship> relationships;
+  relationships.insert(relation);
+  setSupportedRelationships(relationships);
+}
+
+void ProvenanceTaskReport::getJasonReport(core::ProcessContext *context,
+    core::ProcessSession *session,
+    std::vector<std::shared_ptr<ProvenanceEventRecord>> &records,
+    std::string &report) {
+  // Builds one JSON object per record and writes the styled array text to `report`; context/session are unused.
+  Json::Value array;
+  for (auto record : records) {
+    Json::Value recordJson;
+    Json::Value updatedAttributesJson;
+    Json::Value parentUuidJson;
+    Json::Value childUuidJson;
+    recordJson["eventId"] = record->getEventId().c_str();
+    recordJson["eventType"] =
+        ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
+    recordJson["timestampMillis"] = record->getEventTime();
+    recordJson["durationMillis"] = record->getEventDuration();
+    recordJson["lineageStart"] = record->getlineageStartDate();
+    recordJson["details"] = record->getDetails().c_str();
+    recordJson["componentId"] = record->getComponentId().c_str();
+    recordJson["componentType"] = record->getComponentType().c_str();
+    recordJson["entityId"] = record->getFlowFileUuid().c_str();
+    recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile";
+    recordJson["entitySize"] = record->getFileSize();
+    recordJson["entityOffset"] = record->getFileOffset();
+    // Copy every attribute key/value pair of the record into the "updatedAttributes" object.
+    for (auto attr : record->getAttributes()) {
+      updatedAttributesJson[attr.first] = attr.second;
+    }
+    recordJson["updatedAttributes"] = updatedAttributesJson;
+    // Parent UUIDs -> "parentIds". NOTE(review): presumably stays null (not []) when nothing is appended - confirm.
+    for (auto parentUUID : record->getParentUuids()) {
+      parentUuidJson.append(parentUUID.c_str());
+    }
+    recordJson["parentIds"] = parentUuidJson;
+    // Child UUIDs -> "childIds"; the same null-versus-empty-array question applies here.
+    for (auto childUUID : record->getChildrenUuids()) {
+      childUuidJson.append(childUUID.c_str());
+    }
+    recordJson["childIds"] = childUuidJson;
+    recordJson["transitUri"] = record->getTransitUri().c_str();
+    recordJson["remoteIdentifier"] =
+        record->getSourceSystemFlowFileIdentifier().c_str();
+    recordJson["alternateIdentifier"] =
+        record->getAlternateIdentifierUri().c_str();
+    recordJson["application"] = ProvenanceAppStr;
+    array.append(recordJson);
+  }
+  // Render the accumulated array with Json::StyledWriter and hand the text back through `report`.
+  Json::StyledWriter writer;
+  report = writer.write(array);
+}
+
+void ProvenanceTaskReport::onTrigger(core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::string value;
+  int64_t lvalue;
+  std::string host = "";
+  uint16_t sport = 0;
+  // Read the remote endpoint (host name, port, port UUID) from the processor properties.
+  if (context->getProperty(hostName.getName(), value)) {
+    host = value;
+  }
+  if (context->getProperty(port.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    sport = (uint16_t) lvalue;
+  }
+  if (context->getProperty(portUUID.getName(), value)) {
+    uuid_parse(value.c_str(), protocol_uuid_);
+  }
+  // Obtain a Site2Site client protocol for that endpoint; yield and give up if none is returned.
+  std::shared_ptr<Site2SiteClientProtocol> protocol_ =
+      this->obtainSite2SiteProtocol(host, sport, protocol_uuid_);
+
+  if (!protocol_) {
+    context->yield();
+    return;
+  }
+
+  if (!protocol_->bootstrap()) {
+    // bootstrap failed: yield, log the yield period and hand the protocol back
+    context->yield();
+    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor
+        > (context->getProcessorNode().getProcessor());
+    logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
+        processor->getYieldPeriodMsec());
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+  // Ask the provenance repository for a batch of records; the batch size defaults to 100.
+  int64_t batch = 100;
+  if (context->getProperty(batchSize.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    batch = lvalue;
+  }
+  std::vector < std::shared_ptr < ProvenanceEventRecord >> records;
+  std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast
+      < ProvenanceRepository > (context->getProvenanceRepository());
+  repo->getProvenanceRecord(records, batch);
+  if (records.size() <= 0) {
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+  // Serialize the batch to JSON; if the report comes back empty there is nothing to send.
+  std::string jsonStr;
+  this->getJasonReport(context, session, records, jsonStr);
+  if (jsonStr.length() <= 0) {
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+  // Send the JSON report over the protocol with an empty attribute map.
+  try {
+    std::map < std::string, std::string > attributes;
+    protocol_->transferString(context, session, jsonStr, attributes);
+  } catch (...) {
+    // transfer failed: keep the records (no purge). NOTE(review): protocol_ is not returned on this path - confirm.
+    return;
+  }
+
+  // the records were transferred; purge them from the provenance repository
+  repo->purgeProvenanceRecord(records);
+  returnSite2SiteProtocol(protocol_);
+}
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 87f190c..995aa19 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -41,16 +41,20 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
 
   testController.enableDebug();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
+
+  std::shared_ptr<core::Processor> processorReport = std::make_shared
+      < org::apache::nifi::minifi::provenance::ProvenanceTaskReport
+      > ("provenanceTaskReport");
 
   std::shared_ptr<core::Repository> test_repo =
       std::make_shared<TestRepository>();
 
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+      < TestRepository > (test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared
+      < TestFlowController > (test_repo, test_repo);
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -58,8 +62,8 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   uuid_t processoruuid;
   REQUIRE(true == processor->getUUID(processoruuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (test_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -77,7 +81,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   core::ProcessContext context(node, test_repo);
   core::ProcessSessionFactory factory(&context);
   context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   core::ProcessSession session(&context);
 
   processor->onSchedule(&context, &factory);
@@ -122,7 +126,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   for (auto entry : repo->getRepoMap()) {
     provenance::ProvenanceEventRecord newRecord;
     newRecord.DeSerialize((uint8_t*) entry.second.data(),
-                          entry.second.length());
+        entry.second.length());
 
     bool found = false;
     for (auto provRec : records) {
@@ -141,6 +145,24 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
 
   }
 
+  core::ProcessorNode nodeReport(processorReport);
+  core::ProcessContext contextReport(nodeReport, test_repo);
+  core::ProcessSessionFactory factoryReport(&contextReport);
+  contextReport.setProperty(org::apache::nifi::minifi::provenance::ProvenanceTaskReport::batchSize,
+      "1");
+  core::ProcessSession sessionReport(&contextReport);
+  processorReport->onSchedule(&contextReport, &factoryReport);
+  std::shared_ptr<org::apache::nifi::minifi::provenance::ProvenanceTaskReport> taskReport = std::static_pointer_cast
+        < org::apache::nifi::minifi::provenance::ProvenanceTaskReport > (processorReport);
+  std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> recordsReport;
+  processorReport->incrementActiveTasks();
+  processorReport->setScheduledState(core::ScheduledState::RUNNING);
+  std::string jsonStr;
+  repo->getProvenanceRecord(recordsReport, 1);
+  taskReport->getJasonReport(&contextReport, &sessionReport, recordsReport, jsonStr);
+  REQUIRE(recordsReport.size() == 1);
+  REQUIRE(taskReport->getName() == "provenanceTaskReport");
+  REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos);
 }
 
 TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
@@ -149,16 +171,16 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
 
   testController.enableDebug();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
 
   std::shared_ptr<core::Repository> test_repo =
       std::make_shared<TestRepository>();
 
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+      < TestRepository > (test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared
+      < TestFlowController > (test_repo, test_repo);
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -166,8 +188,8 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
   uuid_t processoruuid;
   REQUIRE(true == processor->getUUID(processoruuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (test_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -184,7 +206,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
   core::ProcessContext context(node, test_repo);
   core::ProcessSessionFactory factory(&context);
   context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   // replicate 10 threads
   processor->setScheduledState(core::ScheduledState::RUNNING);
   processor->onSchedule(&context, &factory);
@@ -229,9 +251,9 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
     session.commit();
     std::shared_ptr<core::FlowFile> ffr = session.get();
 
-    REQUIRE((repo->getRepoMap().size()%2) == 0);
-    REQUIRE(repo->getRepoMap().size() == (prev+2));
-    prev+=2;
+    REQUIRE((repo->getRepoMap().size() % 2) == 0);
+    REQUIRE(repo->getRepoMap().size() == (prev + 2));
+    prev += 2;
 
   }
 
@@ -239,10 +261,10 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
 
 TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   std::ostringstream oss;
-  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
-      logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
-                                                                         0));
+  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr
+      < logging::BaseLogger
+      > (new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+          0));
   std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
   logger->updateLogger(std::move(outputLogger));
 
@@ -252,11 +274,11 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
 
   std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
 
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<
-      org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared
+      < org::apache::nifi::minifi::processors::LogAttribute > ("logattribute");
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -267,12 +289,12 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   uuid_t logattribute_uuid;
   REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo, "logattribute");
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared
+      < minifi::Connection > (repo, "logattribute");
   connection2->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -298,7 +320,7 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   core::ProcessContext context(node, repo);
   core::ProcessContext context2(node2, repo);
   context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   core::ProcessSession session(&context);
   core::ProcessSession session2(&context2);
 
@@ -357,8 +379,8 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
       log_attribute_output.find("key:path value:" + std::string(dir))
           != std::string::npos);
 
-  outputLogger = std::unique_ptr<logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::NullAppender());
+  outputLogger = std::unique_ptr < logging::BaseLogger
+      > (new org::apache::nifi::minifi::core::logging::NullAppender());
   logger->updateLogger(std::move(outputLogger));
 
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 80d8642..67039bf 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -25,10 +25,10 @@
 /**
  * Test repository
  */
-class TestRepository : public core::Repository {
- public:
-  TestRepository()
-      : Repository("repo_name", "./dir", 1000, 100, 0) {
+class TestRepository: public core::Repository {
+public:
+  TestRepository() :
+      Repository("repo_name", "./dir", 1000, 100, 0) {
   }
   // initialize
   bool initialize() {
@@ -42,8 +42,8 @@ class TestRepository : public core::Repository {
 
   bool Put(std::string key, uint8_t *buf, int bufLen) {
     repositoryResults.insert(
-        std::pair<std::string, std::string>(
-            key, std::string((const char*) buf, bufLen)));
+        std::pair<std::string, std::string>(key,
+            std::string((const char*) buf, bufLen)));
     return true;
   }
   // Delete
@@ -66,19 +66,35 @@ class TestRepository : public core::Repository {
     return repositoryResults;
   }
 
+  void getProvenanceRecord(
+      std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
+      int maxSize) {
+    for (auto entry : repositoryResults) {
+      if (records.size() >= maxSize)
+        break;
+      std::shared_ptr<provenance::ProvenanceEventRecord> eventRead =
+          std::make_shared<provenance::ProvenanceEventRecord>();
+      // Keep the record only if the stored bytes deserialize; failures are skipped silently.
+      if (eventRead->DeSerialize((uint8_t*) entry.second.data(),
+          entry.second.length())) {
+        records.push_back(eventRead);
+      }
+    }
+  }
+
   void run() {
     // do nothing
   }
- protected:
+protected:
   std::map<std::string, std::string> repositoryResults;
 };
 
-class TestFlowController : public minifi::FlowController {
+class TestFlowController: public minifi::FlowController {
 
- public:
+public:
   TestFlowController(std::shared_ptr<core::Repository> repo,
-                     std::shared_ptr<core::Repository> flow_file_repo)
-      : minifi::FlowController(repo, flow_file_repo, nullptr, "",true) {
+      std::shared_ptr<core::Repository> flow_file_repo) :
+      minifi::FlowController(repo, flow_file_repo, nullptr, "", true) {
   }
   ~TestFlowController() {
 
@@ -112,7 +128,7 @@ class TestFlowController : public minifi::FlowController {
   }
 
   std::shared_ptr<core::Processor> createProcessor(std::string name,
-                                                   uuid_t uuid) {
+      uuid_t uuid) {
     return 0;
   }
 
@@ -125,10 +141,10 @@ class TestFlowController : public minifi::FlowController {
   }
 
   std::shared_ptr<minifi::Connection> createConnection(std::string name,
-                                                       uuid_t uuid) {
+      uuid_t uuid) {
     return 0;
   }
- protected:
+protected:
   void initializePaths(const std::string &adjustedFilename) {
   }
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index f7bd6e3..baf4254 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
   CMAKE_POLICY(SET CMP0048 OLD)
 ENDIF(POLICY CMP0048)
 
-include_directories(../include ../libminifi/include  ../libminifi/include/core/yaml  ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/leveldb-1.18/include ../thirdparty/)
+include_directories(../include ../libminifi/include  ../libminifi/include/core/yaml  ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include ../thirdparty/leveldb-1.18/include ../thirdparty/)
 
 find_package(Boost REQUIRED)
 include_directories(${Boost_INCLUDE_DIRS})
@@ -43,8 +43,8 @@ find_package(UUID REQUIRED)
 find_package(OpenSSL REQUIRED)
 include_directories(${OPENSSL_INCLUDE_DIR})
 
-# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, and leveldb
-target_link_libraries(minifiexe minifi yaml-cpp c-library civetweb-cpp ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES})
+# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and leveldb
+target_link_libraries(minifiexe minifi yaml-cpp c-library civetweb-cpp jsoncpp_lib_static ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES})
 set_target_properties(minifiexe
         PROPERTIES OUTPUT_NAME minifi)
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/33b3759c/thirdparty/jsoncpp/AUTHORS
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/AUTHORS b/thirdparty/jsoncpp/AUTHORS
new file mode 100644
index 0000000..c0fbbee
--- /dev/null
+++ b/thirdparty/jsoncpp/AUTHORS
@@ -0,0 +1 @@
+Baptiste Lepilleur <blep@users.sourceforge.net>


Mime
View raw message