nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #888: MINIFICPP-1321 Do not override MergeContent output attributes
Date Tue, 01 Sep 2020 08:22:01 GMT

hunyadi-dev commented on a change in pull request #888:
URL: https://github.com/apache/nifi-minifi-cpp/pull/888#discussion_r480959718



##########
File path: extensions/libarchive/MergeContent.cpp
##########
@@ -270,93 +280,78 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
     return false;
   }
 
-  std::shared_ptr<core::FlowFile> mergeFlow;
   try {
-    mergeFlow = mergeBin->merge(context, session, bin->getFlowFile(), headerContent_,
footerContent_, demarcatorContent_);
+    mergeBin->merge(context, session, bin->getFlowFile(), headerContent_, footerContent_,
demarcatorContent_, merge_flow);
   } catch (...) {
     logger_->log_error("Merge Content merge catch exception");
     return false;
   }
-  session->putAttribute(mergeFlow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));
-
-  if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)
-    KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, mergeFlow);
-  else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE)
-    KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, mergeFlow);
-  else {
-    logger_->log_error("Attribute strategy not supported %s", attributeStrategy_);
-    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid attribute strategy:
" + attributeStrategy_);
-  }
+  session->putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));
 
   // we successfully merge the flow
-  session->transfer(mergeFlow, Merge);
+  session->transfer(merge_flow, Merge);
   std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
   for (auto flow : flows) {
     session->transfer(flow, Original);
   }
-  logger_->log_info("Merge FlowFile record UUID %s, payload length %d", mergeFlow->getUUIDStr(),
mergeFlow->getSize());
+  logger_->log_info("Merge FlowFile record UUID %s, payload length %d", merge_flow->getUUIDStr(),
merge_flow->getSize());
 
   return true;
 }
 
-std::shared_ptr<core::FlowFile> BinaryConcatenationMerge::merge(core::ProcessContext
*context, core::ProcessSession *session,
-        std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header,
std::string &footer, std::string &demarcator) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord
> (session->create());
+void BinaryConcatenationMerge::merge(core::ProcessContext *context, core::ProcessSession
*session,
+    std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header,
std::string &footer, std::string &demarcator,
+    const std::shared_ptr<core::FlowFile> &merge_flow) {
   BinaryConcatenationMerge::WriteCallback callback(header, footer, demarcator, flows, session);
-  session->write(flowFile, &callback);
-  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), getMergedContentType());
+  session->write(merge_flow, &callback);
+  session->putAttribute(merge_flow, FlowAttributeKey(MIME_TYPE), getMergedContentType());
   std::string fileName;
   if (flows.size() == 1) {
     flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
   } else {
     flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
   }
   if (!fileName.empty())
-    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
-  return flowFile;
+    session->putAttribute(merge_flow, FlowAttributeKey(FILENAME), fileName);
 }
 
-std::shared_ptr<core::FlowFile> TarMerge::merge(core::ProcessContext *context, core::ProcessSession
*session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string
&header,
-    std::string &footer, std::string &demarcator) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord
> (session->create());
+void TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>>
&flows, std::string &header,

Review comment:
       Recommendation: unused arguments should not be named ->
   https://google.github.io/styleguide/cppguide.html#Function_Declarations_and_Definitions




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message