nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeremyd...@apache.org
Subject [50/51] [partial] nifi-minifi-cpp git commit: MINIFI-372: Replace leveldb with RocksDB
Date Mon, 09 Oct 2017 16:25:30 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 4f4bed0..e76802d 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -31,13 +31,11 @@ ENDMACRO()
 
 function(createTests testName)
    message ("-- Adding test: ${testName}")
-    target_include_directories(${testName} PRIVATE BEFORE ${UUID_INCLUDE_DIRS})
     target_include_directories(${testName} PRIVATE BEFORE "thirdparty/catch")
     target_include_directories(${testName} PRIVATE BEFORE "thirdparty/spdlog-20170710/include")
     target_include_directories(${testName} PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
     target_include_directories(${testName} PRIVATE BEFORE "thirdparty/jsoncpp/include")
     target_include_directories(${testName} PRIVATE BEFORE "thirdparty/civetweb-1.9.1/include")
-    target_include_directories(${testName} PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
     target_include_directories(${testName} PRIVATE BEFORE "include")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/c2/protocols")
@@ -52,8 +50,8 @@ function(createTests testName)
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/utils")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/processors")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/provenance")
-    target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB})
-     if (HTTP-CURL)
+    target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB})
+     if (CURL_FOUND)
         target_include_directories(${testName} PRIVATE BEFORE ${CURL_INCLUDE_DIRS})
         target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/")
         target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/client/")
@@ -61,7 +59,7 @@ function(createTests testName)
         target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/protocols/")
         target_link_libraries(${testName} ${CURL_LIBRARIES} )
         if (APPLE)    
-		    	target_link_libraries (${testName} -Wl,-all_load ${HTTP-CURL})
+		    target_link_libraries (${testName} -Wl,-all_load ${HTTP-CURL})
 		else ()
 			target_link_libraries (${testName} -Wl,--whole-archive ${HTTP-CURL} -Wl,--no-whole-archive)    
 		endif ()
@@ -77,6 +75,7 @@ SET(TEST_RESOURCES ${TEST_DIR}/resources)
 GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/")
 GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/")
 GETSOURCEFILES(CURL_INTEGRATION_TESTS "${TEST_DIR}/curl-tests/")
+GETSOURCEFILES(ROCKSDB_INTEGRATION_TESTS "${TEST_DIR}/rocksdb-tests/")
 
 SET(UNIT_TEST_COUNT 0)
 FOREACH(testfile ${UNIT_TESTS})
@@ -93,11 +92,28 @@ FOREACH(testfile ${INTEGRATION_TESTS})
 	get_filename_component(testfilename "${testfile}" NAME_WE)
 	add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}" ${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp")
 	createTests("${testfilename}")
-	#message("Adding ${testfilename} from ${testfile}")
 	MATH(EXPR INT_TEST_COUNT "${INT_TEST_COUNT}+1")
 ENDFOREACH()
 message("-- Finished building ${INT_TEST_COUNT} integration test file(s)...")
 
