nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
Date Mon, 24 Feb 2020 15:50:24 GMT
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383331881
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to
unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir,
getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session)
{
+  // External loop is used in case if there are new events while the events after bookmark
are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults);
});
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark))
{
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()
- before_time).count());
+    });
+
+    // Enumerate the events in the result set after the bookmarked event.
+    bool commitAndSaveBookmark{};
+    std::wstring bookmarkXml;
+    while (true) {
+      EVT_HANDLE hEvent{};
+      DWORD dwReturned{};
+      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+        DWORD status = ERROR_SUCCESS;
+        if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
+          logger_->log_error("!EvtNext error %d.", status);
+        }
+        break;
+      }
+      const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+      commitAndSaveBookmark = false;
+      EventRender renderedData;
+      if (processEvent(hEvent, renderedData)) {
+        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
+          commitAndSaveBookmark = true;
+
+          eventCount++;
+          processEventRender(renderedData, session);
+
+          if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0))
{
+            auto before_commit = std::chrono::high_resolution_clock::now();
+            session.commit();
+            logger_->log_debug("processQueue commit took %llu ms",
+                               std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()
- before_commit).count());
+
+            pBookmark_->saveBookmarkXml(bookmarkXml);
+
+            if (session.outgoingConnectionsFull("success")) {
+              return;
+            }
+
+            commitAndSaveBookmark = false;
+          }
+        }
+      }
+
+      hasEvent = true;
+    }
+
+    if (commitAndSaveBookmark) {
+      session.commit();
 
 Review comment:
   Can we postpone `commit()` to outside the outermost loop, running it only once every `onTrigger()`
call, unless specified otherwise via `batch_commit_size_`? For performance reasons.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message