nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
Date Mon, 10 Feb 2020 09:49:24 GMT
bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty
flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r376957040
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -194,120 +187,142 @@ class PublishKafka : public core::Processor {
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    struct rd_kafka_headers_deleter {
+      void operator()(rd_kafka_headers_t* ptr) const noexcept {
+        rd_kafka_headers_destroy(ptr);
+      }
+    };
+
+    using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
+
+   private:
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file)
{
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file,
utils::Regex& attribute_name_regex) {
+      const utils::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
+      if (!result) { throw std::bad_alloc{}; }
+
+      for (const auto& kv : flow_file.getAttributes()) {
 
 Review comment:
   Compilation fails with
   ```
   /Users/danielbakai/nifi-minifi-cpp/extensions/librdkafka/PublishKafka.h:212:29: error:
'this' argument to
         member function 'getAttributes' has type 'const core::FlowFile', but function is
not marked const
         for (const auto& kv : flow_file.getAttributes()) {
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message