+# RocksDB-backed tests are only built when the rocksdb-repos extension exported its target name.
+if (ROCKSDB-REPOS)
+    SET(ROCKSDB_TEST_COUNT 0)
+    FOREACH(testfile ${ROCKSDB_INTEGRATION_TESTS})
+        get_filename_component(testfilename "${testfile}" NAME_WE)
+        add_executable("${testfilename}" "${TEST_DIR}/rocksdb-tests/${testfile}" ${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp")
+        target_include_directories(${testfilename} PRIVATE BEFORE "extensions/rocksdb-repos/")
+        target_include_directories(${testfilename} PRIVATE BEFORE "thirdparty/rocksdb/include")
+        # Link the whole extension archive; the flag spelling differs between the Apple and GNU linkers.
+        if (APPLE)
+            target_link_libraries (${testfilename} -Wl,-all_load ${ROCKSDB-REPOS})
+        else ()
+            target_link_libraries (${testfilename} -Wl,--whole-archive ${ROCKSDB-REPOS} -Wl,--no-whole-archive)
+        endif ()
+        createTests("${testfilename}")
+        MATH(EXPR ROCKSDB_TEST_COUNT "${ROCKSDB_TEST_COUNT}+1")
+    ENDFOREACH()
+    message("-- Finished building ${ROCKSDB_TEST_COUNT} RocksDB related test file(s)...")
+endif(ROCKSDB-REPOS)
+
 if (HTTP-CURL)
 
 SET(CURL_INT_TEST_COUNT 0)
@@ -124,8 +140,6 @@ add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${
 
 add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
 
-add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-
 add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
 
 add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8082/nifi-api/controller")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/cmake/FindLeveldb.cmake
----------------------------------------------------------------------
diff --git a/cmake/FindLeveldb.cmake b/cmake/FindLeveldb.cmake
deleted file mode 100644
index a6d94c0..0000000
--- a/cmake/FindLeveldb.cmake
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-# Find module for Leveldb library and includes
-#  LEVELDB_FOUND - if system found LEVELDB library
-#  LEVELDB_INCLUDE_DIRS - The LEVELDB include directories
-#  LEVELDB_LIBRARIES - The libraries needed to use LEVELDB
-#  LEVELDB_DEFINITIONS - Compiler switches required for using LEVELDB
-
-# For OS X do not attempt to use the OS X application frameworks or bundles.
-set (CMAKE_FIND_FRAMEWORK NEVER)
-set (CMAKE_FIND_APPBUNDLE NEVER)
-
-find_path(LEVELDB_INCLUDE_DIR
-    NAMES leveldb/db.h
-    PATHS /usr/local/include /usr/include
-    DOC "LevelDB include header"
-)
-
-find_library(LEVELDB_LIBRARY 
-    NAMES libleveldb.dylib libleveldb.so
-    PATHS /usr/local/lib /usr/lib/x86_64-linux-gnu
-    DOC "LevelDB library"
-)
-
-include(FindPackageHandleStandardArgs)
-find_package_handle_standard_args(LEVELDB DEFAULT_MSG LEVELDB_INCLUDE_DIR LEVELDB_LIBRARY)
-
-if (LEVELDB_FOUND)
-    set(LEVELDB_LIBRARIES ${LEVELDB_LIBRARY} )
-    set(LEVELDB_INCLUDE_DIRS ${LEVELDB_INCLUDE_DIR} )
-    set(LEVELDB_DEFINITIONS )
-endif()
-
-mark_as_advanced(LEVELDB_ROOT_DIR LEVELDB_INCLUDE_DIR LEVELDB_LIBRARY)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/cmake/FindRocksDB.cmake
----------------------------------------------------------------------
diff --git a/cmake/FindRocksDB.cmake b/cmake/FindRocksDB.cmake
new file mode 100644
index 0000000..db9c2d1
--- /dev/null
+++ b/cmake/FindRocksDB.cmake
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Find module for the RocksDB library and headers. Result variables:
+#  ROCKSDB_FOUND               System has RocksDB library/headers.
+#  ROCKSDB_LIBRARIES           The RocksDB library.
+#  ROCKSDB_INCLUDE_DIR        The location of RocksDB headers.
+
+# Installation prefix: the directory that contains include/rocksdb/db.h.
+find_path(ROCKSDB_ROOT_DIR
+    NAMES include/rocksdb/db.h
+)
+
+# Library lookup, trying <prefix>/lib first.
+find_library(ROCKSDB_LIBRARIES
+    NAMES rocksdb
+    HINTS ${ROCKSDB_ROOT_DIR}/lib
+)
+
+# Header directory lookup, trying <prefix>/include first.
+find_path(ROCKSDB_INCLUDE_DIR
+    NAMES rocksdb/db.h
+    HINTS ${ROCKSDB_ROOT_DIR}/include
+)
+
+# Reports the result and fails a REQUIRED lookup when either variable is unset.
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(RocksDB DEFAULT_MSG
+    ROCKSDB_LIBRARIES
+    ROCKSDB_INCLUDE_DIR
+)
+
+# Keep the cache entries out of the default (non-advanced) cache view.
+mark_as_advanced(
+    ROCKSDB_ROOT_DIR
+    ROCKSDB_LIBRARIES
+    ROCKSDB_INCLUDE_DIR
+)
+
+# Explicitly publish ROCKSDB_FOUND and print the library that was located.
+if(ROCKSDB_INCLUDE_DIR AND ROCKSDB_LIBRARIES)
+  set(ROCKSDB_FOUND "YES")
+  message(STATUS "Found RocksDB...${ROCKSDB_LIBRARIES}")
+endif()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 7688439..641f0e2 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -17,7 +17,7 @@
 #
 
 # First stage: the build environment
-# Edge required for leveldb
+# Edge required for rocksdb
 FROM alpine:edge AS builder
 MAINTAINER Apache NiFi <dev@nifi.apache.org>
 
@@ -27,7 +27,7 @@ ARG MINIFI_VERSION
 ARG MINIFI_SOURCE_CODE
 
 # Install the system dependencies needed for a build
-# Add testing repo for leveldb
+# Add testing repo for rocksdb
 RUN echo 'http://dl-cdn.alpinelinux.org/alpine/edge/testing' >> /etc/apk/repositories
 RUN apk --update --no-cache upgrade && apk --update --no-cache add gcc \
 	g++ \
@@ -38,7 +38,7 @@ RUN apk --update --no-cache upgrade && apk --update --no-cache add gcc \
 	boost-dev \
 	vim \
 	util-linux-dev \
-	leveldb-dev \
+	rocksdb-dev \
 	curl-dev \
 	cmake \
 	git \
@@ -70,7 +70,7 @@ RUN cd $MINIFI_BASE_DIR \
 	&& tar -xzvf $MINIFI_BASE_DIR/build/nifi-minifi-cpp-$MINIFI_VERSION-bin.tar.gz -C $MINIFI_BASE_DIR
 
 # Second stage: the runtime image
-# Edge required for leveldb
+# Edge required for rocksdb
 FROM alpine:edge
 
 ARG UID
@@ -78,12 +78,12 @@ ARG GID
 ARG MINIFI_VERSION
 ARG MINIFI_SOURCE_CODE
 
-# Add testing repo for leveldb
+# Add testing repo for rocksdb
 RUN echo 'http://dl-cdn.alpinelinux.org/alpine/edge/testing' >> /etc/apk/repositories
 
 RUN apk --update --no-cache upgrade && apk add --update --no-cache \
 	util-linux \
-	leveldb \
+	rocksdb \
 	curl \
 	unzip \
 	gpsd \

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/http-curl/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/http-curl/CMakeLists.txt b/extensions/http-curl/CMakeLists.txt
index cb91683..b8b0629 100644
--- a/extensions/http-curl/CMakeLists.txt
+++ b/extensions/http-curl/CMakeLists.txt
@@ -24,7 +24,7 @@ find_package(CURL REQUIRED)
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include  ../../thirdparty/)
 
 find_package(Boost REQUIRED)
 include_directories(${Boost_INCLUDE_DIRS})

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/extensions/rocksdb-repos/CMakeLists.txt
new file mode 100644
index 0000000..82df6db
--- /dev/null
+++ b/extensions/rocksdb-repos/CMakeLists.txt
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+find_package(CURL REQUIRED)
+
+set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
+set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
+
+include_directories(../../libminifi/include ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/rocksdb/include ../../thirdparty/)
+
+find_package(Boost REQUIRED)
+find_package(RocksDB)
+
+# Register this extension under its own name; the previous text was copied from the http-curl extension.
+# NOTE(review): SET_PACKAGE_INFO is defined outside this file - confirm nothing keys off the old "HTTP-CURL" entry.
+SET_PACKAGE_INFO("ROCKSDB-REPOS" "RocksDB-backed repository implementations")
+
+include_directories(${Boost_INCLUDE_DIRS})
+
+file(GLOB SOURCES  "*.cpp")
+
+add_library(minifi-rocksdb-repos STATIC ${SOURCES})
+set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
+# Apply the pthread compile flag and thread libraries when the corresponding variables are set.
+if(THREADS_HAVE_PTHREAD_ARG)
+  # The target name must come first: target_compile_options(<target> <scope> <options>...).
+  # With the arguments swapped CMake looks for a target literally named "PUBLIC".
+  target_compile_options(minifi-rocksdb-repos PUBLIC "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minifi-rocksdb-repos "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+if (CURL_FOUND)
+        include_directories(${CURL_INCLUDE_DIRS})
+        target_link_libraries (minifi-rocksdb-repos ${CURL_LIBRARIES})
+endif(CURL_FOUND)
+
+# Include UUID
+find_package(UUID REQUIRED)
+target_link_libraries(minifi-rocksdb-repos ${LIBMINIFI} ${UUID_LIBRARIES} ${JSONCPP_LIB})
+add_dependencies(minifi-rocksdb-repos jsoncpp_project)
+find_package(OpenSSL REQUIRED)
+include_directories(${OPENSSL_INCLUDE_DIR})
+target_link_libraries(minifi-rocksdb-repos ${CMAKE_DL_LIBS} )
+if (ROCKSDB_FOUND)
+	target_link_libraries(minifi-rocksdb-repos ${ROCKSDB_LIBRARIES} )
+else()
+	target_link_libraries(minifi-rocksdb-repos rocksdb )
+endif()
+find_package(ZLIB REQUIRED)
+include_directories(${ZLIB_INCLUDE_DIRS})
+target_link_libraries (minifi-rocksdb-repos ${ZLIB_LIBRARIES})
+if (WIN32)
+    set_target_properties(minifi-rocksdb-repos PROPERTIES
+        LINK_FLAGS "/WHOLEARCHIVE"
+    )
+elseif (APPLE)
+    set_target_properties(minifi-rocksdb-repos PROPERTIES
+        LINK_FLAGS "-Wl,-all_load"
+    )
+else ()
+    set_target_properties(minifi-rocksdb-repos PROPERTIES
+        LINK_FLAGS "-Wl,--whole-archive"
+    )
+endif ()
+
+
+SET (ROCKSDB-REPOS minifi-rocksdb-repos PARENT_SCOPE)
+
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/DatabaseContentRepository.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
new file mode 100644
index 0000000..50f007f
--- /dev/null
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DatabaseContentRepository.h"
+#include <memory>
+#include <string>
+#include "RocksDbStream.h"
+#include "rocksdb/merge_operator.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Opens (creating it when missing) the RocksDB database that backs this content repository.
+ * @param configuration source of the repository directory setting.
+ * @return true when the database was opened, false otherwise.
+ */
+bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) {
+  // Use the configured directory when present, otherwise fall back to a relative default.
+  std::string configured_dir;
+  const bool has_configured_dir = configuration->get(Configure::nifi_dbcontent_repository_directory_default, configured_dir);
+  directory_ = has_configured_dir ? configured_dir : std::string("dbcontentrepository");
+
+  rocksdb::Options db_options;
+  db_options.create_if_missing = true;
+  db_options.error_if_exists = false;
+  db_options.use_direct_io_for_flush_and_compaction = true;
+  db_options.use_direct_reads = true;
+  // Merge operands are resolved by StringAppender.
+  db_options.merge_operator = std::make_shared<StringAppender>();
+  db_options.max_successive_merges = 0;
+
+  const rocksdb::Status open_status = rocksdb::DB::Open(db_options, directory_.c_str(), &db_);
+  is_valid_ = open_status.ok();
+  if (is_valid_) {
+    logger_->log_debug("NiFi Content DB Repository database open %s success", directory_.c_str());
+  } else {
+    logger_->log_error("NiFi Content DB Repository database open %s fail", directory_.c_str());
+  }
+  return is_valid_;
+}
+/**
+ * Flushes the write-ahead log and releases the database handle, if one is open.
+ */
+void DatabaseContentRepository::stop() {
+  // Nothing to release when the database was never opened or has already been closed.
+  if (nullptr == db_) {
+    return;
+  }
+  db_->FlushWAL(true);
+  delete db_;
+  db_ = nullptr;
+}
+
+/**
+ * Creates a stream for the given claim, passing `true` as the last RocksDbStream argument.
+ * @param claim resource claim whose content path is used as the stream path.
+ * @return the stream, or nullptr when the claim is null or the database is not usable.
+ */
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  // Streams have traditionally signalled failure by returning -1; here an invalid request is
+  // reported by handing back a null stream instead, which the API also permits.
+  const bool usable = claim != nullptr && is_valid_ && db_ != nullptr;
+  if (!usable) {
+    return nullptr;
+  }
+  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, true);
+}
+
+/**
+ * Creates a stream for the given claim, passing `false` as the last RocksDbStream argument.
+ * @param claim resource claim whose content path is used as the stream path.
+ * @return the stream, or nullptr when the claim is null or the database is not usable.
+ */
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  // Streams have traditionally signalled failure by returning -1; here an invalid request is
+  // reported by handing back a null stream instead, which the API also permits.
+  const bool usable = claim != nullptr && is_valid_ && db_ != nullptr;
+  if (!usable) {
+    return nullptr;
+  }
+  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, false);
+}
+
+/**
+ * Checks whether content is stored under the claim's content path.
+ * @param streamId resource claim to look up.
+ * @return true when the lookup succeeds; false when it fails, the claim is null,
+ *         or the database is not open.
+ */
+bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+  // Same guard as read()/write()/remove(): without it a null claim, or a call made before
+  // initialize() or after stop(), would dereference a null pointer.
+  if (nullptr == streamId || !is_valid_ || !db_)
+    return false;
+  std::string value;
+  const rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value);
+  if (status.ok()) {
+    logger_->log_debug("%s exists", streamId->getContentFullPath());
+    return true;
+  } else {
+    logger_->log_debug("%s does not exist", streamId->getContentFullPath());
+    return false;
+  }
+}
+
+/**
+ * Deletes the entry stored under the claim's content path.
+ * @param claim resource claim whose content should be removed.
+ * @return true when the delete succeeded; false on failure, a null claim, or an unusable database.
+ */
+bool DatabaseContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  const bool usable = claim != nullptr && is_valid_ && db_ != nullptr;
+  if (!usable) {
+    return false;
+  }
+  const rocksdb::Status delete_status = db_->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
+  if (!delete_status.ok()) {
+    logger_->log_debug("Attempted, but could not delete %s", claim->getContentFullPath());
+    return false;
+  }
+  logger_->log_debug("Deleted %s", claim->getContentFullPath());
+  return true;
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/DatabaseContentRepository.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
new file mode 100644
index 0000000..e43ff35
--- /dev/null
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_DatabaseContentRepository_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_DatabaseContentRepository_H_
+
+#include "rocksdb/db.h"
+#include "rocksdb/merge_operator.h"
+#include "core/Core.h"
+#include "core/Connectable.h"
+#include "../ContentRepository.h"
+#include "properties/Configure.h"
+#include "core/logging/LoggerConfiguration.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * RocksDB merge operator that concatenates each merge operand onto the stored value.
+ * No delimiter is inserted between the existing value and the appended operand.
+ */
+class StringAppender : public rocksdb::AssociativeMergeOperator {
+ public:
+  // Constructor: the operator keeps no state, so there is nothing to configure.
+  explicit StringAppender() {
+  }
+
+  /**
+   * Produces the merged value for a key.
+   * @param key key being merged (unused).
+   * @param existing_value current value for the key, or nullptr when there is none.
+   * @param value operand to append.
+   * @param new_value output; receives existing_value (if any) followed by value.
+   * @param logger RocksDB logger (unused).
+   * @return always true.
+   */
+  virtual bool Merge(const rocksdb::Slice& key, const rocksdb::Slice* existing_value, const rocksdb::Slice& value, std::string* new_value, rocksdb::Logger* logger) const override {
+    // Clear the *new_value for writing.
+    assert(new_value);
+    new_value->clear();
+
+    if (nullptr == existing_value) {
+      // No existing_value. Set *new_value = value
+      new_value->assign(value.data(), value.size());
+    } else {
+      // Reserve once so that the assign + append below do not reallocate.
+      new_value->reserve(existing_value->size() + value.size());
+      new_value->assign(existing_value->data(), existing_value->size());
+      new_value->append(value.data(), value.size());
+    }
+
+    return true;
+  }
+
+  /**
+   * @return the operator name reported to RocksDB.
+   */
+  virtual const char* Name() const override {
+    return "StringAppender";
+  }
+};
+
+/**
+ * DatabaseContentRepository is a content repository that stores content in a RocksDB database.
+ * The instance owns the raw database handle and releases it in stop(), which the destructor calls.
+ */
+class DatabaseContentRepository : public core::ContentRepository, public core::Connectable {
+ public:
+
+  /**
+   * @param name component name; defaults to the class name.
+   * @param uuid component identifier forwarded to core::Connectable.
+   */
+  DatabaseContentRepository(std::string name = getClassName<DatabaseContentRepository>(), uuid_t uuid = 0)
+      : core::Connectable(name, uuid),
+        // Members are listed in declaration order, which is the order they are initialized in.
+        is_valid_(false),
+        db_(nullptr),
+        logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) {
+  }
+
+  // The raw db_ handle is deleted in stop(); copying would lead to a double delete.
+  DatabaseContentRepository(const DatabaseContentRepository &other) = delete;
+  DatabaseContentRepository &operator=(const DatabaseContentRepository &other) = delete;
+
+  virtual ~DatabaseContentRepository() {
+    stop();
+  }
+
+  /**
+   * Opens the backing database.
+   * @return true when the database is usable.
+   */
+  virtual bool initialize(const std::shared_ptr<minifi::Configure> &configuration);
+
+  /**
+   * Flushes and closes the backing database if it is open.
+   */
+  virtual void stop();
+
+  /**
+   * @return a stream for writing the claim's content, or nullptr when unavailable.
+   */
+  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  /**
+   * @return a stream for reading the claim's content, or nullptr when unavailable.
+   */
+  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  /**
+   * Closing a claim removes its content from the database.
+   */
+  virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+    return remove(claim);
+  }
+
+  /**
+   * @return true when the claim's content was deleted.
+   */
+  virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  /**
+   * @return true when content is stored for the claim.
+   */
+  virtual bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+
+  /**
+   * Intentionally a no-op.
+   */
+  virtual void yield() {
+
+  }
+
+  /**
+   * Determines if we are connected and operating
+   */
+  virtual bool isRunning() {
+    return true;
+  }
+
+  /**
+   * Determines if work is available by this connectable
+   * @return boolean if work is available.
+   */
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
+ private:
+  // True once initialize() has opened the database successfully.
+  bool is_valid_;
+  // Directory holding the database files.
+  std::string directory_;
+  // Owned database handle; nullptr while closed.
+  rocksdb::DB* db_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_DatabaseContentRepository_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
new file mode 100644
index 0000000..df2be6e
--- /dev/null
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FlowFileRepository.h"
+#include "rocksdb/write_batch.h"
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Drains the queue of keys scheduled for deletion, removes them from the database in a
+ * single write batch, reduces the tracked repository size by the bytes removed, and asks
+ * the content repository to drop content that is no longer referenced.
+ */
+void FlowFileRepository::flush() {
+  rocksdb::WriteBatch batch;
+  std::string key;
+  std::string value;
+  rocksdb::ReadOptions options;
+
+  std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
+
+  uint64_t decrement_total = 0;
+  while (keys_to_delete.size_approx() > 0) {
+    if (keys_to_delete.try_dequeue(key)) {
+      // Only trust `value` when the lookup succeeded. After a failed Get its contents are not
+      // meaningful (it may still hold the previous iteration's record), so counting or
+      // deserializing it would corrupt the size accounting and the purge list.
+      if (db_->Get(options, key, &value).ok()) {
+        decrement_total += value.size();
+        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+        if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) {
+          purgeList.push_back(eventRead);
+        }
+        logger_->log_info("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
+      } else {
+        logger_->log_debug("Could not read %s before issuing batch delete", key.c_str());
+      }
+      // The key is scheduled for deletion regardless of whether it could be read.
+      batch.Delete(key);
+    }
+  }
+  if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
+    logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
+    // Clamp at zero rather than letting the unsigned counter wrap around.
+    if (decrement_total > repo_size_.load()) {
+      repo_size_ = 0;
+    } else {
+      repo_size_ -= decrement_total;
+    }
+  }
+
+  if (nullptr != content_repo_) {
+    for (const auto &ffr : purgeList) {
+      auto claim = ffr->getResourceClaim();
+      if (claim != nullptr) {
+        content_repo_->removeIfOrphaned(claim);
+      }
+    }
+  }
+}
+
+/**
+ * Background loop: while the repository is running, sleep for the purge period, flush
+ * pending deletions, and update the "repository full" flag from the current size.
+ */
+void FlowFileRepository::run() {
+  while (running_) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+
+    flush();
+
+    // Kept as uint64_t so the comparison below is performed exactly as before.
+    const uint64_t size = getRepoSize();
+    repo_full_ = (size > max_partition_bytes_);
+  }
+}
+
+/**
+ * Replays every record stored in the database: records whose connection is known are
+ * re-queued on that connection; records that cannot be deserialized or whose connection
+ * is unknown are deleted (together with their content, when a claim exists).
+ * @param content_repo content repository used by the restored flow files.
+ */
+void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
+  content_repo_ = content_repo;
+  std::vector<std::pair<std::string, uint64_t>> purgeList;
+  // Hold the iterator in a unique_ptr so it is released on every exit path.
+  std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+
+  repo_size_ = 0;
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+    std::string key = it->key().ToString();
+    repo_size_ += it->value().size();
+    if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
+      logger_->log_info("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
+      auto search = connectionMap.find(eventRead->getConnectionUuid());
+      if (search != connectionMap.end()) {
+        // we find the connection for the persistent flowfile, create the flowfile and enqueue that
+        eventRead->setStoredToRepository(true);
+        search->second->put(eventRead);
+      } else {
+        logger_->log_info("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
+        // The record has no home: drop its content too. content_repo_ is checked for null
+        // here just as flush() does before using it.
+        if (nullptr != content_repo_ && eventRead->getContentFullPath().length() > 0 && nullptr != eventRead->getResourceClaim()) {
+          content_repo_->remove(eventRead->getResourceClaim());
+        }
+        purgeList.emplace_back(key, it->value().size());
+      }
+    } else {
+      purgeList.emplace_back(key, it->value().size());
+    }
+  }
+
+  // Release the iterator before issuing deletes, as the original code did.
+  it.reset();
+  for (const auto &eventId : purgeList) {
+    logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str());
+    if (Delete(eventId.first)) {
+      repo_size_ -= eventId.second;
+    }
+  }
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
new file mode 100644
index 0000000..f473415
--- /dev/null
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -0,0 +1,168 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "core/Repository.h"
+#include "core/Core.h"
+#include "Connection.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "concurrentqueue.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
+#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
+#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec
+
+/**
+ * Flow File repository
+ * Design: Extends Repository and implements the run function, using rocksdb as the primary substrate.
+ */
+class FlowFileRepository : public core::Repository, public std::enable_shared_from_this<FlowFileRepository> {
+ public:
+  /**
+   * Constructs a repository from a name and a UUID.
+   * The uuid argument is not used; construction is delegated to the name-based constructor.
+   */
+  FlowFileRepository(std::string name, uuid_t uuid)
+      : FlowFileRepository(name) {
+  }
+
+  /**
+   * Constructs the repository without opening the database (see initialize()).
+   * @param repo_name repository name; when empty the class name is used instead.
+   * @param directory database directory.
+   * @param maxPartitionMillis maximum entry life time.
+   * @param maxPartitionBytes maximum storage size.
+   * @param purgePeriod purge period; start() does nothing when it is not positive.
+   */
+  FlowFileRepository(const std::string repo_name = "", std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                     int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
+      : core::SerializableComponent(repo_name,0), Repository(repo_name.length() > 0 ? repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod),
+        content_repo_(nullptr),
+        db_(nullptr),
+        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
+    // Members are initialised in declaration order; the database handle stays null until initialize().
+  }
+
+  // Destructor: releases the database handle if one was opened.
+  ~FlowFileRepository() {
+    if (db_)
+      delete db_;
+  }
+
+  virtual void flush();
+
+  /**
+   * Reads the repository settings from the configuration and opens (creating if missing)
+   * the RocksDB database in directory_.
+   * @param configure configuration source for directory, max storage size and max storage time.
+   * @return true when the database was opened, false otherwise.
+   */
+  virtual bool initialize(const std::shared_ptr<Configure> &configure) {
+    std::string value;
+
+    if (configure->get(Configure::nifi_flowfile_repository_directory_default, value)) {
+      directory_ = value;
+    }
+    logger_->log_info("NiFi FlowFile Repository Directory %s", directory_.c_str());
+    if (configure->get(Configure::nifi_flowfile_repository_max_storage_size, value)) {
+      Property::StringToInt(value, max_partition_bytes_);
+    }
+    logger_->log_info("NiFi FlowFile Max Partition Bytes %d", max_partition_bytes_);
+    if (configure->get(Configure::nifi_flowfile_repository_max_storage_time, value)) {
+      TimeUnit unit;
+      // Both helpers write into max_partition_millis_; their combined result is deliberately not acted upon.
+      if (Property::StringToTime(value, max_partition_millis_, unit) && Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
+      }
+    }
+    logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", max_partition_millis_);
+    rocksdb::Options options;
+    options.create_if_missing = true;
+    options.use_direct_io_for_flush_and_compaction = true;
+    options.use_direct_reads = true;
+    rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_);
+    if (status.ok()) {
+      logger_->log_info("NiFi FlowFile Repository database open %s success", directory_.c_str());
+    } else {
+      logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_.c_str());
+      return false;
+    }
+    return true;
+  }
+
+  virtual void run();
+
+  /**
+   * Persists a record to the database and accounts for its size.
+   * @param key record key.
+   * @param buf serialized record.
+   * @param bufLen number of bytes in buf.
+   * @return true when the record was written; false when the database is not open or the write failed.
+   */
+  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
+    if (db_ == nullptr)
+      return false;
+    rocksdb::Slice value((const char *) buf, bufLen);
+    // The size is added before the write, as before, and taken back if the write fails so that
+    // failed writes no longer inflate the tracked repository size.
+    repo_size_ += bufLen;
+    rocksdb::Status status = db_->Put(rocksdb::WriteOptions(), key, value);
+    if (!status.ok()) {
+      repo_size_ -= bufLen;
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Queues the key for deletion; nothing is removed from the database by this call.
+   * NOTE(review): the queue is presumably drained by flush() - confirm in FlowFileRepository.cpp.
+   * @return always true.
+   */
+  virtual bool Delete(std::string key) {
+    keys_to_delete.enqueue(key);
+    return true;
+  }
+  /**
+   * Looks up the value stored under the provided key.
+   * @param key record key.
+   * @param value receives the stored bytes on success.
+   * @return true when the key was found; false when the database is not open or the lookup failed.
+   */
+  virtual bool Get(const std::string &key, std::string &value) {
+    if (db_ == nullptr)
+      return false;
+    return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
+  }
+
+  virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);
+
+  /**
+   * Starts the repository thread running run(), unless the purge period is not positive
+   * or the repository is already running.
+   */
+  void start() {
+    if (this->purge_period_ <= 0) {
+      return;
+    }
+    if (running_) {
+      return;
+    }
+    running_ = true;
+    // The thread holds a shared_ptr to this object for as long as run() executes.
+    thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
+    logger_->log_info("%s Repository Monitor Thread Start", getName());
+  }
+
+ private:
+  // Keys queued by Delete().
+  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
+  std::shared_ptr<core::ContentRepository> content_repo_;
+  // Database handle; null until initialize() succeeds.
+  rocksdb::DB* db_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp
new file mode 100644
index 0000000..4540a4a
--- /dev/null
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ProvenanceRepository.h"
+#include "rocksdb/write_batch.h"
+#include <string>
+#include <vector>
+#include "rocksdb/options.h"
+#include "provenance/Provenance.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+
+/**
+ * Drains the queue of keys scheduled for deletion, removes them from the database in a single
+ * write batch and, when the batch is written successfully, subtracts the size of the removed
+ * values from repo_size_ (clamped at zero).
+ */
+void ProvenanceRepository::flush() {
+  rocksdb::WriteBatch batch;
+  std::string key;
+  std::string value;
+  rocksdb::ReadOptions options;
+  uint64_t decrement_total = 0;
+  while (keys_to_delete.size_approx() > 0) {
+    if (keys_to_delete.try_dequeue(key)) {
+      // Only count bytes for keys that are actually stored: after a failed lookup 'value'
+      // may still hold the previous record and would otherwise be subtracted a second time.
+      if (db_->Get(options, key, &value).ok()) {
+        decrement_total += value.size();
+      }
+      batch.Delete(key);
+      logger_->log_info("Removing %s", key);
+    }
+  }
+  if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
+    logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
+    // Never let the unsigned size counter wrap below zero.
+    if (decrement_total > repo_size_.load()) {
+      repo_size_ = 0;
+    } else {
+      repo_size_ -= decrement_total;
+    }
+  }
+}
+
+/**
+ * Repository thread body. While running_ is set it sleeps for purge_period_ milliseconds and,
+ * once the repository size reaches three quarters of max_partition_bytes_, queues for deletion
+ * every record that is older than max_partition_millis_ or whose event time cannot be read.
+ * Each cycle ends with a flush() and a refresh of repo_full_.
+ */
+void ProvenanceRepository::run() {
+  while (running_) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+    uint64_t now = getTimeMillis();
+    // purging starts at 75% of the configured maximum size
+    uint64_t purge_limit = max_partition_bytes_ * 3 / 4;
+
+    uint64_t current_size = getRepoSize();
+
+    if (current_size >= purge_limit) {
+      rocksdb::Iterator* cursor = db_->NewIterator(rocksdb::ReadOptions());
+      cursor->SeekToFirst();
+      while (cursor->Valid()) {
+        ProvenanceEventRecord record;
+        std::string record_key = cursor->key().ToString();
+        uint64_t stamp = record.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(cursor->value().data())), cursor->value().size());
+        if (stamp == 0) {
+          // the event time could not be extracted: drop the record
+          logger_->log_debug("NiFi Provenance retrieve event %s fail", record_key.c_str());
+          Delete(record_key);
+        } else if ((now - stamp) > max_partition_millis_) {
+          // the record outlived its configured life time
+          Delete(record_key);
+        }
+        cursor->Next();
+      }
+      delete cursor;
+    }
+    flush();
+    current_size = getRepoSize();
+    repo_full_ = (current_size > max_partition_bytes_) ? true : false;
+  }
+}
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
new file mode 100644
index 0000000..67072eb
--- /dev/null
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "core/Repository.h"
+#include "core/Core.h"
+#include "provenance/Provenance.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "concurrentqueue.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+
+#define PROVENANCE_DIRECTORY "./provenance_repository"
+#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
+#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
+
+/**
+ * Provenance repository backed by RocksDB.
+ * Extends Repository; deletions are queued by Delete() and applied in batches by flush().
+ */
+class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
+ public:
+
+  /**
+   * Constructs a repository from a name and a UUID.
+   * The uuid argument is not used; construction is delegated to the name-based constructor.
+   */
+  ProvenanceRepository(std::string name, uuid_t uuid)
+      : ProvenanceRepository(name){
+
+  }
+  /*!
+   * Create a new provenance repository. The database is not opened here (see initialize()).
+   * @param repo_name repository name; when empty the class name is used instead.
+   * @param directory database directory.
+   * @param maxPartitionMillis maximum entry life time.
+   * @param maxPartitionBytes maximum storage size.
+   * @param purgePeriod purge period in milliseconds; start() does nothing when it is not positive.
+   */
+  ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
+  MAX_PROVENANCE_STORAGE_SIZE,
+                       uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
+      : core::SerializableComponent(repo_name, 0),
+        Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod),
+        db_(nullptr),
+        logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) {
+  }
+
+  // Destructor: releases the database handle if it is still open.
+  virtual ~ProvenanceRepository() {
+    if (db_)
+      delete db_;
+  }
+
+  virtual void flush();
+
+  /**
+   * Starts the repository thread running run(), unless the purge period is not positive
+   * or the repository is already running.
+   */
+  void start() {
+    if (this->purge_period_ <= 0)
+      return;
+    if (running_)
+      return;
+    running_ = true;
+    // The thread holds a shared_ptr to this object for as long as run() executes.
+    thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
+    logger_->log_info("%s Repository Monitor Thread Start", name_);
+  }
+
+  /**
+   * Reads the repository settings from the configuration and opens (creating if missing)
+   * the RocksDB database in directory_.
+   * @param config configuration source for directory, max storage size and max storage time.
+   * @return true when the database was opened, false otherwise.
+   */
+  virtual bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
+    std::string value;
+    if (config->get(Configure::nifi_provenance_repository_directory_default, value)) {
+      directory_ = value;
+    }
+    logger_->log_info("NiFi Provenance Repository Directory %s", directory_.c_str());
+    if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
+      core::Property::StringToInt(value, max_partition_bytes_);
+    }
+    logger_->log_info("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
+    if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
+      core::TimeUnit unit;
+      // Both helpers write into max_partition_millis_; their combined result is deliberately not acted upon.
+      if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
+      }
+    }
+    logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
+    rocksdb::Options options;
+    options.create_if_missing = true;
+    options.use_direct_io_for_flush_and_compaction = true;
+    options.use_direct_reads = true;
+    rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_);
+    if (status.ok()) {
+      logger_->log_info("NiFi Provenance Repository database open %s success", directory_.c_str());
+    } else {
+      logger_->log_error("NiFi Provenance Repository database open %s fail", directory_.c_str());
+      return false;
+    }
+
+    return true;
+  }
+  /**
+   * Persists a record to the database.
+   * NOTE(review): unlike FlowFileRepository::Put this does not add bufLen to repo_size_, although
+   * flush() subtracts deleted bytes from it - confirm how getRepoSize() is meant to be fed.
+   * @return true when the record was written; false when the repository is full, the database
+   *         is not open, or the write failed.
+   */
+  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
+
+    if (repo_full_) {
+      return false;
+    }
+    // destroy() resets db_ to null, so guard against use after destroy.
+    if (db_ == nullptr) {
+      return false;
+    }
+
+    // persist to the DB
+    rocksdb::Slice value((const char *) buf, bufLen);
+    return db_->Put(rocksdb::WriteOptions(), key, value).ok();
+  }
+  /**
+   * Queues the key for deletion; the database is only modified by the next flush().
+   * @return always true.
+   */
+  virtual bool Delete(std::string key) {
+    keys_to_delete.enqueue(key);
+    return true;
+  }
+  /**
+   * Looks up the value stored under the provided key.
+   * @return true when the key was found; false when the database is not open or the lookup failed.
+   */
+  virtual bool Get(const std::string &key, std::string &value) {
+    if (db_ == nullptr)
+      return false;
+    return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
+  }
+
+  // Queues the event for deletion, keyed by its event id.
+  void removeEvent(ProvenanceEventRecord *event) {
+    Delete(event->getEventId());
+  }
+
+  // Stores an already serialized record; forwards to Put().
+  virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
+    return Put(key, buffer, bufferSize);
+  }
+
+  /**
+   * Appends deserialized provenance events to store until it holds max_size elements or the
+   * database is exhausted. Records that fail to deserialize are skipped.
+   * @return true, or false when the database is not open.
+   */
+  virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
+    if (db_ == nullptr)
+      return false;
+    // unique_ptr releases the iterator even if deserialization throws.
+    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      if (store.size() >= max_size)
+        break;
+      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
+      if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
+        store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Deserializes up to max_size records into objects produced by lambda and appends them to records.
+   * @param records receives the deserialized components.
+   * @param max_size in: requested batch size; out: number of records appended.
+   * @param lambda factory creating an empty component for each stored record.
+   * @return true when at least one record was deserialized.
+   */
+  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
+    size_t requested_batch = max_size;
+    max_size = 0;
+    if (db_ == nullptr)
+      return false;
+    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+
+      if (max_size >= requested_batch)
+        break;
+      std::shared_ptr<core::SerializableComponent> eventRead = lambda();
+      if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
+        max_size++;
+        records.push_back(eventRead);
+      }
+
+    }
+
+    return max_size > 0;
+  }
+  //! get record: appends deserialized events to records until it holds maxSize elements
+  void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) {
+    if (db_ == nullptr)
+      return;
+    // Same conversion the signed/unsigned comparison performed implicitly before.
+    const size_t limit = static_cast<size_t>(maxSize);
+    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      if (records.size() >= limit)
+        break;
+      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
+      if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
+        records.push_back(eventRead);
+      }
+    }
+  }
+
+  /**
+   * Deserializes stored records into the components already present in store, in order.
+   * A slot is reused for the next record when deserialization into it fails.
+   * @param store pre-allocated components to fill.
+   * @param max_size out: number of components that were filled.
+   * @return true when at least one component was filled.
+   */
+  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
+    max_size = 0;
+    if (db_ == nullptr)
+      return false;
+    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      // Stop once every slot is filled. The check runs before the element access, so an empty
+      // store is never indexed and iteration no longer ends after the first record.
+      if (max_size >= store.size())
+        break;
+      if (store.at(max_size)->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
+        max_size++;
+      }
+    }
+    return max_size > 0;
+  }
+  //! purge record: queues every record for deletion and applies the deletions immediately
+  void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) {
+    for (auto record : records) {
+      Delete(record->getEventId());
+    }
+    flush();
+  }
+  // destroy: closes the database and resets the handle to null
+  void destroy() {
+    if (db_) {
+      delete db_;
+      db_ = NULL;
+    }
+  }
+  // Run function for the thread
+  void run();
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProvenanceRepository(const ProvenanceRepository &parent) = delete;
+  ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
+
+ private:
+  // Keys queued by Delete() and drained by flush().
+  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
+  // Database handle; null until initialize() succeeds and after destroy().
+  rocksdb::DB* db_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/RocksDBLoader.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/RocksDBLoader.cpp b/extensions/rocksdb-repos/RocksDBLoader.cpp
new file mode 100644
index 0000000..444c1db
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDBLoader.cpp
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/FlowConfiguration.h"
+#include "RocksDBLoader.h"
+
+// Static registration: hands the name of the factory entry point defined below to
+// FlowConfiguration and stores the result in RocksDBFactory::added.
+// NOTE(review): presumably this lets the class loader look the entry point up by name -
+// confirm against FlowConfiguration::add_static_func.
+bool RocksDBFactory::added = core::FlowConfiguration::add_static_func("createRocksDBFactory");
+
+extern "C" {
+
+// C-linkage entry point returning a newly allocated RocksDBFactory as an opaque pointer.
+// NOTE(review): the object is created with new and never freed here; ownership is presumably
+// taken over by the caller - confirm.
+void *createRocksDBFactory(void) {
+  return new RocksDBFactory();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/RocksDBLoader.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/RocksDBLoader.h b/extensions/rocksdb-repos/RocksDBLoader.h
new file mode 100644
index 0000000..4b14def
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDBLoader.h
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_ROCKSDBLOADER_H
+#define EXTENSIONS_ROCKSDBLOADER_H
+
+#include "DatabaseContentRepository.h"
+#include "FlowFileRepository.h"
+#include "ProvenanceRepository.h"
+#include "RocksDbStream.h"
+#include "core/ClassLoader.h"
+
+/**
+ * Object factory exposing the RocksDB backed repositories of this extension.
+ */
+class __attribute__((visibility("default"))) RocksDBFactory : public core::ObjectFactory {
+ public:
+  RocksDBFactory() {
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return name of this factory
+   */
+  virtual std::string getName() {
+    return "RocksDBFactory";
+  }
+
+  /**
+   * Gets the class name of the object.
+   * @return class name of this factory
+   */
+  virtual std::string getClassName() {
+    return "RocksDBFactory";
+  }
+  /**
+   * Gets the class names this factory can create, in mixed case and in lower case.
+   * @return list of supported class names.
+   */
+  virtual std::vector<std::string> getClassNames() {
+    return {
+      "DatabaseContentRepository",
+      "FlowFileRepository",
+      "ProvenanceRepository",
+      "databasecontentrepository",
+      "flowfilerepository",
+      "provenancerepository"
+    };
+  }
+
+  /**
+   * Creates the object factory for the given class name; the name is matched case-insensitively.
+   * @param class_name name of the repository class.
+   * @return factory for that class, or nullptr when the name is not supported.
+   */
+  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
+    std::string lowered(class_name);
+    for (auto &ch : lowered) {
+      ch = ::tolower(ch);
+    }
+    if (lowered == "databasecontentrepository") {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<core::repository::DatabaseContentRepository>());
+    }
+    if (lowered == "flowfilerepository") {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<core::repository::FlowFileRepository>());
+    }
+    if (lowered == "provenancerepository") {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::provenance::ProvenanceRepository>());
+    }
+    return nullptr;
+  }
+
+  // Result of the static registration performed in RocksDBLoader.cpp.
+  static bool added;
+
+};
+
+extern "C" {
+void *createRocksDBFactory(void);
+}
+#endif /* EXTENSIONS_ROCKSDBLOADER_H */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/RocksDbStream.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
new file mode 100644
index 0000000..73f691e
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RocksDbStream.h"
+#include <fstream>
+#include <vector>
+#include <memory>
+#include <string>
+#include "io/validation.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Creates a stream over the value stored under path in db. The current value is loaded into
+ * value_ immediately; exists_ records whether the lookup succeeded and reads start at offset 0.
+ * @param path key of the value within the database.
+ * @param db database used for the lookup and for later writes.
+ * @param write_enable whether writeData is permitted on this stream.
+ */
+RocksDbStream::RocksDbStream(const std::string &path, rocksdb::DB *db, bool write_enable)
+    : BaseStream(),
+      logger_(logging::LoggerFactory<RocksDbStream>::getLogger()),
+      db_(db),
+      path_(path),
+      write_enable_(write_enable) {
+  const rocksdb::Status lookup = db_->Get(rocksdb::ReadOptions(), path_, &value_);
+  exists_ = lookup.ok() ? true : false;
+  // reads begin at the start of whatever was loaded
+  offset_ = 0;
+  size_ = value_.size();
+}
+
+// Closes the stream. The body is empty: nothing is released here and the database handle is left untouched.
+void RocksDbStream::closeStream() {
+}
+
+// Currently a no-op: the requested offset is ignored and the read position is left unchanged.
+void RocksDbStream::seek(uint64_t offset) {
+  // noop
+}
+
+/**
+ * Writes the first buflen bytes of buf to the stream.
+ * @param buf incoming buffer.
+ * @param buflen number of bytes to write.
+ * @return -1 when the vector's capacity is smaller than buflen, otherwise the result of the raw-pointer overload.
+ */
+int RocksDbStream::writeData(std::vector<uint8_t> &buf, int buflen) {
+  const bool too_small = buf.capacity() < buflen;
+  return too_small ? -1 : writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
+}
+
+// data stream overrides
+
+/**
+ * Appends size bytes to the value stored under path_ by issuing a synchronous Merge.
+ * @param value bytes to append.
+ * @param size number of bytes in value.
+ * @return 0 on success; -1 when value is null/empty, size is negative, the stream is not
+ *         write enabled, or the database reports a failure.
+ */
+int RocksDbStream::writeData(uint8_t *value, int size) {
+  if (IsNullOrEmpty(value) || !write_enable_ || size < 0) {
+    return -1;
+  }
+  rocksdb::Slice slice_value((const char *) value, size);
+  rocksdb::WriteOptions opts;
+  opts.sync = true;
+  // Capture the status of the merge: previously a default constructed (OK) status was
+  // tested, so failed writes were reported as successful.
+  const rocksdb::Status status = db_->Merge(opts, path_, slice_value);
+  if (!status.ok()) {
+    return -1;
+  }
+  // Only account for bytes that actually reached the database.
+  size_ += size;
+  return 0;
+}
+
+/**
+ * Reads sizeof(T) bytes from the stream into a freshly created vector.
+ * The result of the underlying read is not checked.
+ * @param t object whose type determines the number of bytes to read.
+ * @return vector of sizeof(T) bytes.
+ */
+template<typename T>
+inline std::vector<uint8_t> RocksDbStream::readBuffer(const T& t) {
+  std::vector<uint8_t> bytes(sizeof t);
+  readData(reinterpret_cast<uint8_t *>(&bytes[0]), sizeof(t));
+  return bytes;
+}
+
+/**
+ * Reads up to buflen bytes into buf, growing it when it holds fewer than buflen elements and
+ * shrinking it to the number of bytes actually read when the read comes up short.
+ * @param buf buffer receiving the data.
+ * @param buflen maximum number of bytes to read.
+ * @return number of bytes read, or -1 when buflen is negative or the underlying read fails.
+ */
+int RocksDbStream::readData(std::vector<uint8_t> &buf, int buflen) {
+  if (buflen < 0) {
+    return -1;
+  }
+  // Compare against size(), not capacity(): bytes written beyond size() are not part of the vector.
+  if (buf.size() < static_cast<size_t>(buflen)) {
+    buf.resize(buflen);
+  }
+  int ret = readData(buf.data(), buflen);
+
+  // A failed read returns -1; resizing to that value would request an enormous allocation.
+  if (ret >= 0 && ret < buflen) {
+    buf.resize(ret);
+  }
+  return ret;
+}
+
+/**
+ * Copies up to buflen bytes of the value loaded at construction into buf and advances the read offset.
+ * @param buf destination buffer.
+ * @param buflen maximum number of bytes to copy.
+ * @return number of bytes copied, 0 once the end of the value has been reached, or -1 when buf is
+ *         null/empty, buflen is negative, or the key did not exist when the stream was created.
+ */
+int RocksDbStream::readData(uint8_t *buf, int buflen) {
+  // A negative length would otherwise convert to a huge unsigned value and copy every
+  // remaining byte into the caller's buffer.
+  if (IsNullOrEmpty(buf) || !exists_ || buflen < 0) {
+    return -1;
+  }
+  const size_t position = static_cast<size_t>(offset_);
+  if (position >= value_.size()) {
+    return 0;
+  }
+  const size_t remaining = value_.size() - position;
+  const size_t requested = static_cast<size_t>(buflen);
+  const size_t amtToRead = requested < remaining ? requested : remaining;
+  std::memcpy(buf, value_.data() + position, amtToRead);
+  offset_ += amtToRead;
+  // amtToRead never exceeds buflen, so it fits in an int.
+  return static_cast<int>(amtToRead);
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/RocksDbStream.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
new file mode 100644
index 0000000..da08899
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -0,0 +1,185 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_
+#define LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_
+
+#include "rocksdb/db.h"
+#include <iostream>
+#include <cstdint>
+#include <string>
+#include "io/EndianCheck.h"
+#include "io/BaseStream.h"
+#include "io/Serializable.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: RocksDB backed stream. Reads are served from the value that was stored under a key
+ * when the stream was constructed; writes are forwarded to the database.
+ *
+ * Design: Extends BaseStream and overrides readData/writeData so that the database value
+ * acts as the source and sink.
+ */
+class RocksDbStream : public io::BaseStream {
+ public:
+  /**
+   * Creates a stream over the value stored under path in db.
+   * @param path key of the value within the database.
+   * @param db database to read from and write to.
+   * @param write_enable whether writeData is permitted on this stream.
+   */
+  explicit RocksDbStream(const std::string &path, rocksdb::DB *db, bool write_enable = false);
+
+  /**
+   * Creates a stream for the given path without a database.
+   * NOTE(review): no definition of this overload is visible in RocksDbStream.cpp - confirm it is
+   * implemented before using it.
+   */
+  explicit RocksDbStream(const std::string &path);
+
+  virtual ~RocksDbStream() {
+    closeStream();
+  }
+
+  virtual void closeStream();
+  /**
+   * Skip to the specified offset.
+   * @param offset offset to which we will skip
+   */
+  void seek(uint64_t offset);
+
+  // Returns the tracked size of the value, narrowed to 32 bits.
+  const uint32_t getSize() const {
+    return size_;
+  }
+
+  /**
+   * Reads a 16 bit value from the stream.
+   * @param value receives the decoded value.
+   * @param is_little_endian when true the first byte read is the most significant one,
+   *        otherwise it is the least significant one.
+   * @return 2 on success, -1 when the underlying read fails.
+   */
+  virtual int read(uint16_t &value, bool is_little_endian) {
+    uint8_t buf[2];
+    if (readData(&buf[0], 2) < 0)
+      return -1;
+    if (is_little_endian) {
+      value = (buf[0] << 8) | buf[1];
+    } else {
+      value = buf[0] | buf[1] << 8;
+    }
+    return 2;
+  }
+
+  /**
+   * Reads a 32 bit value from the stream.
+   * @param value receives the decoded value.
+   * @param is_little_endian when true the first byte read is the most significant one,
+   *        otherwise it is the least significant one.
+   * @return 4 on success, -1 when the underlying read fails.
+   */
+  virtual int read(uint32_t &value, bool is_little_endian) {
+    uint8_t buf[4];
+    if (readData(&buf[0], 4) < 0)
+      return -1;
+
+    if (is_little_endian) {
+      value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
+    } else {
+      value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
+    }
+
+    return 4;
+  }
+  /**
+   * Reads a 64 bit value from the stream.
+   * @param value receives the decoded value.
+   * @param is_little_endian when true the first byte read is the most significant one,
+   *        otherwise it is the least significant one.
+   * @return 8 on success, -1 when the underlying read fails.
+   */
+  virtual int read(uint64_t &value, bool is_little_endian) {
+    // Must hold all eight bytes; a zero length array here let readData write past the buffer.
+    uint8_t buf[8];
+    if (readData(&buf[0], 8) < 0)
+      return -1;
+    if (is_little_endian) {
+      value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
+          | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+    } else {
+      value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32)
+          | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+    }
+    return 8;
+  }
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen maximum number of bytes to read
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen maximum number of bytes to read
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen number of bytes to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * Not supported by this stream.
+   * @throws std::runtime_error always.
+   **/
+  const uint8_t *getBuffer() const {
+    throw std::runtime_error("Stream does not support this operation");
+  }
+
+ protected:
+
+  /**
+   * Creates a vector and returns the vector using the provided
+   * type name.
+   * @param t incoming object
+   * @returns vector.
+   */
+  template<typename T>
+  std::vector<uint8_t> readBuffer(const T&);
+
+  // Key of the value within the database.
+  std::string path_;
+
+  // Whether writeData is permitted.
+  bool write_enable_;
+
+  // Whether the key was found when the stream was constructed.
+  bool exists_;
+
+  // Current read position within value_.
+  int64_t offset_;
+
+  // Value loaded from the database at construction.
+  std::string value_;
+
+  rocksdb::DB *db_;
+
+  // Size of the loaded value plus the bytes written through this stream.
+  size_t size_;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 22046e3..9bdab2a 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -39,8 +39,8 @@ IF (IOS)
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fobjc-abi-version=2 -fobjc-arc -std=gnu++11 -stdlib=libc++ -isysroot ${CMAKE_OSX_SYSROOT} -DIOS")
 set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fobjc-abi-version=2 -fobjc-arc -isysroot ${CMAKE_OSX_SYSROOT} -DIOS")
 ELSE ()
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DLEVELDB_SUPPORT -DOPENSSL_SUPPORT -DYAML_SUPPORT")
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DLEVELDB_SUPPORT -DOPENSSL_SUPPORT -DYAML_SUPPORT")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DOPENSSL_SUPPORT -DYAML_SUPPORT")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DOPENSSL_SUPPORT -DYAML_SUPPORT")
 ENDIF()
 
 
