nifi-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [nifi] turcsanyip commented on a change in pull request #3702: NIFI-6636: Fixed ListGCSBucket file duplication error
Date Sun, 08 Sep 2019 20:57:31 GMT
turcsanyip commented on a change in pull request #3702: NIFI-6636: Fixed ListGCSBucket file duplication error
URL: https://github.com/apache/nifi/pull/3702#discussion_r322028981
 
 

 ##########
 File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
 ##########
 @@ -381,40 +384,36 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
                 // Update state
                 if (lastModified > maxTimestamp) {
                     maxTimestamp = lastModified;
-                    currentKeys.clear();
+                    maxKeys.clear();
                 }
                 if (lastModified == maxTimestamp) {
-                    currentKeys.add(blob.getName());
+                    maxKeys.add(blob.getName());
                 }
-                listCount++;
+                loadCount++;
             }
 
-            blobPages = blobPages.getNextPage();
-            commit(context, session, listCount);
-            listCount = 0;
-        } while (blobPages != null);
-
-        currentTimestamp = maxTimestamp;
+            commit(context, session, loadCount);
 
-        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
+            blobPage = blobPage.getNextPage();
+        } while (blobPage != null);
 
-        if (!commit(context, session, listCount)) {
-            if (currentTimestamp > 0) {
-                persistState(context);
-            }
-            getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
+        if (maxTimestamp != 0) {
+            currentTimestamp = maxTimestamp;
+            currentKeys = maxKeys;
+            persistState(context);
+        } else {
+            getLogger().debug("No new objects in GCS bucket {} to load. Yielding.", new Object[]{bucket});
             context.yield();
         }
+
+        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
     }
 
-    private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) {
-        boolean willCommit = listCount > 0;
-        if (willCommit) {
-            getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
+    private void commit(final ProcessContext context, final ProcessSession session, int loadCount) {
+        if (loadCount > 0) {
+            getLogger().info("Successfully loaded {} new files from GCS; routing to success", new Object[] {loadCount});
             session.commit();
-            persistState(context);
 
 Review comment:
   @markap14 Thanks for the quick feedback!
   I agree with you that there should be some checkpointing / recovery mechanism, but the previous solution did not work either.
   `persistState()` saves `currentTimestamp` and `currentKeys`. `currentTimestamp` gets updated only at the end, after the loop over the blob pages, so saving it within the loop is pointless. On the other hand, we can't update `currentTimestamp` inside the loop either, because the blobs are not sorted by time: a later blob page can contain items that have not been loaded yet but are older than the max timestamp seen on an earlier page. Previously only `currentKeys` was updated inside the loop, which left the persisted state inconsistent (keys without the matching timestamp) and also caused the current bug, so I removed that update together with the now-useless `persistState()` call.
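   A toy sketch of that ordering problem (hypothetical page contents and timestamps, not the processor code): if the max timestamp were persisted after page 1 and the processor restarted before page 2 was processed, an older blob on page 2 would never be listed.

```java
// Minimal sketch, assuming hypothetical timestamps: GCS pages are not ordered by
// modification time, so a later page may still hold an older, not-yet-listed blob.
import java.util.List;

public class MidListingPersistSketch {
    public static void main(String[] args) {
        List<Long> page1 = List.of(100L, 200L);  // modification times seen on page 1
        List<Long> page2 = List.of(150L);        // page 2 still holds an OLDER blob

        // State persisted after page 1, just before a hypothetical restart:
        long persistedTimestamp = page1.stream().max(Long::compare).orElse(0L);  // 200

        // After the restart, the listing only emits blobs newer than the persisted
        // timestamp, so the blob modified at t=150 is silently skipped.
        for (long lastModified : page2) {
            if (lastModified > persistedTimestamp) {
                System.out.println("emit blob modified at " + lastModified);
            } else {
                System.out.println("SKIPPED blob modified at " + lastModified);
            }
        }
    }
}
```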
   
   Implementing a proper solution to this problem could be the scope of a separate issue, I think.
   I could not find a way to parameterize the GCS list call so that it returns the blobs sorted by last modification time (only alphabetical order is available).
   We could fetch all the blobs, sort them on our side, and then create the FlowFiles in a loop while saving the current state along the way. However, the performance impact (memory considerations) of that approach must be investigated beforehand; see the sketch below.
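   A rough, non-authoritative sketch of that fetch-all-and-sort idea with the google-cloud-storage client (the bucket name and prefix below are placeholders, and the whole listing is materialized in memory, which is exactly the concern):

```java
// Sketch only, not the processor implementation: list every blob, then sort client-side
// by modification time, since the list API itself only offers name (lexicographic) order.
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortedListingSketch {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Materialize the full listing (paged under the hood by iterateAll()).
        List<Blob> blobs = new ArrayList<>();
        storage.list("my-bucket", Storage.BlobListOption.prefix("data/"))
               .iterateAll()
               .forEach(blobs::add);

        // Sort by last modification time, oldest first.
        blobs.sort(Comparator.comparingLong(Blob::getUpdateTime));

        // With a time-ordered list, the state (timestamp plus the keys seen at that
        // timestamp) could be persisted safely after each batch; here we just print it.
        for (Blob blob : blobs) {
            System.out.println(blob.getUpdateTime() + "  " + blob.getName());
        }
    }
}
```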
