nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From phroc...@apache.org
Subject nifi-minifi-cpp git commit: MINIFICPP-264: CompressContent Processor
Date Thu, 26 Oct 2017 14:15:56 GMT
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 72b9a0e1a -> 9af7faab4


MINIFICPP-264: CompressContent Processor

This closes #151.

Signed-off-by: Marc Parisi <phrocker@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9af7faab
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9af7faab
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9af7faab

Branch: refs/heads/master
Commit: 9af7faab452c66a77e9a0623469369c2e1a76478
Parents: 72b9a0e
Author: Bin Qiu <benqiu2016@gmail.com>
Authored: Mon Oct 23 20:17:01 2017 -0700
Committer: Marc Parisi <phrocker@apache.org>
Committed: Thu Oct 26 10:10:15 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +-
 README.md                                       |   1 +
 extensions/libarchive/ArchiveLoader.h           |   7 +-
 extensions/libarchive/CompressContent.cpp       | 173 ++++
 extensions/libarchive/CompressContent.h         | 381 ++++++++
 extensions/rocksdb-repos/RocksDbStream.h        |   2 +-
 libminifi/include/io/AtomicEntryStream.h        |   2 +-
 libminifi/include/io/BaseStream.h               |   2 +-
 libminifi/include/io/ClientSocket.h             |   2 +-
 libminifi/include/io/DataStream.h               |   4 +-
 libminifi/include/io/FileStream.h               |   2 +-
 libminifi/src/io/FileStream.cpp                 |   3 +-
 .../test/archive-tests/CompressContentTests.cpp | 933 +++++++++++++++++++
 13 files changed, 1504 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 756be29..e28bae7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -130,7 +130,7 @@ endif()
 ## ENABLE FEATURE INFORMATION
 ADD_FEATURE_INFO("HTTP CURL" HTTP-CURL "This enables RESTProtocol, InvokeHTTP, and the HTTPClient for Site to Site")
 ADD_FEATURE_INFO("ROCKSDB REPOS" ROCKSDB-REPOS "This Enables persistent provenance, flowfile, and content repositories using RocksDB")
-ADD_FEATURE_INFO("ARCHIVE EXTENSIONS" ARCHIVE-EXTENSIONS "This Enables libarchive functionality including MergeContent")
+ADD_FEATURE_INFO("ARCHIVE EXTENSIONS" ARCHIVE-EXTENSIONS "This Enables libarchive functionality including MergeContent, CompressContent")
 
 ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 8f8af9c..1f7f269 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
   * TailFile
   * MergeContent
   * ExtractText
+  * CompressContent
 * Provenance events generation is supported and are persisted using RocksDB.
 
 ## System Requirements

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/extensions/libarchive/ArchiveLoader.h
----------------------------------------------------------------------
diff --git a/extensions/libarchive/ArchiveLoader.h b/extensions/libarchive/ArchiveLoader.h
index c7b1e36..e407483 100644
--- a/extensions/libarchive/ArchiveLoader.h
+++ b/extensions/libarchive/ArchiveLoader.h
@@ -19,6 +19,7 @@
 #define EXTENSION_ARCHIVELOADER_H
 
 #include "MergeContent.h"
+#include "CompressContent.h"
 #include "core/ClassLoader.h"
 
 class __attribute__((visibility("default"))) ArchiveFactory : public core::ObjectFactory {
@@ -45,6 +46,7 @@ class __attribute__((visibility("default"))) ArchiveFactory : public core::Objec
   virtual std::vector<std::string> getClassNames() {
     std::vector<std::string> class_names;
     class_names.push_back("MergeContent");
+    class_names.push_back("CompressContent");
     return class_names;
   }
 
@@ -53,8 +55,11 @@ class __attribute__((visibility("default"))) ArchiveFactory : public core::Objec
     std::transform(name.begin(), name.end(), name.begin(), ::tolower);
     if (name == "mergecontent") {
       return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::MergeContent>());
+    } else if (name == "compresscontent") {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::CompressContent>());
+    } else {
+      return nullptr;
     }
-    return nullptr;
   }
 
   static bool added;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/extensions/libarchive/CompressContent.cpp
