nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
Date Mon, 24 Feb 2020 15:50:24 GMT
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383300357
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +467,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document&
do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent, EventRender& renderedData)
{
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount))
{
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll
bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount))
{
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_,
hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring&
channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark))
{
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.",
hEvent, maxBufferSize_);
     return false;
   }
 
-  // Enumerate the events in the result set after the bookmarked event.
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-      DWORD status = ERROR_SUCCESS;
-      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
-        logger_->log_error("!EvtNext error %d.", status);
-      }
-      break;
-    }
-
-    processEvent(hEvent);
-
-    EvtClose(hEvent);
+  size = used;
+  std::vector<wchar_t> buf(size / 2 + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount))
{
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
+    return false;
   }
 
-  return true;
-}
-
+  std::string xml = wel::to_string(&buf[0]);
 
-bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext>
&context) {
-  context->getProperty(Channel.getName(), channel_);
-  context->getProperty(Query.getName(), query_);
+  pugi::xml_document doc;
+  pugi::xml_parse_result result = doc.load_string(xml.c_str());
 
-  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
-  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
-
-  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query_;
-
-  std::string strInactiveDurationToReconnect;
-  context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);
-
-  // Get 'inactiveDurationToReconnect_'.
-  core::TimeUnit unit;
-  if (core::Property::StringToTime(strInactiveDurationToReconnect, inactiveDurationToReconnect_,
unit) &&
-    core::Property::ConvertTimeUnitToMS(inactiveDurationToReconnect_, unit, inactiveDurationToReconnect_))
{
-    logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
-  }
-
-  if (!pBookmark_) {
-    logger_->log_error("!pBookmark_");
+  if (!result) {
+    logger_->log_error("Invalid XML produced");
     return false;
   }
 
-  auto channel = std::wstring(channel_.begin(), channel_.end());
-  auto query = std::wstring(query_.begin(), query_.end());
+  // this is a well known path. 
+  std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
+  wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent,
!resolve_as_attributes_, apply_identifier_function_, regex_);
 
-  do {
-    auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
-    if (!hEventResults) {
-      logger_->log_error("!EvtQuery error: %d.", GetLastError());
-      // Consider it as a serious error.
-      return false;
-    }
-    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults);
});
+  // resolve the event metadata
+  doc.traverse(walker);
 
-    if (pBookmark_->hasBookmarkXml()) {
-      if (!processEventsAfterBookmark(hEventResults, channel, query)) {
-        break;
-      }
-    } else {
-      // Seek to the last event in the hEventResults.
-      if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
-        logger_->log_error("!EvtSeek error: %d.", GetLastError());
-        break;
-      }
+  if (writePlainText_) {
+    auto handler = getEventLogHandler(providerName);
+    auto message = handler.getEventMessage(hEvent);
 
-      DWORD dwReturned{};
-      EVT_HANDLE hEvent{};
-      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-        logger_->log_error("!EvtNext error: %d.", GetLastError());
-        break;
+    if (!message.empty()) {
+
+      for (const auto &mapEntry : walker.getIdentifiers()) {
+        // replace the identifiers with their translated strings.
+        utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
       }
+      wel::WindowsEventLogHeader log_header(header_names_);
+      // set the delimiter
+      log_header.setDelimiter(header_delimiter_);
+      // render the header.
+      renderedData.rendered_text_ = log_header.getEventHeader(&walker);
+      renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
+      renderedData.rendered_text_ += message;
+    }
+  }
 
-      pBookmark_->saveBookmark(hEvent);
+  if (writeXML_) {
+    substituteXMLPercentageItems(doc);
+
+    if (resolve_as_attributes_) {
+      renderedData.matched_fields_ = walker.getFieldValues();
     }
-  } while (false);
-
-  subscriptionHandle_ = EvtSubscribe(
-      NULL,
-      NULL,
-      channel.c_str(),
-      query.c_str(),
-      NULL,
-      this,
-      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent)
-      {
-        auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
-
-        auto& logger = pConsumeWindowsEventLog->logger_;
-
-        if (action == EvtSubscribeActionError) {
-          if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)hEvent) {
-            logger->log_error("Received missing event notification. Consider triggering
processor more frequently or increasing queue size.");
-          } else {
-            logger->log_error("Received the following Win32 error: %x", hEvent);
-          }
-        } else if (action == EvtSubscribeActionDeliver) {
-          pConsumeWindowsEventLog->processEvent(hEvent);
-        }
 
-        return 0UL;
-      },
-      EvtSubscribeToFutureEvents | EvtSubscribeStrict);
+    wel::XmlString writer;
+    doc.print(writer, "", pugi::format_raw); // no indentation or formatting
+    xml = writer.xml_;
 
-  if (!subscriptionHandle_) {
-    logger_->log_error("Unable to subscribe with provided parameters, received the following
error code: %d", GetLastError());
-    return false;
+    renderedData.text_ = std::move(xml);
   }
 
-  lastActivityTimestamp_ = GetTickCount64();
-
   return true;
 }
 
-void ConsumeWindowsEventLog::unsubscribe()
-{
-  if (subscriptionHandle_) {
-    EvtClose(subscriptionHandle_);
-    subscriptionHandle_ = 0;
-  }
-}
-
-int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSession>
&session)
+void ConsumeWindowsEventLog::processEventRender(const EventRender& renderedData, core::ProcessSession&
session)
 {
-  struct WriteCallback: public OutputStreamCallback {
+  struct WriteCallback : public OutputStreamCallback {
     WriteCallback(const std::string& str)
-      : data_(str.c_str()), size_(str.length()) {
+      : str_(str.c_str()) {
 
 Review comment:
   I don't think this works as intended. `str_` is now a reference (presumably a
`const std::string&`), but it is initialized from a `const char*`. That creates a temporary
`std::string` holding a copy of `str`, binds `str_` to that temporary, and then destroys the
temporary once the initialization is finished, leaving `str_` as a dangling reference.
   
   This should be `str_(str)`, so that `str_` is bound to the same object that `str` refers to.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message