@@ -61,6 +61,7 @@ include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)
 include_directories(../thirdparty/civetweb-1.9.1/include)
 include_directories(../thirdparty/jsoncpp/include)
 include_directories(../thirdparty/concurrentqueue/)
+include_directories(../thirdparty/rocksdb/include)
 include_directories(include)
 
 file(GLOB SOURCES  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/processors/*.cpp" "src/utils/*.cpp" "src/*.cpp")
@@ -90,14 +91,10 @@ find_package(Boost COMPONENTS system filesystem REQUIRED)
 target_link_libraries(minifi ${Boost_SYSTEM_LIBRARY})
 target_link_libraries(minifi ${Boost_FILESYSTEM_LIBRARY})
 
-# Include LevelDB
-find_package (Leveldb REQUIRED)
-if (LEVELDB_FOUND)
-	include_directories(${LEVELDB_INCLUDE_DIRS})
-	target_link_libraries (minifi ${LEVELDB_LIBRARIES})
-else ()
-    message( FATAL_ERROR "LevelDB was not found. Please install LevelDB" )
-endif (LEVELDB_FOUND)
+if (CURL_FOUND)
+        include_directories(${CURL_INCLUDE_DIRS})
+        target_link_libraries (minifi ${CURL_LIBRARIES})
+endif(CURL_FOUND)
 
 # Include OpenSSL
 find_package (OpenSSL REQUIRED)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/cmake/FindLeveldb.cmake
----------------------------------------------------------------------
diff --git a/libminifi/cmake/FindLeveldb.cmake b/libminifi/cmake/FindLeveldb.cmake
deleted file mode 100644
index 32adafa..0000000
--- a/libminifi/cmake/FindLeveldb.cmake
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-# Find module for Leveldb library and includes
-#  LEVELDB_FOUND - if system found LEVELDB library
-#  LEVELDB_INCLUDE_DIRS - The LEVELDB include directories
-#  LEVELDB_LIBRARIES - The libraries needed to use LEVELDB
-#  LEVELDB_DEFINITIONS - Compiler switches required for using LEVELDB
-
-# For OS X do not attempt to use the OS X application frameworks or bundles.
-set (CMAKE_FIND_FRAMEWORK NEVER)
-set (CMAKE_FIND_APPBUNDLE NEVER)
-
-find_path(LEVELDB_INCLUDE_DIR
-    NAMES leveldb/db.h
-    PATHS /usr/local/include /usr/include
-    DOC "LevelDB include header"
-)
-
-find_library(LEVELDB_LIBRARY 
-    NAMES libleveldb.dylib libleveldb.so
-    PATHS /usr/local/lib /usr/lib/x86_64-linux-gnu
-    DOC "LevelDB library"
-)
-
-include(FindPackageHandleStandardArgs)
-find_package_handle_standard_args(LEVELDB DEFAULT_MSG LEVELDB_INCLUDE_DIR LEVELDB_LIBRARY)
-
-if (LEVELDB_FOUND)
-    set(LEVELDB_LIBRARIES ${LEVELDB_LIBRARY} )
-    set(LEVELDB_INCLUDE_DIRS ${LEVELDB_INCLUDE_DIR} )
-    set(LEVELDB_DEFINITIONS )
-endif()
-
-mark_as_advanced(LEVELDB_ROOT_DIR LEVELDB_INCLUDE_DIR LEVELDB_LIBRARY)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/cmake/FindRocksDB.cmake
----------------------------------------------------------------------
diff --git a/libminifi/cmake/FindRocksDB.cmake b/libminifi/cmake/FindRocksDB.cmake
new file mode 100644
index 0000000..db9c2d1
--- /dev/null
+++ b/libminifi/cmake/FindRocksDB.cmake
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#  ROCKSDB_FOUND               System has RocksDB library/headers.
+#  ROCKSDB_LIBRARIES           The RocksDB library.
+#  ROCKSDB_INCLUDE_DIR        The location of RocksDB headers.
+
+find_path(ROCKSDB_ROOT_DIR
+    NAMES include/rocksdb/db.h
+)
+
+find_library(ROCKSDB_LIBRARIES
+    NAMES rocksdb
+    HINTS ${ROCKSDB_ROOT_DIR}/lib
+)
+
+find_path(ROCKSDB_INCLUDE_DIR
+    NAMES rocksdb/db.h
+    HINTS ${ROCKSDB_ROOT_DIR}/include
+)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(RocksDB DEFAULT_MSG
+    ROCKSDB_LIBRARIES
+    ROCKSDB_INCLUDE_DIR
+)
+
+mark_as_advanced(
+    ROCKSDB_ROOT_DIR
+    ROCKSDB_LIBRARIES
+    ROCKSDB_INCLUDE_DIR
+)
+
+if(ROCKSDB_INCLUDE_DIR AND ROCKSDB_LIBRARIES)
+  set(ROCKSDB_FOUND "YES")
+  message(STATUS "Found RocksDB...${ROCKSDB_LIBRARIES}")
+endif()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/Connection.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index c92a626..e88b071 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -48,7 +48,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
    * Create a new processor
    */
   explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL, uuid_t srcUUID =
-                          NULL,
+  NULL,
                       uuid_t destUUID = NULL);
   // Destructor
   virtual ~Connection() {
@@ -132,6 +132,12 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   uint64_t getQueueDataSize() {
     return queued_data_size_;
   }
+  void put(std::shared_ptr<core::Connectable> flow) {
+    std::shared_ptr<core::FlowFile> ff = std::static_pointer_cast<core::FlowFile>(flow);
+    if (nullptr != ff) {
+      put(ff);
+    }
+  }
   // Put the flow file into queue
   void put(std::shared_ptr<core::FlowFile> flow);
   // Poll the flow file from queue, the expired flow file record also being returned

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 569c4ee..3fc72c6 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -38,7 +38,6 @@
 #include "core/ProcessContext.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/controller/ControllerServiceNode.h"
-#include "provenance/ProvenanceRepository.h"
 
 namespace org {
 namespace apache {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/ClassLoader.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h
index c44cb9a..d16a39b 100644
--- a/libminifi/include/core/ClassLoader.h
+++ b/libminifi/include/core/ClassLoader.h
@@ -21,7 +21,6 @@
 #include <mutex>
 #include <vector>
 #include <map>
-#include "Connectable.h"
 #include "utils/StringUtils.h"
 #include <dlfcn.h>
 #include "core/Core.h"
@@ -55,28 +54,28 @@ class ObjectFactory {
   /**
    * Create a shared pointer to a new processor.
    */
-  virtual std::shared_ptr<Connectable> create(const std::string &name) {
+  virtual std::shared_ptr<CoreComponent> create(const std::string &name) {
     return nullptr;
   }
 
   /**
    * Create a shared pointer to a new processor.
    */
-  virtual Connectable *createRaw(const std::string &name) {
+  virtual CoreComponent *createRaw(const std::string &name) {
     return nullptr;
   }
 
   /**
    * Create a shared pointer to a new processor.
    */
-  virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) {
+  virtual std::shared_ptr<CoreComponent> create(const std::string &name, uuid_t uuid) {
     return nullptr;
   }
 
   /**
    * Create a shared pointer to a new processor.
    */
-  virtual Connectable* createRaw(const std::string &name, uuid_t uuid) {
+  virtual CoreComponent* createRaw(const std::string &name, uuid_t uuid) {
     return nullptr;
   }
 
@@ -119,33 +118,33 @@ class DefautObjectFactory : public ObjectFactory {
   /**
    * Create a shared pointer to a new processor.
    */
-  virtual std::shared_ptr<Connectable> create(const std::string &name) {
+  virtual std::shared_ptr<CoreComponent> create(const std::string &name) {
     std::shared_ptr<T> ptr = std::make_shared<T>(name);
-    return std::static_pointer_cast<Connectable>(ptr);
+    return std::static_pointer_cast<CoreComponent>(ptr);
   }
 
   /**
    * Create a shared pointer to a new processor.
    */
-  virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) {
+  virtual std::shared_ptr<CoreComponent> create(const std::string &name, uuid_t uuid) {
     std::shared_ptr<T> ptr = std::make_shared<T>(name, uuid);
-    return std::static_pointer_cast<Connectable>(ptr);
+    return std::static_pointer_cast<CoreComponent>(ptr);
   }
 
   /**
    * Create a shared pointer to a new processor.
    */
-  virtual Connectable* createRaw(const std::string &name) {
+  virtual CoreComponent* createRaw(const std::string &name) {
     T *ptr = new T(name);
-    return dynamic_cast<Connectable*>(ptr);
+    return dynamic_cast<CoreComponent*>(ptr);
   }
 
   /**
    * Create a shared pointer to a new processor.
    */
-  virtual Connectable* createRaw(const std::string &name, uuid_t uuid) {
+  virtual CoreComponent* createRaw(const std::string &name, uuid_t uuid) {
     T *ptr = new T(name, uuid);
-    return dynamic_cast<Connectable*>(ptr);
+    return dynamic_cast<CoreComponent*>(ptr);
   }
 
   /**
@@ -234,7 +233,7 @@ class ClassLoader {
    * @param uuid uuid of object
    * @return nullptr or object created from class_name definition.
    */
-  template<class T = Connectable>
+  template<class T = CoreComponent>
   std::shared_ptr<T> instantiate(const std::string &class_name, const std::string &name);
 
   /**
@@ -243,7 +242,7 @@ class ClassLoader {
    * @param uuid uuid of object
    * @return nullptr or object created from class_name definition.
    */
-  template<class T = Connectable>
+  template<class T = CoreComponent>
   std::shared_ptr<T> instantiate(const std::string &class_name, uuid_t uuid);
 
   /**
@@ -252,7 +251,7 @@ class ClassLoader {
    * @param uuid uuid of object
    * @return nullptr or object created from class_name definition.
    */
-  template<class T = Connectable>
+  template<class T = CoreComponent>
   T *instantiateRaw(const std::string &class_name, const std::string &name);
 
   /**
@@ -261,7 +260,7 @@ class ClassLoader {
    * @param uuid uuid of object
    * @return nullptr or object created from class_name definition.
    */
-  template<class T = Connectable>
+  template<class T = CoreComponent>
   T *instantiateRaw(const std::string &class_name, uuid_t uuid);
 
  protected:
@@ -282,7 +281,7 @@ std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, const
   auto factory_entry = loaded_factories_.find(class_name);
   if (factory_entry != loaded_factories_.end()) {
     auto obj = factory_entry->second->create(name);
-    return std::static_pointer_cast<T>(obj);
+    return std::dynamic_pointer_cast<T>(obj);
   } else {
     return nullptr;
   }
@@ -294,7 +293,7 @@ std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, uuid_
   auto factory_entry = loaded_factories_.find(class_name);
   if (factory_entry != loaded_factories_.end()) {
     auto obj = factory_entry->second->create(class_name, uuid);
-    return std::static_pointer_cast<T>(obj);
+    return std::dynamic_pointer_cast<T>(obj);
   } else {
     return nullptr;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/Connectable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 5d27901..588b67a 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -70,6 +70,10 @@ class __attribute__((visibility("default"))) Connectable : public CoreComponent
    */
   std::set<std::shared_ptr<Connectable>> getOutGoingConnections(std::string relationship);
 
+  void put(std::shared_ptr<Connectable> flow) {
+
+  }
+
   /**
    * Get next incoming connection
    * @return next incoming connection

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/ContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 5558b93..54ec8d3 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -23,6 +23,7 @@
 #include "io/DataStream.h"
 #include "io/BaseStream.h"
 #include "StreamManager.h"
+#include "core/Connectable.h"
 
 namespace org {
 namespace apache {
@@ -35,6 +36,7 @@ namespace core {
  */
 class ContentRepository : public StreamManager<minifi::ResourceClaim> {
  public:
+
   virtual ~ContentRepository() {
 
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/Core.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 1dc79e7..80b1ca4 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -118,8 +118,12 @@ class CoreComponent {
     uuid_copy(uuid_, other.uuid_);
   }
 
+  virtual ~CoreComponent() {
+
+  }
+
   // Get component name Name
-  std::string getName();
+  virtual std::string getName();
 
   /**
    * Set name.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 344a188..6c50772 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -69,9 +69,8 @@ class FlowConfiguration : public CoreComponent {
         logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
     controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
     service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration);
-    for(auto sl_func : statics_sl_funcs_){
-      registerResource("",sl_func);
-    }
+    // it is okay if this has already been called
+    initialize_static_functions();
   }
 
   virtual ~FlowConfiguration();
@@ -120,11 +119,19 @@ class FlowConfiguration : public CoreComponent {
     return service_provider_;
   }
 
-  static bool add_static_func(std::string functor){
+  static bool add_static_func(std::string functor) {
     statics_sl_funcs_.push_back(functor);
     return true;
   }
 
+  static void initialize_static_functions() {
+    std::lock_guard<std::mutex> lock(atomic_initialization_);
+    for (auto sl_func : statics_sl_funcs_) {
+      core::ClassLoader::getDefaultClassLoader().registerResource("", sl_func);
+    }
+    statics_sl_funcs_.clear();
+  }
+
  protected:
 
   void registerResource(const std::string &resource_function) {
@@ -151,6 +158,7 @@ class FlowConfiguration : public CoreComponent {
 
  private:
   std::shared_ptr<logging::Logger> logger_;
+  static std::mutex atomic_initialization_;
   static std::vector<std::string> statics_sl_funcs_;
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/FlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index 95b901b..8417ac9 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -28,7 +28,7 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-class FlowFile {
+class FlowFile : public core::Connectable {
  public:
   FlowFile();
   ~FlowFile();
@@ -179,11 +179,6 @@ class FlowFile {
    */
   uint64_t getOffset();
 
-  // Get the UUID as string
-  std::string getUUIDStr() {
-    return uuid_str_;
-  }
-
   bool getUUID(uuid_t other) {
     uuid_copy(other, uuid_);
     return true;
@@ -195,6 +190,27 @@ class FlowFile {
   }
 
   /**
+     * Yield
+     */
+    virtual void yield(){
+
+    }
+    /**
+     * Determines if we are connected and operating
+     */
+    virtual bool isRunning(){
+      return true;
+    }
+
+    /**
+     * Determines if work is available by this connectable
+     * @return boolean if work is available.
+     */
+    virtual bool isWorkAvailable(){
+      return true;
+    }
+
+  /**
    * Sets the original connection with a shared pointer.
    * @param connection shared connection.
    */
@@ -245,7 +261,6 @@ class FlowFile {
   // Size in bytes of the data corresponding to this flow file
   uint64_t size_;
   // A global unique identifier
-  uuid_t uuid_;
   // A local unique identifier
   uint64_t id_;
   // Offset to the content
@@ -257,7 +272,7 @@ class FlowFile {
   // Pointer to the associated content resource claim
   std::shared_ptr<ResourceClaim> claim_;
   // UUID string
-  std::string uuid_str_;
+  //std::string uuid_str_;
   // UUID string for all parents
   std::set<std::string> lineage_Identifiers_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 907fdfc..93caab9 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -167,6 +167,8 @@ class ProcessGroup {
 
   void getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap);
 
+  void getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap);
+
  protected:
   // A global unique identifier
   uuid_t uuid_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index cdc81c5..427a46d 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -41,6 +41,7 @@
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "Core.h"
+#include "core/Connectable.h"
 
 namespace org {
 namespace apache {
@@ -53,7 +54,7 @@ namespace core {
 #define MAX_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
 #define REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
 
-class Repository : public core::SerializableComponent {
+class Repository : public virtual core::SerializableComponent {
  public:
   /*
    * Constructor for the repository
@@ -100,6 +101,12 @@ class Repository : public core::SerializableComponent {
     }
     return found;
   }
+
+  void setConnectionMap(std::map<std::string, std::shared_ptr<core::Connectable>> &connectionMap) {
+      this->connectionMap = connectionMap;
+    }
+
+
   virtual bool Get(const std::string &key, std::string &value) {
     return false;
   }
@@ -204,6 +211,7 @@ class Repository : public core::SerializableComponent {
   Repository &operator=(const Repository &parent) = delete;
 
  protected:
+  std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap;
   // Mutex for protection
   std::mutex mutex_;
   // repository directory

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/SerializableComponent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/SerializableComponent.h b/libminifi/include/core/SerializableComponent.h
index f7f9feb..2cf13b5 100644
--- a/libminifi/include/core/SerializableComponent.h
+++ b/libminifi/include/core/SerializableComponent.h
@@ -19,6 +19,7 @@
 #define LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
 
 #include "io/Serializable.h"
+#include "core/Connectable.h"
 #include "core/Core.h"
 
 namespace org {
@@ -30,12 +31,12 @@ namespace core {
 /**
  * Represents a component that is serializable and an extension point of core Component
  */
-class SerializableComponent : public core::CoreComponent, public minifi::io::Serializable {
+class SerializableComponent : public core::Connectable, public minifi::io::Serializable {
 
  public:
 
   SerializableComponent(const std::string name, uuid_t uuid = nullptr)
-      : core::CoreComponent(name, uuid) {
+      : core::Connectable(name, uuid) {
 
   }
 
@@ -76,6 +77,25 @@ class SerializableComponent : public core::CoreComponent, public minifi::io::Ser
     return false;
   }
 
+  virtual void yield() {
+
+  }
+
+  /**
+   * Determines if we are connected and operating
+   */
+  virtual bool isRunning() {
+    return true;
+  }
+
+  /**
+   * Determines if work is available by this connectable
+   * @return boolean if work is available.
+   */
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
 };
 
 } /* namespace core */


Mime
View raw message