----------------------------------------------------------------------
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
new file mode 100644
index 0000000..4f250e9
--- /dev/null
+++ b/extensions/libarchive/CompressContent.cpp
@@ -0,0 +1,173 @@
+/**
+ * @file CompressContent.cpp
+ * CompressContent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CompressContent.h"
+#include <stdio.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <map>
+#include <set>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property CompressContent::CompressLevel("Compression Level", "The compression level to use; this is valid only when using GZIP compression.", "1");
+core::Property CompressContent::CompressMode("Mode", "Indicates whether the processor should compress content or decompress content.", MODE_COMPRESS);
+core::Property CompressContent::CompressFormat("Compression Format", "The compression format to use.", COMPRESSION_FORMAT_ATTRIBUTE);
+core::Property CompressContent::UpdateFileName("Update Filename", "Determines if filename extension need to be updated", "false");
+core::Relationship CompressContent::Success("success", "FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed");
+core::Relationship CompressContent::Failure("failure", "FlowFiles will be transferred to the failure relationship if they fail to compress/decompress");
+
+void CompressContent::initialize() {
+  // Register the four configurable properties of this processor
+  std::set<core::Property> properties;
+  properties.insert(CompressLevel);
+  properties.insert(CompressMode);
+  properties.insert(CompressFormat);
+  properties.insert(UpdateFileName);
+  setSupportedProperties(properties);
+  // Register the two outgoing relationships (failure and success)
+  std::set<core::Relationship> relationships;
+  relationships.insert(Failure);
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+void CompressContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  std::string value;  // scratch holder reused for each property lookup
+  if (context->getProperty(CompressLevel.getName(), value) && !value.empty()) {
+    core::Property::StringToInt(value, compressLevel_);  // NOTE(review): conversion result is ignored; compressLevel_ presumably keeps its previous value on bad input - confirm
+  }
+  value = "";
+  if (context->getProperty(CompressMode.getName(), value) && !value.empty()) {
+    this->compressMode_ = value;
+  }
+  value = "";
+  if (context->getProperty(CompressFormat.getName(), value) && !value.empty()) {
+    this->compressFormat_ = value;
+  }
+  value = "";
+  if (context->getProperty(UpdateFileName.getName(), value) && !value.empty()) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, updateFileName_);
+  }
+  logger_->log_info("Compress Content: Mode [%s] Format [%s] Level [%d] UpdateFileName [%d]", compressMode_, compressFormat_, compressLevel_, updateFileName_);
+  // Populate the mime.type -> compression format map and the format -> filename extension map
+  compressionFormatMimeTypeMap_["application/gzip"] = COMPRESSION_FORMAT_GZIP;
+  compressionFormatMimeTypeMap_["application/bzip2"] = COMPRESSION_FORMAT_BZIP2;
+  compressionFormatMimeTypeMap_["application/x-bzip2"] = COMPRESSION_FORMAT_BZIP2;
+  compressionFormatMimeTypeMap_["application/x-lzma"] = COMPRESSION_FORMAT_LZMA;
+  compressionFormatMimeTypeMap_["application/x-xz"] = COMPRESSION_FORMAT_XZ_LZMA2;
+  fileExtension_[COMPRESSION_FORMAT_GZIP] = ".gz";
+  fileExtension_[COMPRESSION_FORMAT_LZMA] = ".lzma";
+  fileExtension_[COMPRESSION_FORMAT_BZIP2] = ".bz2";
+  fileExtension_[COMPRESSION_FORMAT_XZ_LZMA2] = ".xz";
+}
+
+void CompressContent::onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session) {
+  std::shared_ptr<core::FlowFile> flowFile = session->get();
+
+  if (!flowFile) {
+    return;  // nothing queued for this trigger
+  }
+
+  std::string compressFormat = compressFormat_;  // effective format; may be replaced by the mime.type lookup below
+  if (compressFormat_ == COMPRESSION_FORMAT_ATTRIBUTE) {
+    std::string attr;
+    flowFile->getAttribute(FlowAttributeKey(MIME_TYPE), attr);
+    if (attr.empty()) {
+      logger_->log_error("No %s attribute existed for the flow, route to failure", FlowAttributeKey(MIME_TYPE));
+      session->transfer(flowFile, Failure);
+      return;
+    }
+    auto search = compressionFormatMimeTypeMap_.find(attr);
+    if (search != compressionFormatMimeTypeMap_.end()) {
+      compressFormat = search->second;
+    } else {
+      logger_->log_info("Mime type of %s is not indicated a support format, route to success", attr);
+      session->transfer(flowFile, Success);  // unrecognized mime type: the flow file is passed through unchanged
+      return;
+    }
+  }
+  std::transform(compressFormat.begin(), compressFormat.end(), compressFormat.begin(), ::tolower);  // format names are matched in lower case
+  std::string mimeType;
+  if (compressFormat == COMPRESSION_FORMAT_GZIP) {
+    mimeType = "application/gzip";
+  } else if (compressFormat == COMPRESSION_FORMAT_BZIP2) {
+    mimeType = "application/bzip2";
+  } else if (compressFormat == COMPRESSION_FORMAT_LZMA) {
+    mimeType = "application/x-lzma";
+  } else if (compressFormat == COMPRESSION_FORMAT_XZ_LZMA2) {
+    mimeType = "application/x-xz";
+  } else {
+    logger_->log_error("Compress format is invalid %s", compressFormat);
+    session->transfer(flowFile, Failure);
+    return;
+  }
+
+  std::string fileExtension;  // stays empty when the format has no registered extension
+  auto search = fileExtension_.find(compressFormat);
+  if (search != fileExtension_.end()) {
+    fileExtension = search->second;
+  }
+  std::shared_ptr<core::FlowFile> processFlowFile = session->create(flowFile);  // derived flow file that receives the converted content
+  CompressContent::WriteCallback callback(compressMode_, compressLevel_, compressFormat, flowFile, session);
+  session->write(processFlowFile, &callback);  // WriteCallback::process performs the compression or decompression
+
+  if (callback.status_ < 0) {
+    logger_->log_error("Compress Content processing fail for the flow with UUID %s", flowFile->getUUIDStr());
+    session->transfer(flowFile, Failure);  // the untouched original goes to failure, the partial result is dropped
+    session->remove(processFlowFile);
+  } else {
+    std::string fileName;
+    processFlowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+    if (compressMode_ == MODE_COMPRESS) {
+      session->putAttribute(processFlowFile, FlowAttributeKey(MIME_TYPE), mimeType);
+      if (updateFileName_) {
+        fileName = fileName + fileExtension;
+        session->putAttribute(processFlowFile, FlowAttributeKey(FILENAME), fileName);
+      }
+    } else {
+      session->removeAttribute(processFlowFile, FlowAttributeKey(MIME_TYPE));  // decompressed output no longer carries the compression mime type
+      if (updateFileName_) {
+        if (fileName.size() >= fileExtension.size() &&
+            fileName.compare(fileName.size() - fileExtension.size(), fileExtension.size(), fileExtension) == 0) {
+          fileName = fileName.substr(0, fileName.size() - fileExtension.size());  // strip the extension only when the name actually ends with it
+          session->putAttribute(processFlowFile, FlowAttributeKey(FILENAME), fileName);
+        }
+      }
+    }
+    logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", processFlowFile->getUUIDStr(), fileName);
+    session->transfer(processFlowFile, Success);
+    session->remove(flowFile);  // the original is removed once the derived flow file has been routed
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/extensions/libarchive/CompressContent.h
----------------------------------------------------------------------
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
new file mode 100644
index 0000000..b34aee7
--- /dev/null
+++ b/extensions/libarchive/CompressContent.h
@@ -0,0 +1,381 @@
+/**
+ * @file CompressContent.h
+ * CompressContent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __COMPRESS_CONTENT_H__
+#define __COMPRESS_CONTENT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "archive_entry.h"
+#include "archive.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define COMPRESSION_FORMAT_ATTRIBUTE "use mime.type attribute"
+#define COMPRESSION_FORMAT_GZIP "gzip"
+#define COMPRESSION_FORMAT_BZIP2 "bzip2"
+#define COMPRESSION_FORMAT_XZ_LZMA2 "xz-lzma2"
+#define COMPRESSION_FORMAT_LZMA "lzma"
+
+#define MODE_COMPRESS "compress"
+#define MODE_DECOMPRESS "decompress"
+
+// CompressContent Class
+class CompressContent: public core::Processor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor; the compression level starts at 1 (the "Compression Level" property default) so it is never read uninitialized
+   */
+  explicit CompressContent(std::string name, uuid_t uuid = NULL) :
+      core::Processor(name, uuid), logger_(logging::LoggerFactory<CompressContent>::getLogger()), compressLevel_(1), updateFileName_(false) {
+  }
+  // Destructor
+  virtual ~CompressContent() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "CompressContent";
+  // Supported Properties
+  static core::Property CompressMode;
+  static core::Property CompressLevel;
+  static core::Property CompressFormat;
+  static core::Property UpdateFileName;
+
+  // Supported Relationships
+  static core::Relationship Failure;
+  static core::Relationship Success;
+
+public:
+  // Nested callback class that reads the flow file content and writes it into the archive (compress mode)
+  class ReadCallbackCompress: public InputStreamCallback {
+  public:
+    ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) :
+        flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
+    }
+    ~ReadCallbackCompress() {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int max_read = getpagesize();  // the flow content is copied one memory page at a time
+      uint8_t buffer[max_read];  // NOTE(review): runtime-sized array is a compiler extension, not standard C++
+      int64_t ret = 0;
+      uint64_t read_size = 0;
+
+      ret = archive_write_header(arch_, entry_);  // emit the entry header ahead of the entry data
+      if (ret != ARCHIVE_OK) {
+        logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
+        status_ = -1;
+        return -1;
+      }
+      while (read_size < flow_->getSize()) {
+        ret = stream->read(buffer, sizeof(buffer));
+        if (ret < 0) {
+          status_ = -1;
+          return -1;
+        }
+        if (ret > 0) {
+          ret = archive_write_data(arch_, buffer, ret);
+          if (ret < 0) {
+            logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
+            status_ = -1;
+            return -1;
+          }
+          read_size += ret;  // counts the bytes accepted by archive_write_data, not the bytes read
+        } else {
+          break;  // a zero-length read ends the copy
+        }
+      }
+      return read_size;
+    }
+    std::shared_ptr<core::FlowFile> flow_;
+    struct archive *arch_;
+    struct archive_entry *entry_;
+    int status_;
+    std::shared_ptr<logging::Logger> logger_;
+  };
+  // Nested callback class that reads the flow file content in 8192-byte chunks (decompress mode)
+  class ReadCallbackDecompress: public InputStreamCallback {
+  public:
+    ReadCallbackDecompress(std::shared_ptr<core::FlowFile> &flow) :
+        read_size_(0), offset_(0), flow_(flow) {
+      origin_offset_ = flow_->getOffset();
+    }
+    ~ReadCallbackDecompress() {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      read_size_ = 0;
+      stream->seek(offset_);  // resume where the previous call stopped
+      int readRet = stream->read(buffer_, sizeof(buffer_));
+      read_size_ = readRet;  // kept as a member so archive_read can report how much of buffer_ is valid
+      if (readRet > 0) {
+        offset_ += read_size_;  // only advance after a non-empty, successful read
+      }
+      return readRet;
+    }
+    int64_t read_size_;
+    uint8_t buffer_[8192];
+    uint64_t offset_;
+    uint64_t origin_offset_;
+    std::shared_ptr<core::FlowFile> flow_;
+  };
+  // Nested callback class that writes the compressed or decompressed content to the output stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(std::string &compress_mode, int64_t compress_level, std::string &compress_format,
+        std::shared_ptr<core::FlowFile> &flow, std::shared_ptr<core::ProcessSession> &session) :
+        compress_mode_(compress_mode), compress_level_(compress_level), compress_format_(compress_format),
+        flow_(flow), session_(session),
+        logger_(logging::LoggerFactory<CompressContent>::getLogger()),
+        readDecompressCb_(flow) {
+      size_ = 0;
+      stream_ = nullptr;
+      status_ = 0;
+    }
+    ~WriteCallback() {
+    }
+
+    std::string compress_mode_;
+    int64_t compress_level_;
+    std::string compress_format_;
+    std::shared_ptr<core::FlowFile> flow_;
+    std::shared_ptr<core::ProcessSession> session_;
+    std::shared_ptr<io::BaseStream> stream_;
+    int64_t size_;
+    std::shared_ptr<logging::Logger> logger_;
+    CompressContent::ReadCallbackDecompress readDecompressCb_;
+    int status_;
+
+    static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) {
+      WriteCallback *callback = (WriteCallback *) context;  // context is the WriteCallback passed to archive_write_open
+      la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size);
+      if (ret > 0)
+        callback->size_ += (int64_t) ret;  // running total of bytes written to the output stream
+      return ret;
+    }
+
+    static ssize_t archive_read(struct archive *arch, void *context, const void **buff) {
+      WriteCallback *callback = (WriteCallback *) context;  // context is the WriteCallback passed to archive_read_open2
+      callback->session_->read(callback->flow_, &callback->readDecompressCb_);  // loads the next chunk of the source flow file into readDecompressCb_.buffer_
+      if (callback->readDecompressCb_.read_size_ >= 0) {
+        *buff = callback->readDecompressCb_.buffer_;  // the archive reader consumes the callback's own buffer
+        return callback->readDecompressCb_.read_size_;
+      } else {
+        archive_set_error(arch, EIO, "Error reading flowfile");
+        return -1;
+      }
+    }
+
+    static la_int64_t archive_skip(struct archive *a, void *client_data, la_int64_t request) {
+      return 0;
+    }
+
+    void archive_write_log_error_cleanup(struct archive *arch) {
+      logger_->log_error("Compress Content archive write error %s", archive_error_string(arch));
+      status_ = -1;
+      archive_write_free(arch);
+    }
+
+    void archive_read_log_error_cleanup(struct archive *arch) {
+      logger_->log_error("Compress Content archive read error %s", archive_error_string(arch));
+      status_ = -1;
+      archive_read_free(arch);
+    }
+
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      // Writes the compressed or decompressed content of flow_ to stream; returns the byte count, or -1 with status_ set to -1 on error
+      struct archive *arch;
+      int r;
+
+      if (compress_mode_ == MODE_COMPRESS) {
+        arch = archive_write_new();
+        if (!arch) {
+          status_ = -1;
+          return -1;
+        }
+        r = archive_write_set_format_ustar(arch);  // the content is wrapped in a single-entry ustar archive
+        if (r != ARCHIVE_OK) {
+          archive_write_log_error_cleanup(arch);
+          return -1;
+        }
+        if (compress_format_ == COMPRESSION_FORMAT_GZIP) {
+          r = archive_write_add_filter_gzip(arch);
+          if (r != ARCHIVE_OK) {
+            archive_write_log_error_cleanup(arch);
+            return -1;
+          }
+          std::string option;
+          option = "gzip:compression-level=" + std::to_string((int) compress_level_);  // the level is only applied for gzip
+          r = archive_write_set_options(arch, option.c_str());
+          if (r != ARCHIVE_OK) {
+            archive_write_log_error_cleanup(arch);
+            return -1;
+          }
+        } else if (compress_format_ == COMPRESSION_FORMAT_BZIP2) {
+          r = archive_write_add_filter_bzip2(arch);
+          if (r != ARCHIVE_OK) {
+            archive_write_log_error_cleanup(arch);
+            return -1;
+          }
+        } else if (compress_format_ == COMPRESSION_FORMAT_LZMA) {
+          r = archive_write_add_filter_lzma(arch);
+          if (r != ARCHIVE_OK) {
+            archive_write_log_error_cleanup(arch);
+            return -1;
+          }
+        } else if (compress_format_ == COMPRESSION_FORMAT_XZ_LZMA2) {
+          r = archive_write_add_filter_xz(arch);
+          if (r != ARCHIVE_OK) {
+            archive_write_log_error_cleanup(arch);
+            return -1;
+          }
+        } else {
+          archive_write_log_error_cleanup(arch);  // unsupported compression format
+          return -1;
+        }
+        r = archive_write_set_bytes_per_block(arch, 0);
+        if (r != ARCHIVE_OK) {
+          archive_write_log_error_cleanup(arch);
+          return -1;
+        }
+        this->stream_ = stream;  // archive_write() forwards the archive output to this stream
+        r = archive_write_open(arch, this, NULL, archive_write, NULL);
+        if (r != ARCHIVE_OK) {
+          archive_write_log_error_cleanup(arch);
+          return -1;
+        }
+        struct archive_entry *entry = archive_entry_new();
+        if (!entry) {
+          archive_write_log_error_cleanup(arch);
+          return -1;
+        }
+        std::string fileName;
+        flow_->getAttribute(FlowAttributeKey(FILENAME), fileName);
+        archive_entry_set_pathname(entry, fileName.c_str());
+        archive_entry_set_size(entry, flow_->getSize());
+        archive_entry_set_mode(entry, S_IFREG | 0755);
+        ReadCallbackCompress readCb(flow_, arch, entry);
+        session_->read(flow_, &readCb);  // ReadCallbackCompress feeds the source content into the archive
+        if (readCb.status_ < 0) {
+          archive_entry_free(entry);
+          archive_write_log_error_cleanup(arch);
+          status_ = -1;
+          return -1;
+        }
+        archive_entry_free(entry);
+        archive_write_close(arch);  // NOTE(review): return value is not checked
+        archive_write_free(arch);
+        return size_;
+      } else {
+        arch = archive_read_new();
+        if (!arch) {
+          status_ = -1;
+          return -1;
+        }
+        r = archive_read_support_format_all(arch);
+        if (r != ARCHIVE_OK) {
+          archive_read_log_error_cleanup(arch);
+          return -1;
+        }
+        r = archive_read_support_filter_all(arch);
+        if (r != ARCHIVE_OK) {
+          archive_read_log_error_cleanup(arch);
+          return -1;
+        }
+        this->stream_ = stream;
+        r = archive_read_open2(arch, this, NULL, archive_read, archive_skip, NULL);
+        if (r != ARCHIVE_OK) {
+          archive_read_log_error_cleanup(arch);
+          return -1;
+        }
+        struct archive_entry *entry;
+        if (archive_read_next_header(arch, &entry) != ARCHIVE_OK) {
+          archive_read_log_error_cleanup(arch);
+          return -1;
+        }
+        const int64_t entry_size = archive_entry_size(entry);  // keep all 64 bits: an int truncates entries of 2 GiB or more
+        logger_->log_debug("Decompress Content archive entry size %lld", static_cast<long long>(entry_size));
+        size_ = 0;
+        while (size_ < entry_size) {
+          char buffer[8192];
+          int ret = archive_read_data(arch, buffer, sizeof(buffer));
+          if (ret < 0) {
+            archive_read_log_error_cleanup(arch);
+            return -1;
+          }
+          if (ret == 0)
+            break;
+          size_ += ret;  // size_ counts the decompressed bytes read from the archive
+          ret = stream_->write(reinterpret_cast<uint8_t*>(buffer), ret);
+          if (ret < 0) {
+            archive_read_log_error_cleanup(arch);
+            return -1;
+          }
+        }
+        archive_read_close(arch);
+        archive_read_free(arch);
+        return size_;
+      }
+    }
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger overload taking raw pointers; left empty, the shared_ptr overload below does the work
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  }
+  // OnTrigger method, implemented by NiFi CompressContent
+  virtual void onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session);
+  // Initialize; registers the supported properties and relationships
+  virtual void initialize(void);
+
+protected:
+
+private:
+  std::shared_ptr<logging::Logger> logger_;
+  int64_t compressLevel_;
+  std::string compressMode_;
+  std::string compressFormat_;
+  bool updateFileName_;
+  std::map<std::string, std::string> compressionFormatMimeTypeMap_;
+  std::map<std::string, std::string> fileExtension_;
+};
+
+REGISTER_RESOURCE (CompressContent);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/extensions/rocksdb-repos/RocksDbStream.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index da08899..e4feb62 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -65,7 +65,7 @@ class RocksDbStream : public io::BaseStream {
    */
   void seek(uint64_t offset);
 
-  const uint32_t getSize() const {
+  const uint64_t getSize() const {
     return size_;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/AtomicEntryStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 3eb456e..181b69d 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -61,7 +61,7 @@ class AtomicEntryStream : public BaseStream {
    */
   void seek(uint64_t offset);
 
-  virtual const uint32_t getSize() const {
+  virtual const uint64_t getSize() const {
     return length_;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/BaseStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h
index dc810e3..3410be6 100644
--- a/libminifi/include/io/BaseStream.h
+++ b/libminifi/include/io/BaseStream.h
@@ -60,7 +60,7 @@ class BaseStream : public DataStream, public Serializable {
 
   int writeData(uint8_t *value, int size);
 
-  virtual void seek(uint32_t offset) {
+  virtual void seek(uint64_t offset) {
     if (composable_stream_ != this) {
       composable_stream_->seek(offset);
     } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index c9ad90e..216ef3d 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -212,7 +212,7 @@ class Socket : public BaseStream {
    * Retrieve size of data stream
    * @return size of data stream
    **/
-  const uint32_t getSize() const {
+  const uint64_t getSize() const {
     return DataStream::getSize();
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/DataStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/DataStream.h b/libminifi/include/io/DataStream.h
index 2ebc9a4..e0fa4be 100644
--- a/libminifi/include/io/DataStream.h
+++ b/libminifi/include/io/DataStream.h
@@ -60,7 +60,7 @@ class DataStream {
     return 0;
   }
 
-  virtual void seek(uint32_t offset) {
+  virtual void seek(uint64_t offset) {
     readBuffer += offset;
   }
 
@@ -117,7 +117,7 @@ class DataStream {
    * Retrieve size of data stream
    * @return size of data stream
    **/
-  virtual const uint32_t getSize() const {
+  virtual const uint64_t getSize() const {
     return buffer.size();
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/include/io/FileStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 0cddcc2..94d13b2 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -64,7 +64,7 @@ class FileStream : public io::BaseStream {
    */
   void seek(uint64_t offset);
 
-  const uint32_t getSize() const {
+  const uint64_t getSize() const {
     return length_;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/src/io/FileStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 93d6411..93a38df 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -47,7 +47,8 @@ FileStream::FileStream(const std::string &path)
 
 FileStream::FileStream(const std::string &path, uint32_t offset, bool write_enable)
     : logger_(logging::LoggerFactory<FileStream>::getLogger()),
-      path_(path) {
+      path_(path),
+      offset_(offset) {
   file_stream_ = std::unique_ptr<std::fstream>(new std::fstream());
   if (write_enable) {
     file_stream_->open(path.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9af7faab/libminifi/test/archive-tests/CompressContentTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
new file mode 100644
index 0000000..a01d632
--- /dev/null
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -0,0 +1,933 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include "FlowController.h"
+#include "../TestBase.h"
+#include "core/Core.h"
+#include "../../include/core/FlowFile.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "CompressContent.h"
+#include "io/FileStream.h"
+#include "FlowFileRecord.h"
+#include <sstream>
+#include <iostream>
+
+static const char* EXPECT_COMPRESS_CONTENT = "/tmp/minifi-expect-compresscontent.txt";  // uncompressed reference data written by the Compress* tests
+static const char* COMPRESS_CONTENT = "/tmp/minifi-compresscontent";  // compressed bytes written by a Compress* test and imported by the matching Decompress* test
+static unsigned int globalSeed;  // state passed to rand_r(); never assigned in this file, so it starts zero-initialized
+
+// Test helper: copies a flow file's content into memory and can unpack its first archive entry with libarchive.
+class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit ReadCallback(uint64_t size)  // size: capacity in bytes of the receive buffer
+      : buffer_(new uint8_t[size]), buffer_size_(size), read_size_(0), archive_buffer_(nullptr), archive_buffer_size_(0) {
+  }
+  ReadCallback(const ReadCallback &) = delete;  // owns raw buffers; a copy would double free them
+  ReadCallback &operator=(const ReadCallback &) = delete;
+  ~ReadCallback() {
+    delete[] buffer_;  // delete[] on nullptr is a no-op, so no null checks are needed
+    delete[] archive_buffer_;
+  }
+  // Reads up to buffer_size_ bytes from `stream` into buffer_; returns the stream's read() result, or -1 for a null stream.
+  int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+    if (!stream) {  // check before the first dereference
+      read_size_ = 0;
+      return -1;
+    }
+    int64_t ret = stream->read(buffer_, buffer_size_);
+    read_size_ = ret < 0 ? 0 : buffer_size_;  // a failed read leaves no usable bytes in buffer_
+    return ret;
+  }
+  // Decodes the first archive entry held in buffer_ into archive_buffer_ / archive_buffer_size_ (left empty on failure).
+  void archive_read() {
+    struct archive *a = archive_read_new();
+    archive_read_support_format_all(a);
+    archive_read_support_filter_all(a);
+    struct archive_entry *ae = nullptr;
+    if (archive_read_open_memory(a, buffer_, read_size_) == ARCHIVE_OK && archive_read_next_header(a, &ae) == ARCHIVE_OK) {
+      const int64_t size = archive_entry_size(ae);  // 64-bit: avoid narrowing the entry size to int
+      if (size >= 0) {
+        delete[] archive_buffer_;  // release the result of any earlier call
+        archive_buffer_ = new char[size];
+        const int64_t got = archive_read_data(a, archive_buffer_, size);
+        archive_buffer_size_ = got > 0 ? static_cast<int>(got) : 0;  // count only bytes actually decoded
+      }
+    }
+    archive_read_free(a);
+  }
+
+  uint8_t *buffer_;           // raw content read from the stream (owned)
+  uint64_t buffer_size_;      // capacity of buffer_ in bytes
+  uint64_t read_size_;        // number of bytes of buffer_ treated as valid after process()
+  char *archive_buffer_;      // decoded entry data; nullptr until archive_read() succeeds (owned)
+  int archive_buffer_size_;   // number of valid bytes in archive_buffer_; 0 until archive_read() succeeds
+};
+
+TEST_CASE("CompressFileGZip", "[compressfiletest1]") {  // compresses a generated file with gzip and checks the output unpacks to the same content
+  try {
+    std::ofstream expectfile;
+    expectfile.open(EXPECT_COMPRESS_CONTENT);
+
+    for (int i = 0; i < 100000; i++) {
+      expectfile << std::to_string(rand_r(&globalSeed)%100);  // appends the decimal text of a value in [0, 99]
+    }
+    expectfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // outgoing "success" connection: compress processor -> log attribute processor
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // incoming connection that feeds the compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
+    income_connection->put(flow);  // queue the generated file as input for the processor
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    // validate the compressed content: unpack it and compare with the generated input file
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      std::string mime;
+      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+      REQUIRE(mime == "application/gzip");
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      callback.archive_read();
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+      REQUIRE(expectContents == contents);
+      // save the compressed bytes to COMPRESS_CONTENT; the DecompressFileGZip test imports that file
+      std::ofstream file(COMPRESS_CONTENT);
+      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      file.close();
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+  } catch (...) {  // NOTE(review): empty catch-all discards any exception thrown in the test body - confirm this is intended
+  }
+}
+
+TEST_CASE("DecompressFileGZip", "[compressfiletest2]") {  // decompresses the file left in COMPRESS_CONTENT and compares it with EXPECT_COMPRESS_CONTENT
+  try {
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // outgoing "success" connection: compress processor -> log attribute processor
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // incoming connection that feeds the compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);  // input is the file written by the CompressFileGZip test
+    income_connection->put(flow);
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    // validate the decompressed content against the original uncompressed file
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      std::string mime;
+      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);  // no MIME type attribute expected on decompressed output
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+    unlink(COMPRESS_CONTENT);  // remove both temporary files used by the gzip compress/decompress pair
+    unlink(EXPECT_COMPRESS_CONTENT);
+  } catch (...) {  // NOTE(review): empty catch-all discards any exception thrown in the test body - confirm this is intended
+  }
+}
+
+TEST_CASE("CompressFileBZip", "[compressfiletest3]") {  // compresses a generated file with bzip2 and checks the output unpacks to the same content
+  try {
+    std::ofstream expectfile;
+    expectfile.open(EXPECT_COMPRESS_CONTENT);
+
+    for (int i = 0; i < 100000; i++) {
+      expectfile << std::to_string(rand_r(&globalSeed)%100);  // appends the decimal text of a value in [0, 99]
+    }
+    expectfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // outgoing "success" connection: compress processor -> log attribute processor
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // incoming connection that feeds the compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
+    income_connection->put(flow);  // queue the generated file as input for the processor
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    // validate the compressed content: unpack it and compare with the generated input file
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      std::string mime;
+      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+      REQUIRE(mime == "application/bzip2");
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      callback.archive_read();
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+      REQUIRE(expectContents == contents);
+      // save the compressed bytes to COMPRESS_CONTENT; the DecompressFileBZip test imports that file
+      std::ofstream file(COMPRESS_CONTENT);
+      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      file.close();
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+  } catch (...) {  // NOTE(review): empty catch-all discards any exception thrown in the test body - confirm this is intended
+  }
+}
+
+
+TEST_CASE("DecompressFileBZip", "[compressfiletest4]") {  // decompresses the file left in COMPRESS_CONTENT and compares it with EXPECT_COMPRESS_CONTENT
+  try {
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // outgoing "success" connection: compress processor -> log attribute processor
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // incoming connection that feeds the compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);  // input is the file written by the CompressFileBZip test
+    income_connection->put(flow);
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    // validate the decompressed content against the original uncompressed file
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      std::string mime;
+      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);  // no MIME type attribute expected on decompressed output
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+    unlink(COMPRESS_CONTENT);  // remove both temporary files used by the bzip2 compress/decompress pair
+    unlink(EXPECT_COMPRESS_CONTENT);
+  } catch (...) {  // NOTE(review): empty catch-all discards any exception thrown in the test body - confirm this is intended
+  }
+}
+
+TEST_CASE("CompressFileLZMA", "[compressfiletest5]") {  // compresses a generated file with LZMA; returns early when the log reports LZMA as unsupported
+  try {
+    std::ofstream expectfile;
+    expectfile.open(EXPECT_COMPRESS_CONTENT);
+
+    for (int i = 0; i < 100000; i++) {
+      expectfile << std::to_string(rand_r(&globalSeed)%100);  // appends the decimal text of a value in [0, 99]
+    }
+    expectfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // outgoing "success" connection: compress processor -> log attribute processor
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // incoming connection that feeds the compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
+    income_connection->put(flow);  // queue the generated file as input for the processor
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+      // LZMA is not supported on this platform, so there is no output to validate
+      LogTestController::getInstance().reset();
+      return;
+    }
+
+    // validate the compressed content: unpack it and compare with the generated input file
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      std::string mime;
+      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+      REQUIRE(mime == "application/x-lzma");
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      callback.archive_read();
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+      REQUIRE(expectContents == contents);
+      // save the compressed bytes to COMPRESS_CONTENT for the next test to consume
+      std::ofstream file(COMPRESS_CONTENT);
+      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      file.close();
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+  } catch (...) {  // NOTE(review): empty catch-all discards any exception thrown in the test body - confirm this is intended
+  }
+}
+
+
+// Decompression half of the LZMA round trip.
+// Imports COMPRESS_CONTENT into a flow file tagged with MIME type "application/x-lzma", runs the
+// CompressContent processor in MODE_DECOMPRESS and requires the output to equal the contents of
+// EXPECT_COMPRESS_CONTENT. This test does not create either file itself; it expects a preceding
+// test case to have produced them, and it removes both when it is done.
+TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") {
+  try {
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // connection from compress processor to log attribute
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // connection to compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    // flow files routed to "failure" are auto-terminated, so they never reach `connection`
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+    // NOTE(review): COMPRESSION_FORMAT_ATTRIBUTE presumably makes the processor pick the format from the MIME type attribute set below - confirm
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    // build the input flow file from the compressed file and queue it on the incoming connection
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
+    flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-lzma");
+    income_connection->put(flow);
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+      // the platform does not support LZMA: nothing to verify, but still remove the temporary files
+      LogTestController::getInstance().reset();
+      unlink(COMPRESS_CONTENT);
+      unlink(EXPECT_COMPRESS_CONTENT);
+      return;
+    }
+
+    // validate the decompressed content
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    // fail cleanly instead of dereferencing a null pointer when nothing reached the success connection
+    REQUIRE(flow1 != nullptr);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      // the MIME type attribute must not be present on the decompressed flow file
+      std::string mime;
+      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      REQUIRE(file1.is_open());
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+    unlink(COMPRESS_CONTENT);
+    unlink(EXPECT_COMPRESS_CONTENT);
+  } catch (...) {
+    // NOTE(review): every exception is swallowed here, so an error thrown during setup does not by
+    // itself fail the test. Presumably this is tolerated for platforms without LZMA support -
+    // confirm before tightening.
+  }
+}
+
+// Compression half of the XZ (LZMA2) round trip.
+// Writes 100000 pseudo-random numbers in the range 0..99 to EXPECT_COMPRESS_CONTENT, runs the
+// CompressContent processor in MODE_COMPRESS with COMPRESSION_FORMAT_XZ_LZMA2, requires the output
+// MIME type to be "application/x-xz", and compares the archive-decoded output with the original
+// file. The compressed bytes are then saved to COMPRESS_CONTENT for the following decompress test.
+TEST_CASE("CompressFileXYLZMA", "[compressfiletest7]") {
+  try {
+    // generate the uncompressed reference file
+    std::ofstream expectfile;
+    expectfile.open(EXPECT_COMPRESS_CONTENT);
+
+    for (int i = 0; i < 100000; i++) {
+      expectfile << std::to_string(rand_r(&globalSeed)%100);
+    }
+    expectfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // connection from compress processor to log attribute
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // connection to compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    // flow files routed to "failure" are auto-terminated, so they never reach `connection`
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    // build the input flow file from the reference file and queue it on the incoming connection
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
+    income_connection->put(flow);
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+      // the platform does not support LZMA: nothing to verify
+      LogTestController::getInstance().reset();
+      return;
+    }
+
+    // validate the compressed content
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    // fail cleanly instead of dereferencing a null pointer when nothing reached the success connection
+    REQUIRE(flow1 != nullptr);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      std::string mime;
+      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+      REQUIRE(mime == "application/x-xz");
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      // NOTE(review): archive_read() presumably decodes buffer_ into archive_buff_ via libarchive - confirm against ReadCallback
+      callback.archive_read();
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      REQUIRE(file1.is_open());
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+      REQUIRE(expectContents == contents);
+      // write the compressed content for the next test; binary mode keeps the bytes untranslated
+      std::ofstream file(COMPRESS_CONTENT, std::ios::out | std::ios::binary);
+      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      file.close();
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+  } catch (...) {
+    // NOTE(review): every exception is swallowed here, so an error thrown during setup does not by
+    // itself fail the test. Presumably this is tolerated for platforms without LZMA support -
+    // confirm before tightening.
+  }
+}
+
+
+// Decompression half of the XZ (LZMA2) round trip.
+// Imports COMPRESS_CONTENT into a flow file tagged with MIME type "application/x-xz", runs the
+// CompressContent processor in MODE_DECOMPRESS and requires the output to equal the contents of
+// EXPECT_COMPRESS_CONTENT. This test does not create either file itself; it expects a preceding
+// test case to have produced them, and it removes both when it is done.
+TEST_CASE("DecompressFileXYLZMA", "[compressfiletest8]") {
+  try {
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // connection from compress processor to log attribute
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "compress successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // connection to compress processor
+    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
+    compressconnection->setDestination(processor);
+    compressconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(compressconnection);
+
+    // flow files routed to "failure" are auto-terminated, so they never reach `connection`
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+    // NOTE(review): COMPRESSION_FORMAT_ATTRIBUTE presumably makes the processor pick the format from the MIME type attribute set below - confirm
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+    // build the input flow file from the compressed file and queue it on the incoming connection
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
+    flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-xz");
+    income_connection->put(flow);
+
+    REQUIRE(processor->getName() == "compresscontent");
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+
+    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+      // the platform does not support LZMA: nothing to verify, but still remove the temporary files
+      LogTestController::getInstance().reset();
+      unlink(COMPRESS_CONTENT);
+      unlink(EXPECT_COMPRESS_CONTENT);
+      return;
+    }
+
+    // validate the decompressed content
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    // fail cleanly instead of dereferencing a null pointer when nothing reached the success connection
+    REQUIRE(flow1 != nullptr);
+    REQUIRE(flow1->getSize() > 0);
+    {
+      REQUIRE(flow1->getSize() != flow->getSize());
+      // the MIME type attribute must not be present on the decompressed flow file
+      std::string mime;
+      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
+      std::ifstream file1;
+      file1.open(flowFileName, std::ios::in);
+      REQUIRE(file1.is_open());
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    LogTestController::getInstance().reset();
+    unlink(COMPRESS_CONTENT);
+    unlink(EXPECT_COMPRESS_CONTENT);
+  } catch (...) {
+    // NOTE(review): every exception is swallowed here, so an error thrown during setup does not by
+    // itself fail the test. Presumably this is tolerated for platforms without LZMA support -
+    // confirm before tightening.
+  }
+}
+


Mime
View raw message