nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sza...@apache.org
Subject [nifi-minifi-cpp] 01/06: MINIFICPP-1414 Create in-memory compressed logs
Date Wed, 23 Jun 2021 16:55:49 GMT
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a09d158f500e1e977d289c5b03fc0d11574f8340
Author: Adam Debreceni <adebreceni@apache.org>
AuthorDate: Wed Jun 23 17:29:32 2021 +0200

    MINIFICPP-1414 Create in-memory compressed logs
    
    Closes #955
    
    Signed-off-by: Marton Szasz <szaszm@apache.org>
---
 conf/minifi-log.properties                         |   9 ++
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/core/TypedValues.h               |  88 ++++------
 .../include/core/logging/LoggerConfiguration.h     |  53 +++---
 libminifi/include/core/logging/LoggerProperties.h  |  69 ++++++++
 .../core/logging/internal/ActiveCompressor.h       |  72 +++++++++
 .../core/logging/internal/CompressionManager.h     |  84 ++++++++++
 .../core/logging/internal/LogBuffer.h}             |  46 ++++--
 .../core/logging/internal/LogCompressor.h}         |  35 ++--
 .../core/logging/internal/LogCompressorSink.h      |  92 +++++++++++
 libminifi/include/io/BufferStream.h                |   8 +
 libminifi/include/io/ZlibStream.h                  |  13 +-
 libminifi/include/utils/Literals.h                 |  59 +++++++
 libminifi/include/utils/StagingQueue.h             | 178 +++++++++++++++++++++
 libminifi/include/utils/ValueParser.h              |   4 +
 libminifi/src/core/TypedValues.cpp                 |   9 ++
 libminifi/src/core/logging/LoggerConfiguration.cpp |  55 +++++--
 .../core/logging/internal/CompressionManager.cpp   |  74 +++++++++
 .../internal/LogCompressor.cpp}                    |  31 ++--
 .../core/logging/internal/LogCompressorSink.cpp    |  81 ++++++++++
 libminifi/src/io/ZlibStream.cpp                    |  39 +++--
 libminifi/test/unit/LoggerTests.cpp                |  97 ++++++++++-
 libminifi/test/unit/StagingQueueTests.cpp          | 121 ++++++++++++++
 23 files changed, 1176 insertions(+), 143 deletions(-)

diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties
index 250cb46..aeca6dd 100644
--- a/conf/minifi-log.properties
+++ b/conf/minifi-log.properties
@@ -45,3 +45,12 @@ logger.org::apache::nifi::minifi=INFO,rolling
 
 #Logging configurable by class fully qualified name
 #logger.org::apache::nifi::minifi::core::logging::LoggerConfiguration=DEBUG
+
+# Log compression #
+## Enables the agent to keep a limited amount of the application
+## logs in memory in compressed form. Because the logs are compressed,
+## the in-memory copy may hold more log history than the log files do.
+## The two limits below cap the not-yet-compressed cache and the compressed logs.
+## Setting either of them to 0 disables in-memory log compression.
+#compression.cached.log.max.size=8 MB
+#compression.compressed.log.max.size=8 MB
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index a9cce79..6550e7f 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
 	set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/serialization/*.cpp" "src/pro [...]
+file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "src/agent/agent_version.cpp")
 
diff --git a/libminifi/include/core/TypedValues.h b/libminifi/include/core/TypedValues.h
index b3808ba..09050f1 100644
--- a/libminifi/include/core/TypedValues.h
+++ b/libminifi/include/core/TypedValues.h
@@ -21,12 +21,17 @@
 #include <algorithm>
 #include <string>
 #include <typeindex>
+#include <cctype>
+#include <limits>
+#include <map>
+#include <memory>

 #include "state/Value.h"
 #include "utils/StringUtils.h"
 #include "utils/ValueParser.h"
 #include "utils/PropertyErrors.h"
 #include "utils/OptionalUtils.h"
+#include "utils/Literals.h"

 namespace org {
 namespace apache {
@@ -114,6 +117,8 @@ class TimePeriodValue : public TransformableValue, public state::response::UInt6
  * format <numeric> <byte size>.
  */
 class DataSizeValue : public TransformableValue, public state::response::UInt64Value {
+  static std::shared_ptr<logging::Logger>& getLogger();  // used by StringToInt() to warn about unrecognized data units
+
  public:
   static const std::type_index type_id;
 
@@ -128,72 +135,50 @@ class DataSizeValue : public TransformableValue, public state::response::UInt64V
    }


