nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ab...@apache.org
Subject [nifi-minifi-cpp] branch main updated: MINIFICPP-1387 - Speed up Identifier::to_string
Date Mon, 26 Oct 2020 10:37:38 GMT
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new e22ede1  MINIFICPP-1387 - Speed up Identifier::to_string
e22ede1 is described below

commit e22ede1ec3673e8667022435601db2e533080f55
Author: Adam Debreceni <adam.debreceni@protonmail.com>
AuthorDate: Thu Oct 8 15:43:37 2020 +0200

    MINIFICPP-1387 - Speed up Identifier::to_string
    
    Signed-off-by: Arpad Boda <aboda@apache.org>
    
    This closes #926
---
 libminifi/include/core/Core.h                      |   3 +-
 libminifi/include/core/FlowFile.h                  |   5 -
 libminifi/include/core/ProcessSession.h            |  30 +++-
 libminifi/include/core/logging/Logger.h            |   6 +
 .../include/core/state/nodes/FlowInformation.h     |   2 +-
 libminifi/include/io/OutputStream.h                |   6 +
 libminifi/include/utils/Id.h                       |  13 +-
 libminifi/include/utils/SmallString.h              |  90 ++++++++++
 libminifi/src/core/FlowFile.cpp                    |   2 +-
 libminifi/src/core/ProcessSession.cpp              | 199 +++++++++------------
 libminifi/src/utils/Id.cpp                         |  45 ++++-
 nanofi/src/api/nanofi.cpp                          |   2 +-
 12 files changed, 260 insertions(+), 143 deletions(-)

diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 5dc8a39..6ca5303 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -193,9 +193,8 @@ class CoreComponent {
   // unsigned const char *getUUID();
   /**
    * Return the UUID string
-   * @param constant reference to the UUID str
    */
-  std::string getUUIDStr() const {
+  utils::SmallString<36> getUUIDStr() const {
     return uuid_.to_string();
   }
 
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index 73c569d..687ac0b 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -220,11 +220,6 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
    */
   uint64_t getOffset() const;
 
-  bool getUUID(utils::Identifier& other) {
-    other = uuid_;
-    return true;
-  }
-
   // Check whether it is still being penalized
   bool isPenalized() const {
     return penaltyExpiration_ms_ > 0 && penaltyExpiration_ms_ > utils::timeutils::getTimeMillis();
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index c453abe..125439c 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -139,25 +139,37 @@ class ProcessSession : public ReferenceContainer {
   ProcessSession &operator=(const ProcessSession &parent) = delete;
 
  protected:
+  struct FlowFileUpdate {
+    std::shared_ptr<FlowFile> modified;
+    std::shared_ptr<FlowFile> snapshot;
+  };
+
   // FlowFiles being modified by current process session
-  std::map<std::string, std::shared_ptr<core::FlowFile>> _updatedFlowFiles;
-  // Copy of the original FlowFiles being modified by current process session as above
-  std::map<std::string, std::shared_ptr<core::FlowFile>> _flowFileSnapShots;
+  std::map<utils::Identifier, FlowFileUpdate> _updatedFlowFiles;
   // FlowFiles being added by current process session
-  std::map<std::string, std::shared_ptr<core::FlowFile>> _addedFlowFiles;
+  std::map<utils::Identifier, std::shared_ptr<core::FlowFile>> _addedFlowFiles;
   // FlowFiles being deleted by current process session
-  std::map<std::string, std::shared_ptr<core::FlowFile>> _deletedFlowFiles;
+  std::vector<std::shared_ptr<core::FlowFile>> _deletedFlowFiles;
   // FlowFiles being transfered to the relationship
-  std::map<std::string, Relationship> _transferRelationship;
+  std::map<utils::Identifier, Relationship> _transferRelationship;
   // FlowFiles being cloned for multiple connections per relationship
-  std::map<std::string, std::shared_ptr<core::FlowFile>> _clonedFlowFiles;
+  std::vector<std::shared_ptr<core::FlowFile>> _clonedFlowFiles;
 
  private:
+  enum class RouteResult {
+    Ok_Routed,
+    Ok_AutoTerminated,
+    Ok_Deleted,
+    Error_NoRelationship
+  };
+
+  RouteResult routeFlowFile(const std::shared_ptr<FlowFile>& record);
+
   void persistFlowFilesBeforeTransfer(
       std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap,
-      const std::map<std::string, std::shared_ptr<FlowFile>>& originalFlowFileSnapShots);
+      const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles);
   // Clone the flow file during transfer to multiple connections for a relationship
-  std::shared_ptr<core::FlowFile> cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent);
+  std::shared_ptr<core::FlowFile> cloneDuringTransfer(const std::shared_ptr<core::FlowFile> &parent);
   // ProcessContext
   std::shared_ptr<ProcessContext> process_context_;
   // Logger
diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h
index e409df9..332151f 100644
--- a/libminifi/include/core/logging/Logger.h
+++ b/libminifi/include/core/logging/Logger.h
@@ -27,6 +27,7 @@
 
 #include "spdlog/common.h"
 #include "spdlog/logger.h"
+#include "utils/SmallString.h"
 
 namespace org {
 namespace apache {
@@ -53,6 +54,11 @@ inline char const* conditional_conversion(std::string const& str) {
   return str.c_str();
 }
 
+template<size_t N>
+inline char const* conditional_conversion(const utils::SmallString<N>& arr) {
+  return arr.c_str();
+}
+
 template<typename T>
 inline T conditional_conversion(T const& t) {
   return t;
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index 0777f4a..ba3e5e3 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -200,7 +200,7 @@ class FlowInformation : public FlowMonitor {
 
         SerializedResponseNode queueUUIDNode;
         queueUUIDNode.name = "uuid";
-        queueUUIDNode.value = queue.second->getUUIDStr();
+        queueUUIDNode.value = std::string{queue.second->getUUIDStr()};
 
         SerializedResponseNode queuesize;
         queuesize.name = "size";
diff --git a/libminifi/include/io/OutputStream.h b/libminifi/include/io/OutputStream.h
index 466ebd0..ebee70c 100644
--- a/libminifi/include/io/OutputStream.h
+++ b/libminifi/include/io/OutputStream.h
@@ -23,6 +23,7 @@
 #include <string>
 #include "Stream.h"
 #include "utils/gsl.h"
+#include "utils/SmallString.h"
 
 namespace org {
 namespace apache {
@@ -68,6 +69,11 @@ class OutputStream : public virtual Stream {
    **/
   int write(const char* str, bool widen = false);
 
+  template<size_t N>
+  int write(const utils::SmallString<N>& str, bool widen = false) {
+    return write(str.c_str(), widen);
+  }
+
   /**
   * writes sizeof(Integral) bytes to the stream
   * @param value to write
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index 68dbf7a..bcd8119 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -23,6 +23,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include "SmallString.h"
 
 #ifndef WIN32
 class uuid;
@@ -53,6 +54,7 @@ namespace utils {
 class Identifier {
   friend struct IdentifierTestAccessor;
   static constexpr const char* UUID_FORMAT_STRING = "%02hhx%02hhx%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx";
+  static constexpr const char* hex_lut = "0123456789abcdef";
 
  public:
   using Data = std::array<uint8_t, 16>;
@@ -65,10 +67,19 @@ class Identifier {
 
   bool operator!=(const Identifier& other) const;
   bool operator==(const Identifier& other) const;
+  bool operator<(const Identifier& other) const;
 
   bool isNil() const;
 
-  std::string to_string() const;
+  // Numerous places query the string representation
+  // just to then forward the temporary to build logs,
+  // streams, or others. Dynamically allocating in these
+  // instances is wasteful as we immediately discard
+  // the result. The difference on the test machine is 8x,
+  // building the representation itself takes 10ns, while
+  // subsequently turning it into a std::string would take
+  // 70ns more.
+  SmallString<36> to_string() const;
 
   static utils::optional<Identifier> parse(const std::string& str);
 
diff --git a/libminifi/include/utils/SmallString.h b/libminifi/include/utils/SmallString.h
new file mode 100644
index 0000000..a4c17c6
--- /dev/null
+++ b/libminifi/include/utils/SmallString.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <array>
+#include <ostream>
+#include <string>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<size_t N>
+class SmallString : public std::array<char, N + 1> {
+ public:
+  operator std::string() const {  // NOLINT
+    return {c_str()};
+  }
+
+  const char* c_str() const {
+    return this->data();
+  }
+
+  friend std::ostream &operator<<(std::ostream &out, const SmallString &str) {
+    return out << str.c_str();
+  }
+
+  friend std::string operator+(const std::string &lhs, const SmallString &rhs) {
+    return lhs + rhs.c_str();
+  }
+
+  friend std::string operator+(std::string &&lhs, const SmallString &rhs) {
+    return std::move(lhs) + rhs.c_str();
+  }
+
+  friend std::string operator+(const SmallString &lhs, const std::string &rhs) {
+    return lhs.c_str() + rhs;
+  }
+
+  friend std::string operator+(const SmallString &lhs, std::string &&rhs) {
+    return lhs.c_str() + std::move(rhs);
+  }
+
+  friend bool operator==(const std::string& lhs, const SmallString& rhs) {
+    return lhs == rhs.c_str();
+  }
+
+  friend bool operator==(const SmallString& lhs, const std::string& rhs) {
+    return lhs.c_str() == rhs;
+  }
+
+  friend bool operator==(const SmallString& lhs, const SmallString& rhs) {
+    return static_cast<std::array<char, N + 1>>(lhs) == static_cast<std::array<char, N + 1>>(rhs);
+  }
+
+  friend bool operator!=(const std::string& lhs, const SmallString& rhs) {
+    return !(lhs == rhs);
+  }
+
+  friend bool operator!=(const SmallString& lhs, const std::string& rhs) {
+    return !(lhs == rhs);
+  }
+
+  friend bool operator!=(const SmallString& lhs, const SmallString& rhs) {
+    return !(lhs == rhs);
+  }
+};
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 53f834a..a6f3fb1 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -108,7 +108,7 @@ void FlowFile::setStashClaim(const std::string& key, const std::shared_ptr<Resou
   if (hasStashClaim(key)) {
     logger_->log_warn("Stashing content of record %s to existing key %s; "
                       "existing content will be overwritten",
-                      getUUIDStr().c_str(), key.c_str());
+                      getUUIDStr(), key.c_str());
   }
 
   stashedContent_[key] = claim;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 6233f73..2abf4cf 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -66,10 +66,12 @@ ProcessSession::~ProcessSession() {
 }
 
 void ProcessSession::add(const std::shared_ptr<core::FlowFile> &record) {
-  if (_updatedFlowFiles.find(record->getUUIDStr()) != _updatedFlowFiles.end()) {
+  utils::Identifier uuid;
+  record->getUUID(uuid);
+  if (_updatedFlowFiles.find(uuid) != _updatedFlowFiles.end()) {
     throw Exception(ExceptionType::PROCESSOR_EXCEPTION, "Mustn't add file that was provided by this session");
   }
-  _addedFlowFiles[record->getUUIDStr()] = record;
+  _addedFlowFiles[uuid] = record;
   record->setDeleted(false);
 }
 
@@ -94,7 +96,9 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(const std::shared_ptr<cor
     parent->getlineageIdentifiers().push_back(parent->getUUIDStr());
   }
 
-  _addedFlowFiles[record->getUUIDStr()] = record;
+  utils::Identifier uuid;
+  record->getUUID(uuid);
+  _addedFlowFiles[uuid] = record;
   logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr());
   std::stringstream details;
   details << process_context_->getProcessorNode()->getName() << " creates flow record " << record->getUUIDStr();
@@ -119,14 +123,14 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
   return record;
 }
 
-std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(const std::shared_ptr<core::FlowFile> &parent) {
   auto record = std::make_shared<FlowFileRecord>();
 
   auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
   if (flow_version != nullptr) {
     record->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
   }
-  this->_clonedFlowFiles[record->getUUIDStr()] = record;
+  this->_clonedFlowFiles.push_back(record);
   logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr());
   // Copy attributes
   for (const auto& attribute : parent->getAttributes()) {
@@ -155,19 +159,15 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_
 }
 
 std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
+  if ((uint64_t) (offset + size) > parent->getSize()) {
+    // Set offset and size
+    logger_->log_error("clone offset %" PRId64 " and size %" PRId64 " exceed parent size %" PRIu64, offset, size, parent->getSize());
+    return nullptr;
+  }
   std::shared_ptr<core::FlowFile> record = this->create(parent);
   if (record) {
     logger_->log_debug("Cloned parent flow files %s to %s, with %u:%u", parent->getUUIDStr(), record->getUUIDStr(), offset, size);
     if (parent->getResourceClaim()) {
-      if ((uint64_t) (offset + size) > parent->getSize()) {
-        // Set offset and size
-        logger_->log_error("clone offset %" PRId64 " and size %" PRId64 " exceed parent size %" PRIu64, offset, size, parent->getSize());
-        // Remove the Add FlowFile for the session
-        auto it = this->_addedFlowFiles.find(record->getUUIDStr());
-        if (it != this->_addedFlowFiles.end())
-          this->_addedFlowFiles.erase(record->getUUIDStr());
-        return nullptr;
-      }
       record->setOffset(parent->getOffset() + offset);
       record->setSize(size);
       // Copy Resource Claim
@@ -180,7 +180,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
 
 void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
   flow->setDeleted(true);
-  _deletedFlowFiles[flow->getUUIDStr()] = flow;
+  _deletedFlowFiles.push_back(flow);
   std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
   provenance_report_->drop(flow, reason);
 }
@@ -207,7 +207,9 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
 
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
-  _transferRelationship[flow->getUUIDStr()] = relationship;
+  utils::Identifier uuid;
+  flow->getUUID(uuid);
+  _transferRelationship[uuid] = relationship;
   flow->setDeleted(false);
 }
 
@@ -629,46 +631,55 @@ void ProcessSession::restore(const std::string &key, const std::shared_ptr<core:
   flow->clearStashClaim(key);
 }
 
+ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<FlowFile> &record) {
+  if (record->isDeleted()) {
+    return RouteResult::Ok_Deleted;
+  }
+  utils::Identifier uuid;
+  record->getUUID(uuid);
+  auto itRelationship = _transferRelationship.find(uuid);
+  if (itRelationship == _transferRelationship.end()) {
+    return RouteResult::Error_NoRelationship;
+  }
+  Relationship relationship = itRelationship->second;
+  // Find the relationship, we need to find the connections for that relationship
+  std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
+  if (connections.empty()) {
+    // No connection
+    if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
+      // Not autoterminate, we should have the connect
+      std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
+      throw Exception(PROCESS_SESSION_EXCEPTION, message);
+    } else {
+      // Autoterminated
+      remove(record);
+    }
+  } else {
+    // We connections, clone the flow and assign the connection accordingly
+    for (auto itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
+      std::shared_ptr<Connectable> connection = *itConnection;
+      if (itConnection == connections.begin()) {
+        // First connection which the flow need be routed to
+        record->setConnection(connection);
+      } else {
+        // Clone the flow file and route to the connection
+        std::shared_ptr<core::FlowFile> cloneRecord = this->cloneDuringTransfer(record);
+        if (cloneRecord)
+          cloneRecord->setConnection(connection);
+        else
+          throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer " + record->getUUIDStr());
+      }
+    }
+  }
+  return RouteResult::Ok_Routed;
+}
+
 void ProcessSession::commit() {
   try {
-    // First we clone the flow record based on the transfered relationship for updated flow record
+    // First we clone the flow record based on the transferred relationship for updated flow record
     for (auto && it : _updatedFlowFiles) {
-      std::shared_ptr<core::FlowFile> record = it.second;
-      if (record->isDeleted())
-        continue;
-      auto itRelationship = this->_transferRelationship.find(record->getUUIDStr());
-      if (itRelationship != _transferRelationship.end()) {
-        Relationship relationship = itRelationship->second;
-        // Find the relationship, we need to find the connections for that relationship
-        std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
-        if (connections.empty()) {
-          // No connection
-          if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
-            // Not autoterminate, we should have the connect
-            std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
-            throw Exception(PROCESS_SESSION_EXCEPTION, message);
-          } else {
-            // Autoterminated
-            remove(record);
-          }
-        } else {
-          // We connections, clone the flow and assign the connection accordingly
-          for (auto itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
-            std::shared_ptr<Connectable> connection = *itConnection;
-            if (itConnection == connections.begin()) {
-              // First connection which the flow need be routed to
-              record->setConnection(connection);
-            } else {
-              // Clone the flow file and route to the connection
-              std::shared_ptr<core::FlowFile> cloneRecord = this->cloneDuringTransfer(record);
-              if (cloneRecord)
-                cloneRecord->setConnection(connection);
-              else
-                throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer " + record->getUUIDStr());
-            }
-          }
-        }
-      } else {
+      auto record = it.second.modified;
+      if (routeFlowFile(record) == RouteResult::Error_NoRelationship) {
         // Can not find relationship for the flow
         throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the updated flow " + record->getUUIDStr());
       }
@@ -676,44 +687,8 @@ void ProcessSession::commit() {
 
     // Do the same thing for added flow file
     for (const auto& it : _addedFlowFiles) {
-      std::shared_ptr<core::FlowFile> record = it.second;
-      if (record->isDeleted())
-        continue;
-      auto itRelationship = this->_transferRelationship.find(record->getUUIDStr());
-      if (itRelationship != _transferRelationship.end()) {
-        Relationship relationship = itRelationship->second;
-        // Find the relationship, we need to find the connections for that relationship
-        std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
-        if (connections.empty()) {
-          // No connection
-          if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
-            // Not autoterminate, we should have the connect
-            std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
-            throw Exception(PROCESS_SESSION_EXCEPTION, message);
-          } else {
-            logger_->log_debug("added flow file is auto terminated");
-            // Auto-terminated
-            remove(record);
-          }
-        } else {
-          // We connections, clone the flow and assign the connection accordingly
-          for (auto itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
-            std::shared_ptr<Connectable> connection(*itConnection);
-            if (itConnection == connections.begin()) {
-              // First connection which the flow need be routed to
-              record->setConnection(connection);
-            } else {
-              // Clone the flow file and route to the connection
-              std::shared_ptr<core::FlowFile> cloneRecord;
-              cloneRecord = this->cloneDuringTransfer(record);
-              if (cloneRecord)
-                cloneRecord->setConnection(connection);
-              else
-                throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer" + record->getUUIDStr());
-            }
-          }
-        }
-      } else {
+      auto record = it.second;
+      if (routeFlowFile(record) == RouteResult::Error_NoRelationship) {
         // Can not find relationship for the flow
         throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the added flow " + record->getUUIDStr());
       }
@@ -724,7 +699,7 @@ void ProcessSession::commit() {
     std::shared_ptr<Connectable> connection = nullptr;
     // Complete process the added and update flow files for the session, send the flow file to its queue
     for (const auto &it : _updatedFlowFiles) {
-      std::shared_ptr<core::FlowFile> record = it.second;
+      auto record = it.second.modified;
       logger_->log_trace("See %s in %s", record->getUUIDStr(), "_updatedFlowFiles");
       if (record->isDeleted()) {
         continue;
@@ -736,7 +711,7 @@ void ProcessSession::commit() {
       }
     }
     for (const auto &it : _addedFlowFiles) {
-      std::shared_ptr<core::FlowFile> record = it.second;
+      auto record = it.second;
       logger_->log_trace("See %s in %s", record->getUUIDStr(), "_addedFlowFiles");
       if (record->isDeleted()) {
         continue;
@@ -747,8 +722,7 @@ void ProcessSession::commit() {
       }
     }
     // Process the clone flow files
-    for (const auto &it : _clonedFlowFiles) {
-      std::shared_ptr<core::FlowFile> record = it.second;
+    for (const auto &record : _clonedFlowFiles) {
       logger_->log_trace("See %s in %s", record->getUUIDStr(), "_clonedFlowFiles");
       if (record->isDeleted()) {
         continue;
@@ -759,8 +733,7 @@ void ProcessSession::commit() {
       }
     }
 
-    for (const auto& it : _deletedFlowFiles) {
-        auto record = it.second;
+    for (const auto& record : _deletedFlowFiles) {
         if (!record->isDeleted()) {
           continue;
         }
@@ -772,7 +745,7 @@ void ProcessSession::commit() {
 
     content_session_->commit();
 
-    persistFlowFilesBeforeTransfer(connectionQueues, _flowFileSnapShots);
+    persistFlowFilesBeforeTransfer(connectionQueues, _updatedFlowFiles);
 
     for (auto& cq : connectionQueues) {
       auto connection = std::dynamic_pointer_cast<Connection>(cq.first);
@@ -790,7 +763,6 @@ void ProcessSession::commit() {
     _addedFlowFiles.clear();
     _clonedFlowFiles.clear();
     _deletedFlowFiles.clear();
-    _flowFileSnapShots.clear();
 
     _transferRelationship.clear();
     // persistent the provenance report
@@ -813,9 +785,9 @@ void ProcessSession::rollback() {
   try {
     // Requeue the snapshot of the flowfile back
     for (const auto &it : _updatedFlowFiles) {
-      auto flowFile = it.second;
+      auto flowFile = it.second.modified;
       // restore flowFile to original state
-      *flowFile = *_flowFileSnapShots.find(it.first)->second;
+      *flowFile = *it.second.snapshot;
       logger_->log_debug("ProcessSession rollback for %s, record %s, to connection %s",
           process_context_->getProcessorNode()->getName(),
           flowFile->getUUIDStr(),
@@ -823,8 +795,8 @@ void ProcessSession::rollback() {
       connectionQueues[flowFile->getConnection()].push_back(flowFile);
     }
 
-    for (const auto& it : _deletedFlowFiles) {
-      it.second->setDeleted(false);
+    for (const auto& record : _deletedFlowFiles) {
+      record->setDeleted(false);
     }
 
     // put everything back where it came from
@@ -841,8 +813,6 @@ void ProcessSession::rollback() {
 
     content_session_->rollback();
 
-    _flowFileSnapShots.clear();
-
     _clonedFlowFiles.clear();
     _addedFlowFiles.clear();
     _updatedFlowFiles.clear();
@@ -859,7 +829,7 @@ void ProcessSession::rollback() {
 
 void ProcessSession::persistFlowFilesBeforeTransfer(
     std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap,
-    const std::map<std::string, std::shared_ptr<FlowFile>>& originalFlowFileSnapShots) {
+    const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles) {
 
   std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>>
flowData;
 
@@ -895,8 +865,10 @@ void ProcessSession::persistFlowFilesBeforeTransfer(
     const bool shouldDropEmptyFiles = connection ? connection->getDropEmptyFlowFiles() : false;
     auto& flows = transaction.second;
     for (auto &ff : flows) {
-      auto snapshotIt = originalFlowFileSnapShots.find(ff->getUUIDStr());
-      auto original = snapshotIt != originalFlowFileSnapShots.end() ? snapshotIt->second : nullptr;
+      utils::Identifier uuid;
+      ff->getUUID(uuid);
+      auto snapshotIt = modifiedFlowFiles.find(uuid);
+      auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
       if (shouldDropEmptyFiles && ff->getSize() == 0) {
         // the receiver promised to drop this FF, no need for it anymore
         if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
@@ -947,17 +919,16 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
     if (ret) {
       // add the flow record to the current process session update map
       ret->setDeleted(false);
-      _updatedFlowFiles[ret->getUUIDStr()] = ret;
       std::shared_ptr<FlowFile> snapshot = std::make_shared<FlowFileRecord>();
       *snapshot = *ret;
+      logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
+      utils::Identifier uuid;
+      ret->getUUID(uuid);
+      _updatedFlowFiles[uuid] = {ret, snapshot};
       auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
       if (flow_version != nullptr) {
         ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
       }
-      logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
-      // save a snapshot
-      auto result = _flowFileSnapShots.emplace(snapshot->getUUIDStr(), std::move(snapshot));
-      assert(result.second);
       return ret;
     }
     current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode()->pickIncomingConnection());
@@ -984,7 +955,7 @@ bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) {
 
 bool ProcessSession::existsFlowFileInRelationship(const Relationship &relationship) {
   return std::any_of(_transferRelationship.begin(), _transferRelationship.end(),
-      [&relationship](const std::map<std::string, Relationship>::value_type &key_value_pair) {
+      [&relationship](const std::map<utils::Identifier, Relationship>::value_type &key_value_pair) {
         return relationship == key_value_pair.second;
   });
 }
diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp
index 0264833..dc8132f 100644
--- a/libminifi/src/utils/Id.cpp
+++ b/libminifi/src/utils/Id.cpp
@@ -103,15 +103,42 @@ bool Identifier::operator==(const Identifier& other) const {
   return data_ == other.data_;
 }
 
-std::string Identifier::to_string() const {
-  char uuidStr[37]{};  // 36+1 for the \0
-  snprintf(uuidStr, sizeof(uuidStr), UUID_FORMAT_STRING,
-           data_[0], data_[1], data_[2], data_[3],
-           data_[4], data_[5],
-           data_[6], data_[7],
-           data_[8], data_[9],
-           data_[10], data_[11], data_[12], data_[13], data_[14], data_[15]);
-  return {uuidStr};
+bool Identifier::operator<(const Identifier &other) const {
+  return data_ < other.data_;
+}
+
+SmallString<36> Identifier::to_string() const {
+  SmallString<36> uuidStr;
+  // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx is 36 long: 16 bytes * 2 hex digits / byte + 4 hyphens
+  int byteIdx = 0;
+  int charIdx = 0;
+
+  // [xxxxxxxx]-xxxx-xxxx-xxxx-xxxxxxxxxxxx
+  while (byteIdx < 4) {
+    uuidStr[charIdx++] = hex_lut[data_[byteIdx] >> 4];
+    uuidStr[charIdx++] = hex_lut[data_[byteIdx++] & 0xf];
+  }
+  // xxxxxxxx[-]xxxx-xxxx-xxxx-xxxxxxxxxxxx
+  uuidStr[charIdx++] = '-';
+
+  // xxxxxxxx-[xxxx-xxxx-xxxx-]xxxxxxxxxxxx - 3x 2 bytes and a hyphen
+  for (int idx = 0; idx < 3; ++idx) {
+    uuidStr[charIdx++] = hex_lut[data_[byteIdx] >> 4];
+    uuidStr[charIdx++] = hex_lut[data_[byteIdx++] & 0xf];
+    uuidStr[charIdx++] = hex_lut[data_[byteIdx] >> 4];
+    uuidStr[charIdx++] = hex_lut[data_[byteIdx++] & 0xf];
+    uuidStr[charIdx++] = '-';
+  }
+
+  // xxxxxxxx-xxxx-xxxx-xxxx-[xxxxxxxxxxxx] - the rest, i.e. until byte 16
+  while (byteIdx < 16) {
+    uuidStr[charIdx++] = hex_lut[data_[byteIdx] >> 4];
+    uuidStr[charIdx++] = hex_lut[data_[byteIdx++] & 0xf];
+  }
+
+  // null terminator
+  uuidStr[charIdx] = 0;
+  return uuidStr;
 }
 
 utils::optional<Identifier> Identifier::parse(const std::string &str) {
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index f021013..a53a27f 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -170,7 +170,7 @@ void clear_content_repo(const nifi_instance * instance) {
 }
 
 void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target) {
-    strcpy(uuid_target, proc->getUUIDStr().c_str());
+    strcpy(uuid_target, proc->getUUIDStr().data());
 }
 
 void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target) {


Mime
View raw message