nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
Date Wed, 01 Jul 2020 08:24:13 GMT

szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r445652913



##########
File path: extensions/libarchive/BinFiles.cpp
##########
@@ -153,7 +156,7 @@ void BinManager::gatherReadyBins() {
 void BinManager::removeOldestBin() {
   std::lock_guard < std::mutex > lock(mutex_);
   uint64_t olddate = ULLONG_MAX;
-  std::unique_ptr < std::deque<std::unique_ptr<Bin>>>*oldqueue;
+  std::unique_ptr < std::deque<std::unique_ptr<Bin>>>* oldqueue;

Review comment:
       not your issue, but wtf: pointer to a smart ptr to a collection of smart ptrs to Bin.

##########
File path: libminifi/include/core/ContentRepository.h
##########
@@ -104,7 +104,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>
{
     if (count != count_map_.end() && count->second > 0) {
       count_map_[str] = count->second - 1;
     } else {
-	count_map_.erase(str);
+	    count_map_.erase(str);

Review comment:
       wrong indentation

##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -30,9 +30,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>&
claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&&
claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_)
{
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_))
{
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr&
ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&&
ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr&
ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>&
newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);
+    }
+   private:
+    std::shared_ptr<ResourceClaim> claim_;
+  };
  public:
   FlowFile();
-  ~FlowFile();
+  virtual ~FlowFile();

Review comment:
       Add `override` instead of `virtual` for clarity.

##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {
+    uint8_t tmpBuffer[4096]{};
+    int total_read = 0;
+    do {
+      auto ret = input.read(tmpBuffer, std::min(len, sizeof(tmpBuffer)));
+      if (ret == 0) break;
+      if (ret < 0) return ret;
+      len -= ret;

Review comment:
       I think the code would be easier to understand if we didn't change the argument, but
used a separate `read` or `remaining` variable. The fewer moving parts, the better. The generated
code would most likely be the same.

##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {
+    uint8_t tmpBuffer[4096]{};
+    int total_read = 0;
+    do {
+      auto ret = input.read(tmpBuffer, std::min(len, sizeof(tmpBuffer)));
+      if (ret == 0) break;
+      if (ret < 0) return ret;
+      len -= ret;
+      total_read += ret;
+      auto prevSize = buffer_.size();
+      buffer_.resize(prevSize + ret);
+      std::move(tmpBuffer, tmpBuffer + ret, buffer_.data() + prevSize);
+    } while (len);

Review comment:
       I'm not a fan of relying on implicit integer -> bool conversion. Also, a pre-test
loop (`while` rather than `do`/`while`) would avoid calling `input.read` when `len == 0`.
   ```suggestion
       } while (len > 0);
   ```

##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {

Review comment:
       What is the type of `input` meant to be? I'd like to see some type or trait/concept
checks.
   All I can see is that it has to have a `read` member function that takes a uint8_t array
and a size.

##########
File path: extensions/libarchive/BinFiles.cpp
##########
@@ -70,6 +71,8 @@ void BinFiles::initialize() {
   relationships.insert(Original);
   relationships.insert(Failure);
   setSupportedRelationships(relationships);
+
+  out_going_connections_[Self.getName()].insert(shared_from_this());

Review comment:
       This is a shared_ptr cycle.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message