-// Convert String to Integer
+  // Convert String to Integer
   template<typename T, typename std::enable_if<
       std::is_integral<T>::value>::type* = nullptr>
   static bool StringToInt(const std::string &input, T &output) {
-    if (input.size() == 0) {
+    // Parses a leading integer followed by an optional unit: B, K/M/G/T/P (powers of 1000) or
+    // KB/MB/GB/TB/PB (powers of 1024), case-insensitive. Returns false if the number cannot be parsed
+    // or value * multiplier would overflow int64_t; gsl::narrow<T> presumably throws if it does not fit T.
+    // TODO(adebreceni): this mapping is to preserve backwards compatibility,
+    //  we should entertain the idea of moving to standardized units in
+    //  the configuration (i.e. K = 1000, Ki = 1024)
+    static const std::map<std::string, int64_t> unit_map{
+      {"B", 1},
+      {"K", 1_KB}, {"M", 1_MB}, {"G", 1_GB}, {"T", 1_TB}, {"P", 1_PB},
+      {"KB", 1_KiB}, {"MB", 1_MiB}, {"GB", 1_GiB}, {"TB", 1_TiB}, {"PB", 1_PiB},
+    };
+
+    int64_t value = 0;
+    std::string unit_str;
+    try {
+      unit_str = utils::StringUtils::trim(utils::internal::ValueParser(input).parse(value).rest());
+    } catch (const utils::internal::ParseException&) {
       return false;
     }

-    const char *cvalue = input.c_str();
-    char *pEnd;
-    auto ival = std::strtoll(cvalue, &pEnd, 0);
-
-    if (pEnd[0] == '\0') {
-      output = gsl::narrow<T>(ival);
-      return true;
-    }
-
-    while (*pEnd == ' ') {
-      // Skip the space
-      pEnd++;
-    }
-
-    char end0 = toupper(pEnd[0]);
-    if (end0 == 'B') {
-      output = gsl::narrow<T>(ival);
-      return true;
-    } else if ((end0 == 'K') || (end0 == 'M') || (end0 == 'G') || (end0 == 'T') || (end0 == 'P')) {
-      if (pEnd[1] == '\0') {
-        unsigned long int multiplier = 1000; // NOLINT
-
-        if ((end0 != 'K')) {
-          multiplier *= 1000;
-          if (end0 != 'M') {
-            multiplier *= 1000;
-            if (end0 != 'G') {
-              multiplier *= 1000;
-              if (end0 != 'T') {
-                multiplier *= 1000;
-              }
-            }
-          }
-        }
-        output = gsl::narrow<T>(ival * multiplier);
-        return true;
-
-      } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') && (pEnd[2] == '\0')) {
-        unsigned long int multiplier = 1024; // NOLINT
-
-        if ((end0 != 'K')) {
-          multiplier *= 1024;
-          if (end0 != 'M') {
-            multiplier *= 1024;
-            if (end0 != 'G') {
-              multiplier *= 1024;
-              if (end0 != 'T') {
-                multiplier *= 1024;
-              }
-            }
-          }
-        }
-        output = gsl::narrow<T>(ival * multiplier);
-        return true;
+    if (!unit_str.empty()) {
+      // go through unsigned char: passing a negative char value to toupper is undefined behavior
+      std::transform(unit_str.begin(), unit_str.end(), unit_str.begin(),
+          [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
+      auto multiplierIt = unit_map.find(unit_str);
+      if (multiplierIt == unit_map.end()) {
+        getLogger()->log_warn("Unrecognized data unit: '%s', in the future this will constitute as an error", unit_str);
+        // backwards compatibility
+        // return false;
+      } else if (value > std::numeric_limits<int64_t>::max() / multiplierIt->second ||
+                 value < std::numeric_limits<int64_t>::min() / multiplierIt->second) {
+        // value * multiplier would overflow int64_t (signed overflow is undefined behavior)
+        return false;
+      } else {
+        value *= multiplierIt->second;
       }
     }

-    return false;
+    output = gsl::narrow<T>(value);
+    return true;
   }
 };

diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h
index 7e93ef0..80e2f1e 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -36,7 +36,10 @@
 
 #include "core/Core.h"
 #include "core/logging/Logger.h"
-#include "properties/Properties.h"
+#include "LoggerProperties.h"
+#include "internal/CompressionManager.h"
+
+class LoggerTestAccessor;
 
 namespace org {
 namespace apache {
@@ -50,6 +53,8 @@ struct LoggerNamespace {
   spdlog::level::level_enum level;
   bool has_level;
   std::vector<std::shared_ptr<spdlog::sinks::sink>> sinks;
+  // sinks made available to all descendant namespaces of this one
+  std::vector<std::shared_ptr<spdlog::sinks::sink>> exported_sinks;
   std::map<std::string, std::shared_ptr<LoggerNamespace>> children;
 
   LoggerNamespace()
@@ -61,37 +66,9 @@ struct LoggerNamespace {
 };
 }  // namespace internal
 
-class LoggerProperties : public Properties {
- public:
-  LoggerProperties()
-      : Properties("Logger properties") {
-  }
-  /**
-   * Gets all keys that start with the given prefix and do not have a "." after the prefix and "." separator.
-   *
-   * Ex: with type argument "appender"
-   * you would get back a property of "appender.rolling" but not "appender.rolling.file_name"
-   */
-  std::vector<std::string> get_keys_of_type(const std::string &type);
-
-  /**
-   * Registers a sink witht the given name. This allows for programmatic definition of sinks.
-   */
-  void add_sink(const std::string &name, std::shared_ptr<spdlog::sinks::sink> sink) {
-    sinks_[name] = sink;
-  }
-  std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> initial_sinks() {
-    return sinks_;
-  }
-
-  static const char* appender_prefix;
-  static const char* logger_prefix;
-
- private:
-  std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sinks_;
-};
-
 class LoggerConfiguration {
+  friend class ::LoggerTestAccessor;  // presumably the unit-test helper (LoggerTests.cpp); may access private members
+
  public:
   /**
    * Gets the current log configuration
@@ -121,6 +98,28 @@ class LoggerConfiguration {
    */
   void initialize(const std::shared_ptr<LoggerProperties> &logger_properties);

+  /**
+   * Retrieves one segment of the in-memory compressed logs, using a zero timeout.
+   * @param flush if true, the cached (not yet compressed) logs are committed and compressed first
+   * @return a stream over the compressed segment, or nullptr if log compression is not set up
+   *         or no compressed segment is available
+   */
+  static std::unique_ptr<io::InputStream> getCompressedLog(bool flush = false) {
+    return getCompressedLog(std::chrono::milliseconds{0}, flush);
+  }
+
+  /**
+   * Retrieves one segment of the in-memory compressed logs.
+   * @param time forwarded to the compressor sink (presumably the maximum time to wait for a segment)
+   * @param flush if true, the cached (not yet compressed) logs are committed and compressed first
+   * @return a stream over the compressed segment, or nullptr if log compression is not set up
+   *         or no compressed segment became available
+   */
+  template<class Rep, class Period>
+  static std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
+    return getConfiguration().compression_manager_.getCompressedLog(time, flush);
+  }
+
   /**
    * Can be used to get arbitrarily named Logger, LoggerFactory should be preferred within a class.
    */
@@ -134,6 +120,10 @@ class LoggerConfiguration {
                                                     std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present = false);
 
  private:
+  std::shared_ptr<Logger> getLogger(const std::string& name, const std::lock_guard<std::mutex>& lock);  // `lock`: presumably proof the caller holds the config mutex
+
+  void initializeCompression(const std::lock_guard<std::mutex>& lock, const std::shared_ptr<LoggerProperties>& properties);  // `lock`: caller already holds it
+
   static spdlog::sink_ptr create_syslog_sink();
   static spdlog::sink_ptr create_fallback_sink();
 
@@ -154,6 +144,7 @@ class LoggerConfiguration {
   };
 
   LoggerConfiguration();
+  internal::CompressionManager compression_manager_;  // manages the in-memory compressed-log sink; used by getCompressedLog()
   std::shared_ptr<internal::LoggerNamespace> root_namespace_;
   std::vector<std::shared_ptr<LoggerImpl>> loggers;
   std::shared_ptr<spdlog::formatter> formatter_;
diff --git a/libminifi/include/core/logging/LoggerProperties.h b/libminifi/include/core/logging/LoggerProperties.h
new file mode 100644
index 0000000..1341a7f
--- /dev/null
+++ b/libminifi/include/core/logging/LoggerProperties.h
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <map>
+#include <vector>
+
+#include "spdlog/sinks/sink.h"
+
+#include "properties/Properties.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+class LoggerProperties : public Properties {
+ public:
+  LoggerProperties()
+      : Properties("Logger properties") {
+  }
+  /**
+   * Gets all keys that start with the given prefix and do not have a "." after the prefix and "." separator.
+   *
+   * Ex: with type argument "appender"
+   * you would get back a property of "appender.rolling" but not "appender.rolling.file_name"
+   */
+  std::vector<std::string> get_keys_of_type(const std::string &type);
+
+  /**
+   * Registers a sink with the given name, replacing any sink already registered under it. This allows for programmatic definition of sinks.
+   */
+  void add_sink(const std::string &name, std::shared_ptr<spdlog::sinks::sink> sink) {
+    sinks_[name] = sink;
+  }
+  std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> initial_sinks() const {  // returns a copy of the registered sinks
+    return sinks_;
+  }
+
+ private:
+  std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sinks_;
+};
+
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/core/logging/internal/ActiveCompressor.h b/libminifi/include/core/logging/internal/ActiveCompressor.h
new file mode 100644
index 0000000..045d3e0
--- /dev/null
+++ b/libminifi/include/core/logging/internal/ActiveCompressor.h
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include "LogBuffer.h"
+#include "LogCompressor.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+// One compressed-log segment under construction: a LogCompressor writing into its own BufferStream.
+class ActiveCompressor {
+ public:
+  class Allocator {
+   public:
+    explicit Allocator(std::shared_ptr<logging::Logger> logger) : logger_{std::move(logger)} {}
+    // Creates an ActiveCompressor whose output buffer has max_size bytes of capacity reserved.
+    ActiveCompressor operator()(size_t max_size) const {
+      ActiveCompressor instance;
+      instance.output_ = utils::make_unique<io::BufferStream>();
+      instance.output_->extend(max_size);
+      instance.compressor_ = utils::make_unique<LogCompressor>(gsl::make_not_null(instance.output_.get()), logger_);
+      return instance;
+    }
+
+   private:
+    std::shared_ptr<logging::Logger> logger_;
+  };
+  // Closes the compressor and hands the output buffer over as a LogBuffer; output_ is null afterwards.
+  LogBuffer commit() {
+    compressor_->close();
+    return LogBuffer{std::move(output_)};
+  }
+  // Bytes the compressor has emitted into output_ so far; 0 once committed (output_ is null then).
+  size_t size() const {
+    return output_ ? output_->size() : 0;
+  }
+  // compressor_ writes through a raw pointer to *output_ (see Allocator::operator()).
+  std::unique_ptr<io::BufferStream> output_;
+  std::unique_ptr<LogCompressor> compressor_;
+};
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/core/logging/internal/CompressionManager.h b/libminifi/include/core/logging/internal/CompressionManager.h
new file mode 100644
index 0000000..b1ea9fa
--- /dev/null
+++ b/libminifi/include/core/logging/internal/CompressionManager.h
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <atomic>
+#include <chrono>
+#include <functional>
+#include <utility>
+#include <string>
+
+#include "core/logging/Logger.h"
+#include "LogCompressorSink.h"
+#include "core/logging/LoggerProperties.h"
+#include "io/InputStream.h"
+#include "utils/Literals.h"
+
+class LoggerTestAccessor;
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+/**
+ * Owns the LogCompressorSink (if any) that keeps compressed application logs in memory,
+ * and gives access to its compressed segments.
+ */
+class CompressionManager {
+  friend class ::LoggerTestAccessor;
+
+  using LoggerFactory = std::function<std::shared_ptr<Logger>(const std::string&)>;
+
+ public:
+  /**
+   * Sets up in-memory log compression according to the given logger properties.
+   * @return the created sink; presumably nullptr when compression is disabled (definition not shown here)
+   */
+  std::shared_ptr<LogCompressorSink> initialize(const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory);
+
+  /**
+   * Retrieves one segment of compressed logs from the current sink.
+   * @param time forwarded to LogCompressorSink::getContent (presumably the maximum time to wait)
+   * @param flush if true, the cached logs are committed and compressed before retrieval
+   * @return the compressed segment, or nullptr if there is no sink or no segment was available
+   */
+  template<class Rep, class Period>
+  std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
+    std::shared_ptr<internal::LogCompressorSink> sink = getSink();
+    if (sink) {
+      return sink->getContent(time, flush);
+    }
+    return nullptr;
+  }
+
+  // Keys of the properties configuring the compression (see conf/minifi-log.properties).
+  static constexpr const char* compression_cached_log_max_size_ = "compression.cached.log.max.size";
+  static constexpr const char* compression_compressed_log_max_size_ = "compression.compressed.log.max.size";
+
+ private:
+  // Returns a copy of sink_ taken under mtx_, so the caller can use the sink without holding the lock.
+  std::shared_ptr<internal::LogCompressorSink> getSink() const {
+    // gcc4.8 bug => cannot use std::atomic_load
+    std::lock_guard<std::mutex> lock(mtx_);
+    return sink_;
+  }
+
+  // Segment size limits (default 1 MiB each); presumably used when the sink is created.
+  std::atomic<size_t> cache_segment_size{1_MiB};
+  std::atomic<size_t> compressed_segment_size{1_MiB};
+
+  // mtx_ protects sink_ (see getSink()).
+  mutable std::mutex mtx_;
+  std::shared_ptr<LogCompressorSink> sink_;
+};
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/include/core/logging/internal/LogBuffer.h
similarity index 53%
copy from libminifi/src/core/TypedValues.cpp
copy to libminifi/include/core/logging/internal/LogBuffer.h
index 93a43b8..968be80 100644
--- a/libminifi/src/core/TypedValues.cpp
+++ b/libminifi/include/core/logging/internal/LogBuffer.h
@@ -16,19 +16,47 @@
  * limitations under the License.
  */
 
-#include "core/Property.h"
-#include "core/TypedValues.h"
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/BufferStream.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
+namespace logging {
+namespace internal {
+// A chunk of log data held in memory; buffer_ is null for a default-constructed or already committed LogBuffer.
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr<io::BufferStream> buffer): buffer_{std::move(buffer)} {}
+  // Creates a LogBuffer around a new stream with max_size bytes of capacity reserved.
+  static LogBuffer allocate(size_t max_size) {
+    LogBuffer instance{utils::make_unique<io::BufferStream>()};
+    instance.buffer_->extend(max_size);
+    return instance;
+  }
+  // Moves the stream into a new LogBuffer, leaving this one without a buffer.
+  LogBuffer commit() {
+    return LogBuffer{std::move(buffer_)};
+  }
+  // Size of the buffered data; 0 when there is no buffer (default-constructed or committed).
+  size_t size() const {
+    return buffer_ ? buffer_->size() : 0;
+  }

-const  std::type_index DataSizeValue::type_id = typeid(uint64_t);
-const  std::type_index TimePeriodValue::type_id = typeid(uint64_t);
+  std::unique_ptr<io::BufferStream> buffer_;
+};
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/include/core/logging/internal/LogCompressor.h
similarity index 62%
copy from libminifi/src/core/TypedValues.cpp
copy to libminifi/include/core/logging/internal/LogCompressor.h
index 93a43b8..9bc722e 100644
--- a/libminifi/src/core/TypedValues.cpp
+++ b/libminifi/include/core/logging/internal/LogCompressor.h
@@ -16,19 +16,36 @@
  * limitations under the License.
  */
 
-#include "core/Property.h"
-#include "core/TypedValues.h"
+#pragma once
+
+#include <memory>
+#include "io/ZlibStream.h"
+#include "io/OutputStream.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
+namespace logging {
+namespace internal {
+// ZlibCompressStream subclass used for in-memory log compression (see ActiveCompressor); the caller supplies the logger.
+class LogCompressor : public io::ZlibCompressStream {
+ public:
+  LogCompressor(gsl::not_null<OutputStream *> output, std::shared_ptr<logging::Logger> logger);
+  // Result of flush(); the exact failure conditions are defined in LogCompressor.cpp (not shown here).
+  enum class FlushResult {
+    Success,
+    Error
+  };

-const  std::type_index DataSizeValue::type_id = typeid(uint64_t);
-const  std::type_index TimePeriodValue::type_id = typeid(uint64_t);
+  FlushResult flush();  // presumably flushes pending compressed data into the output stream
+};
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/core/logging/internal/LogCompressorSink.h b/libminifi/include/core/logging/internal/LogCompressorSink.h
new file mode 100644
index 0000000..bbb7c24
--- /dev/null
+++ b/libminifi/include/core/logging/internal/LogCompressorSink.h
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <atomic>
+#include <chrono>
+#include <thread>
+#include <utility>
+
+#include "spdlog/common.h"
+#include "spdlog/details/log_msg.h"
+#include "spdlog/details/null_mutex.h"
+#include "spdlog/sinks/base_sink.h"
+#include "ActiveCompressor.h"
+#include "LogBuffer.h"
+#include "utils/StagingQueue.h"
+
+class LoggerTestAccessor;
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+// Soft size limits of one of LogCompressorSink's staging queues.
+struct LogQueueSize {
+  size_t max_total_size;    // limit on the combined size of all queued segments
+  size_t max_segment_size;  // limit on the size of a single segment
+};
+
+/**
+ * spdlog sink that keeps the application logs in memory. Incoming messages presumably
+ * accumulate in cached_logs_ (uncompressed segments) and are compressed into compressed_logs_
+ * by compression_thread_ (presumably running run()); getContent() hands out compressed segments.
+ *
+ * base_sink is instantiated with null_mutex, so spdlog does no locking for this sink;
+ * synchronization presumably relies on the StagingQueues.
+ */
+class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_mutex> {
+  friend class ::LoggerTestAccessor;
+
+ private:
+  void sink_it_(const spdlog::details::log_msg& msg) override;
+  void flush_() override;
+
+ public:
+  explicit LogCompressorSink(LogQueueSize cache_size, LogQueueSize compressed_size, std::shared_ptr<logging::Logger> logger);
+  ~LogCompressorSink() override;
+
+  /**
+   * Dequeues one compressed log segment.
+   * @param time forwarded to StagingQueue::tryDequeue (presumably the maximum time to wait)
+   * @param flush if true, the cached logs are first committed and compressed with forced
+   *        rotation (compress(true)), presumably so that the most recent logs are included
+   * @return the segment's data, or nullptr if no segment was dequeued
+   */
+  template<class Rep, class Period>
+  std::unique_ptr<io::InputStream> getContent(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
+    if (flush) {
+      cached_logs_.commit();
+      compress(true);
+    }
+    LogBuffer compressed;
+    compressed_logs_.tryDequeue(compressed, time);
+    return std::move(compressed.buffer_);
+  }
+
+ private:
+  enum class CompressionResult {
+    Success,
+    NothingToCompress
+  };
+
+  CompressionResult compress(bool force_rotation = false);
+  void run();
+
+  std::atomic<bool> running_{true};
+  std::thread compression_thread_;
+
+  utils::StagingQueue<LogBuffer> cached_logs_;
+  utils::StagingQueue<ActiveCompressor, ActiveCompressor::Allocator> compressed_logs_;
+};
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 2290a3f..c16e15c 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -43,6 +43,14 @@ class BufferStream : public BaseStream {
     write(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
   }
 
+  /**
+   * Prepares the stream to accept an additional byte_count bytes by reserving buffer capacity; the contents are unchanged.
+   * @param byte_count number of bytes we expect to write
+   */
+  void extend(size_t byte_count) {
+    buffer_.reserve(buffer_.size() + byte_count);
+  }
+
   using BaseStream::read;
   using BaseStream::write;
 
diff --git a/libminifi/include/io/ZlibStream.h b/libminifi/include/io/ZlibStream.h
index b146413..f8efff8 100644
--- a/libminifi/include/io/ZlibStream.h
+++ b/libminifi/include/io/ZlibStream.h
@@ -26,7 +26,7 @@
 #include <vector>
 
 #include "BaseStream.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/Logger.h"
 #include "utils/gsl.h"
 
 namespace org {
@@ -80,8 +80,13 @@ class ZlibCompressStream : public ZlibBaseStream {
 
   void close() override;
 
- private:
-  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<ZlibCompressStream>::getLogger()};
+ protected:
+  ZlibCompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format, int level, std::shared_ptr<core::logging::Logger> logger);
+  // zlib flush flag for the 3-argument write (presumably Z_NO_FLUSH / Z_SYNC_FLUSH / Z_FINISH; see ZlibStream.cpp)
+  using FlushMode = int;
+  size_t write(const uint8_t* value, size_t size, FlushMode mode);
+  // presumably set by the constructors; subclasses such as LogCompressor pass their own logger
+  std::shared_ptr<core::logging::Logger> logger_;
 };
 
 class ZlibDecompressStream : public ZlibBaseStream {
@@ -98,7 +103,7 @@ class ZlibDecompressStream : public ZlibBaseStream {
   size_t write(const uint8_t *value, size_t size) override;
 
  private:
-  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<ZlibDecompressStream>::getLogger()};
+  std::shared_ptr<core::logging::Logger> logger_;  // presumably initialized by the constructor (ZlibStream.cpp)
 };
 
 }  // namespace io
diff --git a/libminifi/include/utils/Literals.h b/libminifi/include/utils/Literals.h
new file mode 100644
index 0000000..9b47b8a
--- /dev/null
+++ b/libminifi/include/utils/Literals.h
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+// Byte-size literals: _KiB.._PiB are binary multiples (1_KiB == 1024), _KB.._PB are decimal multiples (1_KB == 1000).
+constexpr unsigned long long operator "" _KiB(unsigned long long n) {  // NOLINT
+  return 1024 * n;
+}
+// NOTE(review): applies to every operator here: with a space after "", `_KiB` etc. are identifiers starting with '_' + uppercase, which are reserved; `operator""_KiB` avoids that if all target compilers accept it.
+constexpr unsigned long long operator "" _MiB(unsigned long long n) {  // NOLINT
+  return 1024_KiB * n;
+}
+
+constexpr unsigned long long operator "" _GiB(unsigned long long n) {  // NOLINT
+  return 1024_MiB * n;
+}
+
+constexpr unsigned long long operator "" _TiB(unsigned long long n) {  // NOLINT
+  return 1024_GiB * n;
+}
+
+constexpr unsigned long long operator "" _PiB(unsigned long long n) {  // NOLINT
+  return 1024_TiB * n;
+}
+// Decimal multiples. As with the binary ones, results are unsigned long long, so overflow wraps around silently.
+constexpr unsigned long long operator "" _KB(unsigned long long n) {  // NOLINT
+  return 1000 * n;
+}
+
+constexpr unsigned long long operator "" _MB(unsigned long long n) {  // NOLINT
+  return 1000_KB * n;
+}
+
+constexpr unsigned long long operator "" _GB(unsigned long long n) {  // NOLINT
+  return 1000_MB * n;
+}
+
+constexpr unsigned long long operator "" _TB(unsigned long long n) {  // NOLINT
+  return 1000_GB * n;
+}
+
+constexpr unsigned long long operator "" _PB(unsigned long long n) {  // NOLINT
+  return 1000_TB * n;
+}
diff --git a/libminifi/include/utils/StagingQueue.h b/libminifi/include/utils/StagingQueue.h
new file mode 100644
index 0000000..2714a3a
--- /dev/null
+++ b/libminifi/include/utils/StagingQueue.h
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <mutex>
+#include <type_traits>
+#include <utility>
+
+#include "MinifiConcurrentQueue.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+namespace internal {
+/**
+ * Default StagingQueue allocation policy: forwards the capacity hint to the
+ * static factory T::allocate(size_t) and returns the new item by value.
+ */
+template<typename T>
+struct default_allocator {
+  auto operator()(size_t capacity_hint) const -> T {
+    return T::allocate(capacity_hint);
+  }
+};
+}  // namespace internal
+
+/**
+ * Purpose: A FIFO container that allows chunked processing while trying to enforce
+ * soft limits like max chunk size and max total size. The "head" chunk might be
+ * modified in a thread-safe manner (usually appending to it) before committing it
+ * thus making it available for dequeuing.
+ *
+ * @tparam ActiveItem the mutable head chunk; must provide size() returning size_t and
+ *         commit(), whose (decayed) result is the Item type stored in the queue
+ * @tparam Allocator callable invoked with a capacity hint to create a new ActiveItem
+ */
+template<typename ActiveItem, typename Allocator = internal::default_allocator<ActiveItem>>
+class StagingQueue {
+  using Item = typename std::decay<decltype(std::declval<ActiveItem&>().commit())>::type;
+
+  static_assert(std::is_same<decltype(std::declval<const Allocator&>()(std::declval<size_t>())), ActiveItem>::value,
+      "Allocator::operator(size_t) must return an ActiveItem");
+  static_assert(std::is_same<decltype(std::declval<const Item&>().size()), size_t>::value,
+      "Item::size must return size_t");
+  static_assert(std::is_same<decltype(std::declval<const ActiveItem&>().size()), size_t>::value,
+      "ActiveItem::size must return size_t");
+
+  // Normalizes the result of the functor passed to modify(): a functor returning bool
+  // decides whether to force a commit, a functor returning void never forces one.
+  template<typename Functor, typename Arg, typename = void>
+  struct FunctorCallHelper;
+
+  template<typename Functor, typename Arg>
+  struct FunctorCallHelper<Functor, Arg, typename std::enable_if<std::is_same<decltype(std::declval<Functor>()(std::declval<Arg>())), bool>::value>::type> {
+    static bool call(Functor&& fn, Arg&& arg) {
+      return std::forward<Functor>(fn)(std::forward<Arg>(arg));
+    }
+  };
+
+  template<typename Functor, typename Arg>
+  struct FunctorCallHelper<Functor, Arg, typename std::enable_if<std::is_same<decltype(std::declval<Functor>()(std::declval<Arg>())), void>::value>::type> {
+    static bool call(Functor&& fn, Arg&& arg) {
+      std::forward<Functor>(fn)(std::forward<Arg>(arg));
+      return false;
+    }
+  };
+
+  static ActiveItem allocateActiveItem(const Allocator& allocator, size_t max_item_size) {
+    // max_size is a soft limit, i.e. reaching max_size is an indicator
+    // that the item should be committed, we cannot guarantee that only
+    // max_size content is in the item, since max_size is the "trigger limit",
+    // presumably each item would contain (at the trigger point) a little
+    // more than max_size content, that is the reasoning behind "* 3 / 2"
+    return allocator(max_item_size * 3 / 2);
+  }
+
+ public:
+  /**
+   * @param max_size soft limit on the total size of all items; it is only enforced
+   *        when discardOverflow() is called
+   * @param max_item_size the active item is committed once it grows beyond this size
+   * @param allocator creates each new active item
+   */
+  StagingQueue(size_t max_size, size_t max_item_size, Allocator allocator = {})
+    : max_size_(max_size),
+      max_item_size_(max_item_size),
+      active_item_(allocateActiveItem(allocator, max_item_size)),
+      // moving is safe: active_item_ is declared (hence initialized) before allocator_
+      allocator_(std::move(allocator)) {}
+
+  /**
+   * Commits the active item, making it available for dequeuing, and replaces it
+   * with a freshly allocated one. Does nothing if the active item is empty.
+   */
+  void commit() {
+    std::unique_lock<std::mutex> lock{active_item_mutex_};
+    if (active_item_.size() == 0) {
+      // nothing to commit
+      return;
+    }
+    commit(lock);
+  }
+
+  /**
+   * Allows thread-safe modification of the "live" instance.
+   * @tparam Functor
+   * @param fn callable which can modify the instance, should return true
+   * if it would like to force a commit (it may also return void)
+   *
+   * The total size is updated even if fn throws, so the bookkeeping stays
+   * consistent with whatever fn managed to add or remove before throwing.
+   */
+  template<typename Functor>
+  void modify(Functor&& fn) {
+    std::unique_lock<std::mutex> lock{active_item_mutex_};
+    const size_t original_size = active_item_.size();
+    bool should_commit = false;
+    try {
+      should_commit = FunctorCallHelper<Functor, ActiveItem&>::call(std::forward<Functor>(fn), active_item_);
+    } catch (...) {
+      // fn may have partially modified the active item before throwing; without this
+      // a later dequeue would subtract bytes that were never added (unsigned underflow)
+      updateTotalSize(original_size, active_item_.size());
+      throw;
+    }
+    const size_t new_size = active_item_.size();
+    updateTotalSize(original_size, new_size);
+    if (should_commit || new_size > max_item_size_) {
+      commit(lock);
+    }
+  }
+
+  /**
+   * Dequeues the oldest committed item, waiting at most `time` for one to become
+   * available; a zero duration does not wait.
+   * @return true if an item was dequeued into out
+   */
+  template<class Rep, class Period>
+  bool tryDequeue(Item& out, const std::chrono::duration<Rep, Period>& time) {
+    if (time == std::chrono::duration<Rep, Period>{0}) {
+      return tryDequeue(out);
+    }
+    if (queue_.dequeueWaitFor(out, time)) {
+      total_size_ -= out.size();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Dequeues the oldest committed item without waiting.
+   * @return true if an item was dequeued into out
+   */
+  bool tryDequeue(Item& out) {
+    if (queue_.tryDequeue(out)) {
+      total_size_ -= out.size();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Drops committed items (oldest first) while the total size exceeds max_size.
+   * The active item is never discarded, so the total may remain above the limit.
+   */
+  void discardOverflow() {
+    while (total_size_ > max_size_) {
+      Item item;
+      if (!queue_.tryDequeue(item)) {
+        break;
+      }
+      total_size_ -= item.size();
+    }
+  }
+
+  /** Combined size of the committed items and the active item. */
+  size_t size() const {
+    return total_size_.load();
+  }
+
+ private:
+  // Moves the active item into the queue; the lock parameter documents that
+  // active_item_mutex_ must be held by the caller.
+  void commit(std::unique_lock<std::mutex>& /*lock*/) {
+    queue_.enqueue(active_item_.commit());
+    active_item_ = allocateActiveItem(allocator_, max_item_size_);
+  }
+
+  // Applies the active item's size change to total_size_ (called with active_item_mutex_ held).
+  void updateTotalSize(size_t original_size, size_t new_size) {
+    if (new_size >= original_size) {
+      total_size_ += new_size - original_size;
+    } else {
+      total_size_ -= original_size - new_size;
+    }
+  }
+
+  const size_t max_size_;
+  const size_t max_item_size_;
+  std::atomic<size_t> total_size_{0};
+
+  std::mutex active_item_mutex_;
+  ActiveItem active_item_;
+
+  const Allocator allocator_;
+
+  ConditionConcurrentQueue<Item> queue_;
+};
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/ValueParser.h b/libminifi/include/utils/ValueParser.h
index 6639165..8910096 100644
--- a/libminifi/include/utils/ValueParser.h
+++ b/libminifi/include/utils/ValueParser.h
@@ -137,6 +137,10 @@ class ValueParser {
     }
   }
 
+  /**
+   * Returns a copy of the input string starting at `offset` (presumably the current
+   * parse position, i.e. the not-yet-parsed remainder).
+   * Deliberately not noexcept: std::string::substr may throw std::bad_alloc, and
+   * std::out_of_range if offset were past the end; in a noexcept function either
+   * would call std::terminate.
+   */
+  std::string rest() const {
+    return str.substr(offset);
+  }
+
  private:
   /**
    *
diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/src/core/TypedValues.cpp
index 93a43b8..8306dec 100644
--- a/libminifi/src/core/TypedValues.cpp
+++ b/libminifi/src/core/TypedValues.cpp
@@ -16,8 +16,12 @@
  * limitations under the License.
  */
 
+#include <memory>
+
 #include "core/Property.h"
 #include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -27,6 +31,11 @@ namespace core {
 const  std::type_index DataSizeValue::type_id = typeid(uint64_t);
 const  std::type_index TimePeriodValue::type_id = typeid(uint64_t);
 
+// Returns the logger shared by all DataSizeValue instances. It is created lazily on the
+// first call (function-local static initialization is thread-safe since C++11).
+std::shared_ptr<logging::Logger>& DataSizeValue::getLogger() {
+  static std::shared_ptr<logging::Logger> shared_instance{logging::LoggerFactory<DataSizeValue>::getLogger()};
+  return shared_instance;
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 2b89547..dfff263 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -27,12 +27,16 @@
 #include <memory>
 #include <map>
 #include <string>
+#include <atomic>
 
 #include "core/Core.h"
 #include "utils/StringUtils.h"
 #include "utils/ClassUtils.h"
 #include "utils/file/FileUtils.h"
 #include "utils/Environment.h"
+#include "core/logging/internal/LogCompressorSink.h"
+#include "utils/Literals.h"
+#include "core/TypedValues.h"
 
 #include "spdlog/spdlog.h"
 #include "spdlog/sinks/stdout_sinks.h"
@@ -60,6 +64,25 @@ namespace logging {
 
 const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v";
 
+// Maps a case-insensitive level name ("trace", "debug", "info", "warn", "error",
+// "critical", "off") to the spdlog level; returns an empty optional for unknown names.
+utils::optional<spdlog::level::level_enum> parse_log_level(const std::string& level_name) {
+  struct NamedLevel {
+    const char* name;
+    spdlog::level::level_enum level;
+  };
+  static const NamedLevel named_levels[] = {
+    {"trace", spdlog::level::trace},
+    {"debug", spdlog::level::debug},
+    {"info", spdlog::level::info},
+    {"warn", spdlog::level::warn},
+    {"error", spdlog::level::err},
+    {"critical", spdlog::level::critical},
+    {"off", spdlog::level::off},
+  };
+  for (const NamedLevel& candidate : named_levels) {
+    if (utils::StringUtils::equalsIgnoreCase(level_name, candidate.name)) {
+      return candidate.level;
+    }
+  }
+  return {};
+}
+
 std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &type) {
   std::vector<std::string> appenders;
   std::string prefix = type + ".";
@@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration()
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
   std::lock_guard<std::mutex> lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
+  initializeCompression(lock, logger_properties);
   std::string spdlog_pattern;
   if (!logger_properties->getString("spdlog.pattern", spdlog_pattern)) {
     spdlog_pattern = spdlog_default_pattern;
@@ -116,6 +140,10 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo
 
 std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) {
   std::lock_guard<std::mutex> lock(mutex);
+  // The lookup itself lives in the overload that takes the held lock, so code already holding
+  // `mutex` (e.g. the logger factory used by initializeCompression) can reuse it without re-locking.
+  return getLogger(name, lock);
+}
+
+std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name, const std::lock_guard<std::mutex>& /*lock*/) {
   std::string adjusted_name = name;
   const std::string clazz = "class ";
   auto haz_clazz = name.find(clazz);
@@ -207,19 +235,9 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names
       std::string level_name = utils::StringUtils::trim(segment);
       if (first) {
         first = false;
-        std::transform(level_name.begin(), level_name.end(), level_name.begin(), ::tolower);
-        if ("trace" == level_name) {
-          level = spdlog::level::trace;
-        } else if ("debug" == level_name) {
-          level = spdlog::level::debug;
-        } else if ("warn" == level_name) {
-          level = spdlog::level::warn;
-        } else if ("critical" == level_name) {
-          level = spdlog::level::critical;
-        } else if ("error" == level_name) {
-          level = spdlog::level::err;
-        } else if ("off" == level_name) {
-          level = spdlog::level::off;
+        auto opt_level = parse_log_level(level_name);
+        if (opt_level) {
+          level = *opt_level;
         }
       } else {
         sinks.push_back(sink_map[level_name]);
@@ -258,6 +276,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
   }
   std::shared_ptr<internal::LoggerNamespace> current_namespace = root_namespace;
   std::vector<std::shared_ptr<spdlog::sinks::sink>> sinks = root_namespace->sinks;
+  std::vector<std::shared_ptr<spdlog::sinks::sink>> inherited_sinks;
   spdlog::level::level_enum level = root_namespace->level;
   std::string current_namespace_str = "";
   std::string sink_namespace_str = "root";
@@ -268,6 +287,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
     if (child_pair == current_namespace->children.end()) {
       break;
     }
+    std::copy(current_namespace->exported_sinks.begin(), current_namespace->exported_sinks.end(), std::back_inserter(inherited_sinks));
     current_namespace = child_pair->second;
     if (current_namespace->sinks.size() > 0) {
       sinks = current_namespace->sinks;
@@ -283,6 +303,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
     const auto levelView(spdlog::level::to_string_view(level));
     logger->log_debug("%s logger got sinks from namespace %s and level %s from namespace %s", name, sink_namespace_str, std::string(levelView.begin(), levelView.end()), level_namespace_str);
   }
+  std::copy(inherited_sinks.begin(), inherited_sinks.end(), std::back_inserter(sinks));
   spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks));
   spdlogger->set_level(level);
   spdlogger->set_formatter(formatter -> clone());
@@ -318,6 +339,14 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::create_default_r
   return result;
 }
 
+// (Re)creates the in-memory compression sink from `properties` and, if enabled, attaches it to
+// the root namespace. `lock` proves that `mutex` is already held by the caller.
+void LoggerConfiguration::initializeCompression(const std::lock_guard<std::mutex>& lock, const std::shared_ptr<LoggerProperties>& properties) {
+  const auto make_logger = [&lock, this] (const std::string& name) {
+    return getLogger(name, lock);
+  };
+  const auto compression_sink = compression_manager_.initialize(properties, logger_, make_logger);
+  if (!compression_sink) {
+    return;
+  }
+  root_namespace_->sinks.push_back(compression_sink);
+  // Exported sinks are appended by get_logger even for namespaces that define their own sinks.
+  root_namespace_->exported_sinks.push_back(compression_sink);
+}
+
 } /* namespace logging */
 } /* namespace core */
 } /* namespace minifi */
diff --git a/libminifi/src/core/logging/internal/CompressionManager.cpp b/libminifi/src/core/logging/internal/CompressionManager.cpp
new file mode 100644
index 0000000..6347988
--- /dev/null
+++ b/libminifi/src/core/logging/internal/CompressionManager.cpp
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <mutex>
+
+#include "core/logging/internal/CompressionManager.h"
+#include "core/logging/internal/LogCompressorSink.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerProperties.h"
+#include "core/TypedValues.h"
+#include "core/Core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+std::shared_ptr<LogCompressorSink> CompressionManager::initialize(
+    const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory) {
+  // Reads a data-size property; yields an empty optional when it is unset or cannot be parsed
+  // (the latter is reported through error_logger, if one was given).
+  const auto read_size_property = [&] (const char* const property_name) -> utils::optional<size_t> {
+    const auto raw_value = properties->getString(property_name);
+    if (!raw_value) {
+      return {};
+    }
+    size_t parsed = 0;
+    if (!DataSizeValue::StringToInt(*raw_value, parsed)) {
+      if (error_logger) {
+        error_logger->log_error("Invalid format for %s", property_name);
+      }
+      return {};
+    }
+    return parsed;
+  };
+  // Both limits fall back to 8 MiB when unset or invalid.
+  const size_t cached_log_max_size = read_size_property(compression_cached_log_max_size_).value_or(8_MiB);
+  const size_t compressed_log_max_size = read_size_property(compression_compressed_log_max_size_).value_or(8_MiB);
+  // A zero limit for either queue disables the in-memory compressed log altogether.
+  const bool compression_enabled = cached_log_max_size != 0 && compressed_log_max_size != 0;
+  std::shared_ptr<LogCompressorSink> sink = compression_enabled
+      ? std::make_shared<LogCompressorSink>(
+            LogQueueSize{cached_log_max_size, cache_segment_size},
+            LogQueueSize{compressed_log_max_size, compressed_segment_size},
+            logger_factory(getClassName<LogCompressorSink>()))
+      : nullptr;
+  {
+    // gcc4.8 bug => cannot use std::atomic_store
+    std::lock_guard<std::mutex> lock(mtx_);
+    sink_ = sink;
+  }
+  return sink;
+}
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/src/core/logging/internal/LogCompressor.cpp
similarity index 56%
copy from libminifi/src/core/TypedValues.cpp
copy to libminifi/src/core/logging/internal/LogCompressor.cpp
index 93a43b8..4dd23c0 100644
--- a/libminifi/src/core/TypedValues.cpp
+++ b/libminifi/src/core/logging/internal/LogCompressor.cpp
@@ -16,19 +16,32 @@
  * limitations under the License.
  */
 
-#include "core/Property.h"
-#include "core/TypedValues.h"
+#include "core/logging/internal/LogCompressor.h"
+#include "core/logging/LoggerConfiguration.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
+namespace logging {
+namespace internal {
+
+LogCompressor::LogCompressor(gsl::not_null<OutputStream *> output, std::shared_ptr<logging::Logger> logger)
+    : ZlibCompressStream(output,
+                         io::ZlibCompressionFormat::GZIP,
+                         Z_DEFAULT_COMPRESSION,
+                         std::move(logger)) {
+  // All setup (GZIP format, default compression level) is done by the base class.
+}
+
+/**
+ * Pushes all input written so far through deflate with Z_SYNC_FLUSH, so the output is
+ * decompressible up to this point while the stream stays open (unlike Z_FINISH).
+ * @return FlushResult::Success if the flushing write returned 0, FlushResult::Error otherwise
+ */
+LogCompressor::FlushResult LogCompressor::flush() {
+  const bool flushed = write(nullptr, 0, Z_SYNC_FLUSH) == 0;
+  return flushed ? FlushResult::Success : FlushResult::Error;
+}
 
-const  std::type_index DataSizeValue::type_id = typeid(uint64_t);
-const  std::type_index TimePeriodValue::type_id = typeid(uint64_t);
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/core/logging/internal/LogCompressorSink.cpp b/libminifi/src/core/logging/internal/LogCompressorSink.cpp
new file mode 100644
index 0000000..112e6ae
--- /dev/null
+++ b/libminifi/src/core/logging/internal/LogCompressorSink.cpp
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/internal/LogCompressorSink.h"
+#include "spdlog/details/log_msg.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+LogCompressorSink::LogCompressorSink(LogQueueSize cache_size, LogQueueSize compressed_size, std::shared_ptr<logging::Logger> logger)
+    : cached_logs_(cache_size.max_total_size, cache_size.max_segment_size),
+      compressed_logs_(compressed_size.max_total_size,
+                       compressed_size.max_segment_size,
+                       ActiveCompressor::Allocator{std::move(logger)}) {
+  // The worker is started in the body, after both queues have been constructed.
+  compression_thread_ = std::thread(&LogCompressorSink::run, this);
+}
+
+LogCompressorSink::~LogCompressorSink() {
+  // Tell the worker loop in run() to exit, then wait for it. When the worker is idle this can
+  // take up to its 100ms back-off sleep.
+  // NOTE(review): running_ is read concurrently by the worker thread; confirm it is std::atomic<bool>.
+  running_ = false;
+  compression_thread_.join();
+}
+
+// Appends the message to the active cache segment. Only the raw payload is stored;
+// NOTE(review): no spdlog pattern (timestamp, level, logger name) is applied here.
+void LogCompressorSink::sink_it_(const spdlog::details::log_msg &msg) {
+  const auto* const payload_bytes = reinterpret_cast<const uint8_t*>(msg.payload.data());
+  const auto payload_size = msg.payload.size();
+  cached_logs_.modify([payload_bytes, payload_size] (LogBuffer& segment) {
+    segment.buffer_->write(payload_bytes, payload_size);
+  });
+}
+
+// Worker loop: enforce both size limits, then compress one cached segment; back off
+// briefly whenever there was nothing to compress.
+void LogCompressorSink::run() {
+  constexpr std::chrono::milliseconds idle_backoff{100};
+  while (running_) {
+    cached_logs_.discardOverflow();
+    compressed_logs_.discardOverflow();
+    const bool idle = compress() == CompressionResult::NothingToCompress;
+    if (idle) {
+      std::this_thread::sleep_for(idle_backoff);
+    }
+  }
+}
+
+// Moves one committed cache segment into the active compressor. With force_rotation the
+// compressed segment is committed as well, even when no cached segment was available.
+LogCompressorSink::CompressionResult LogCompressorSink::compress(bool force_rotation) {
+  LogBuffer pending;
+  const bool got_segment = cached_logs_.tryDequeue(pending);
+  if (got_segment) {
+    compressed_logs_.modify([&pending, force_rotation] (ActiveCompressor& target) {
+      target.compressor_->write(pending.buffer_->getBuffer(), pending.buffer_->size());
+      target.compressor_->flush();
+      return force_rotation;
+    });
+    return CompressionResult::Success;
+  }
+  if (force_rotation) {
+    compressed_logs_.commit();
+  }
+  return CompressionResult::NothingToCompress;
+}
+
+void LogCompressorSink::flush_() {
+  // Intentionally empty: messages only go to in-memory buffers, which the worker thread compresses.
+}
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/io/ZlibStream.cpp b/libminifi/src/io/ZlibStream.cpp
index 711a31d..9a9aa2f 100644
--- a/libminifi/src/io/ZlibStream.cpp
+++ b/libminifi/src/io/ZlibStream.cpp
@@ -19,6 +19,7 @@
 #include "io/ZlibStream.h"
 #include "Exception.h"
 #include "utils/gsl.h"
+#include "core/logging/LoggerConfiguration.h"
 
 namespace org {
 namespace apache {
@@ -39,7 +40,11 @@ bool ZlibBaseStream::isFinished() const {
 }
 
 ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format, int level)
-  : ZlibBaseStream(output) {
+  // Delegates to the logger-taking constructor, supplying this class's own logger.
+  : ZlibCompressStream(output, format, level, logging::LoggerFactory<ZlibCompressStream>::getLogger()) {}
+
+ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format, int level, std::shared_ptr<logging::Logger> logger)
+  : ZlibBaseStream(output),
+    logger_{std::move(logger)} {
   int ret = deflateInit2(
       &strm_,
       level,
@@ -57,11 +62,22 @@ ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, Zlib
 
 ZlibCompressStream::~ZlibCompressStream() {
   if (state_ != ZlibStreamState::UNINITIALIZED) {
-    deflateEnd(&strm_);
+    int result = deflateEnd(&strm_);
+    // Z_DATA_ERROR means the stream was ended with pending input/output discarded (e.g. it was
+    // never closed); all outcomes are only logged at debug level here.
+    if (result == Z_DATA_ERROR) {
+      logger_->log_debug("Stream was freed prematurely");
+    } else if (result == Z_STREAM_ERROR) {
+      logger_->log_debug("Stream state was inconsistent");
+    } else if (result != Z_OK) {
+      logger_->log_debug("Unknown error while finishing compression %d", result);
+    }
   }
 }
 
-size_t ZlibCompressStream::write(const uint8_t* value, size_t size) {
+// Plain OutputStream write: compresses the data without forcing any flush (Z_NO_FLUSH).
+size_t ZlibCompressStream::write(const uint8_t *value, size_t size) {
+  return write(value, size, Z_NO_FLUSH);
+}
+
+size_t ZlibCompressStream::write(const uint8_t* value, size_t size, FlushMode mode) {
   if (state_ != ZlibStreamState::INITIALIZED) {
     logger_->log_error("writeData called in invalid ZlibCompressStream state, state is %hhu", state_);
     return STREAM_ERROR;
@@ -84,10 +100,9 @@ size_t ZlibCompressStream::write(const uint8_t* value, size_t size) {
     strm_.next_out = outputBuffer_.data();
     strm_.avail_out = gsl::narrow<uInt>(outputBuffer_.size());
 
-    int flush = value == nullptr ? Z_FINISH : Z_NO_FLUSH;
-    logger_->log_trace("calling deflate with flush %d", flush);
+    logger_->log_trace("calling deflate with flush %d", mode);
 
-    int ret = deflate(&strm_, flush);
+    int ret = deflate(&strm_, mode);
     if (ret == Z_STREAM_ERROR) {
       logger_->log_error("deflate failed, error code: %d", ret);
       state_ = ZlibStreamState::ERRORED;
@@ -107,14 +122,15 @@ size_t ZlibCompressStream::write(const uint8_t* value, size_t size) {
 
 void ZlibCompressStream::close() {
   if (state_ == ZlibStreamState::INITIALIZED) {
-    if (write(nullptr, 0U) == 0) {
+    // Z_FINISH flushes the remaining input and ends the compressed stream; the state becomes
+    // FINISHED only if that final write succeeds.
+    if (write(nullptr, 0U, Z_FINISH) == 0) {
       state_ = ZlibStreamState::FINISHED;
     }
   }
 }
 
 ZlibDecompressStream::ZlibDecompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format)
-    : ZlibBaseStream(output) {
+    : ZlibBaseStream(output),
+      logger_{logging::LoggerFactory<ZlibDecompressStream>::getLogger()} {
   int ret = inflateInit2(&strm_, 15 + (format == ZlibCompressionFormat::GZIP ? 16 : 0) /* windowBits */);
   if (ret != Z_OK) {
     logger_->log_error("Failed to initialize z_stream with inflateInit2, error code: %d", ret);
@@ -126,7 +142,12 @@ ZlibDecompressStream::ZlibDecompressStream(gsl::not_null<OutputStream*> output,
 
 ZlibDecompressStream::~ZlibDecompressStream() {
   if (state_ != ZlibStreamState::UNINITIALIZED) {
-    inflateEnd(&strm_);
+    int result = inflateEnd(&strm_);
+    // inflateEnd only documents Z_STREAM_ERROR (inconsistent state) as a failure; any other
+    // non-Z_OK value is unexpected. NOTE(review): the compressing counterpart logs at debug level.
+    if (result == Z_STREAM_ERROR) {
+      logger_->log_error("Stream state was inconsistent");
+    } else if (result != Z_OK) {
+      logger_->log_error("Unknown error while finishing decompression %d", result);
+    }
   }
 }
 
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index e9db9a7..8750df7 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -22,6 +22,9 @@
 #include <ctime>
 #include "../TestBase.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "io/ZlibStream.h"
+#include "StreamPipe.h"
+#include "utils/IntegrationTestUtils.h"
 
 TEST_CASE("Test log Levels", "[ttl1]") {
   LogTestController::getInstance().setTrace<logging::Logger>();
@@ -76,7 +79,7 @@ TEST_CASE("Test log Levels change", "[ttl5]") {
 namespace single {
+// NOTE(review): presumably used to check logger names derived from namespace-qualified classes — verify.
 class TestClass {
 };
-}
+}  // namespace single
 
 class TestClass2 {
 };
@@ -107,3 +110,95 @@ TEST_CASE("Test ShortenNames", "[ttl6]") {
   LogTestController::getInstance(props)->reset();
   LogTestController::getInstance().reset();
 }
+
+using namespace minifi::io;
+
+// Pipes `input` through a ZlibDecompressStream and returns the inflated bytes as a string.
+std::string decompress(const std::shared_ptr<InputStream>& input) {
+  BufferStream inflated;
+  auto decompressor = std::make_shared<ZlibDecompressStream>(gsl::make_not_null(&inflated));
+  minifi::internal::pipe(input, decompressor);
+  decompressor->close();
+  const auto* const bytes = reinterpret_cast<const char*>(inflated.getBuffer());
+  return std::string(bytes, inflated.size());
+}
+
+// Checks that a log line reaches the in-memory compressed log, both for a logger using the root
+// namespace's sinks and for one with its own sinks (which must inherit the exported compression sink).
+TEST_CASE("Test Compression", "[ttl7]") {
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  auto properties = std::make_shared<logging::LoggerProperties>();
+  std::string className;
+  SECTION("Using root logger") {
+    className = "CompressionTestClassUsingRoot";
+    // by default the root logger is OFF
+    properties->set("logger.root", "INFO");
+  }
+  SECTION("Inherit compression sink") {
+    className = "CompressionTestClassInheriting";
+    properties->set("appender.null", "null");
+    properties->set("logger." + className, "INFO,null");
+  }
+  // initialize() creates a fresh compression sink, so each section starts with empty buffers.
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger(className);
+  logger->log_error("Hi there");
+  // NOTE(review): `true` presumably forces a rotation so the just-written line is included — confirm.
+  std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
+  REQUIRE(compressed_log);
+  auto logs = decompress(compressed_log);
+  // The sink stores only the raw message payload, without any spdlog pattern.
+  REQUIRE(logs == "Hi there");
+}
+
+// Test-only access to LoggerConfiguration's private compression state.
+// NOTE(review): relies on friend declarations in the accessed classes.
+class LoggerTestAccessor {
+ public:
+  static void setCompressionCacheSegmentSize(logging::LoggerConfiguration& config, size_t segment_size) {
+    auto& manager = config.compression_manager_;
+    manager.cache_segment_size = segment_size;
+  }
+  static void setCompressionCompressedSegmentSize(logging::LoggerConfiguration& config, size_t segment_size) {
+    auto& manager = config.compression_manager_;
+    manager.compressed_segment_size = segment_size;
+  }
+  static size_t getUncompressedSize(logging::LoggerConfiguration& config) {
+    const auto sink = config.compression_manager_.getSink();
+    return sink->cached_logs_.size();
+  }
+  static size_t getCompressedSize(logging::LoggerConfiguration& config) {
+    const auto sink = config.compression_manager_.getSink();
+    return sink->compressed_logs_.size();
+  }
+};
+
+// Floods the cache with 10000 lines while the cached-log limit is "10 KB" and the cache segment size is
+// 1 KiB, then expects the background worker to bring the uncompressed size down to at most 10 KiB.
+TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl8]") {
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  auto properties = std::make_shared<logging::LoggerProperties>();
+  properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_, "10 KB");
+  // Must be set before initialize(), which reads it when creating the sink.
+  // NOTE(review): this mutates the process-wide configuration and is not reset here.
+  LoggerTestAccessor::setCompressionCacheSegmentSize(log_config, 1_KiB);
+  std::string className = "CompressionTestCacheCleaned";
+  // by default the root logger is OFF
+  properties->set("logger.root", "INFO");
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger(className);
+  for (size_t idx = 0; idx < 10000; ++idx) {
+    logger->log_error("Hi there");
+  }
+  // Overflow is only discarded periodically by the worker thread, hence the polling.
+  bool cache_shrunk = utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, [&] {
+    return LoggerTestAccessor::getUncompressedSize(log_config) <= 10_KiB;
+  });
+  REQUIRE(cache_shrunk);
+}
+
+// A zero limit for either the cached or the compressed log disables the compression sink, so no
+// compressed log is available; with the default limits (sanity check) one is.
+TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", "[ttl9]") {
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  auto properties = std::make_shared<logging::LoggerProperties>();
+  bool is_nullptr = false;
+  SECTION("Cached log size is set to 0") {
+    is_nullptr = true;
+    properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_, "0");
+  }
+  SECTION("Compressed log size is set to 0") {
+    is_nullptr = true;
+    properties->set(logging::internal::CompressionManager::compression_compressed_log_max_size_, "0");
+  }
+  SECTION("Sanity check") {
+    is_nullptr = false;
+    // pass
+  }
+  // by default the root logger is OFF
+  properties->set("logger.root", "INFO");
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger("DisableCompressionTestLogger");
+  logger->log_error("Hi there");
+  REQUIRE((logging::LoggerConfiguration::getCompressedLog(true) == nullptr) == is_nullptr);
+}
diff --git a/libminifi/test/unit/StagingQueueTests.cpp b/libminifi/test/unit/StagingQueueTests.cpp
new file mode 100644
index 0000000..9391458
--- /dev/null
+++ b/libminifi/test/unit/StagingQueueTests.cpp
@@ -0,0 +1,121 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include "utils/StringUtils.h"
+#include "../TestBase.h"
+#include "utils/StagingQueue.h"
+
+using org::apache::nifi::minifi::utils::StagingQueue;
+
+/**
+ * Test double for the item type held by StagingQueue: a plain string payload
+ * whose size() is the number of characters accumulated so far.
+ */
+class MockItem {
+ public:
+  /// Creates an empty item whose buffer has room reserved for one and a half
+  /// times `max_size` characters (the reason for the extra headroom is not
+  /// visible here; presumably items may grow past max_size before commit).
+  static MockItem allocate(size_t max_size) {
+    MockItem fresh;
+    const size_t headroom = max_size * 3 / 2;
+    fresh.data_.reserve(headroom);
+    return fresh;
+  }
+
+  /// Hands the payload over by moving out of this item; afterwards this
+  /// object is in a moved-from state.
+  MockItem commit() {
+    MockItem committed = std::move(*this);
+    return committed;
+  }
+
+  /// Number of characters currently stored in the payload.
+  size_t size() const {
+    return data_.length();
+  }
+
+  std::string data_;
+};
+
+TEST_CASE("Construct queue", "[TestStagingQueue1]") {
+  // A freshly constructed queue reports no stored data.
+  StagingQueue<MockItem> staging(30, 10);
+  const auto initial_size = staging.size();
+  REQUIRE(initial_size == 0);
+}
+
+TEST_CASE("Modify no commit", "[TestStagingQueue2]") {
+  // Changes to the active item show up in size() right away, but without a
+  // commit nothing can be dequeued.
+  StagingQueue<MockItem> staging(30, 10);
+  staging.modify([](MockItem& current) {
+    current.data_.append("12345");
+  });
+  REQUIRE(staging.size() == 5);
+  SECTION("Decrease size") {
+    // shrinking the active item is reflected in size() as well
+    staging.modify([](MockItem& current) {
+      REQUIRE(current.data_ == "12345");
+      current.data_.clear();
+    });
+    REQUIRE(staging.size() == 0);
+  }
+  MockItem dequeued;
+  REQUIRE_FALSE(staging.tryDequeue(dequeued));
+}
+
+TEST_CASE("Modify and commit", "[TestStagingQueue3]") {
+  // After commit() the item can be dequeued, and size() still counts it
+  // until it has been taken out.
+  StagingQueue<MockItem> staging(30, 10);
+  staging.modify([](MockItem& current) {
+    current.data_.append("12345");
+  });
+  staging.commit();
+  SECTION("Commit is idempotent if there is no modification between") {
+    staging.commit();
+  }
+  REQUIRE(staging.size() == 5);
+  MockItem dequeued;
+  const bool got_item = staging.tryDequeue(dequeued);
+  REQUIRE(got_item);
+  REQUIRE(dequeued.data_ == "12345");
+  REQUIRE(staging.size() == 0);
+}
+
+TEST_CASE("Modify and overflow triggered automatic commit", "[TestStagingQueue4]") {
+  // The first item grows to 11 characters, past the 10 passed as the second
+  // constructor argument. The test expects that item to be committed without
+  // an explicit commit() call, so the next modify() sees a new, empty item.
+  StagingQueue<MockItem> staging(30, 10);
+  staging.modify([](MockItem& current) {
+    current.data_.append("123456789ab");
+  });
+  SECTION("Explicit commit makes no difference") {
+    staging.commit();
+  }
+  staging.modify([](MockItem& current) {
+    // a new item has been allocated
+    REQUIRE(current.data_.empty());
+  });
+  REQUIRE(staging.size() == 11);
+  MockItem dequeued;
+  REQUIRE(staging.tryDequeue(dequeued));
+  REQUIRE(dequeued.data_ == "123456789ab");
+  REQUIRE(staging.size() == 0);
+}
+
+TEST_CASE("Discard overflow", "[TestStagingQueue5]") {
+  // Commit five 10-character items (50 in total). discardOverflow() is then
+  // expected to trim the queue to 30 by dropping the two oldest items.
+  StagingQueue<MockItem> staging(30, 10);
+  // payload for item N: the digit N repeated ten times
+  const auto payload_for = [](size_t digit) {
+    return utils::StringUtils::repeat(std::to_string(digit), 10);
+  };
+  for (size_t digit = 0; digit < 5; ++digit) {
+    staging.modify([&](MockItem& current) {
+      current.data_ = payload_for(digit);
+    });
+    staging.commit();
+  }
+  REQUIRE(staging.size() == 50);
+  staging.discardOverflow();
+  REQUIRE(staging.size() == 30);
+  MockItem dequeued;
+  // items 0 and 1 (the oldest) have been discarded
+  for (size_t digit = 2; digit < 5; ++digit) {
+    REQUIRE(staging.tryDequeue(dequeued));
+    REQUIRE(dequeued.data_ == payload_for(digit));
+  }
+  REQUIRE(staging.size() == 0);
+}

Mime
View raw message