nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject nifi git commit: NIFI-4439: This closes #2190. When a Provenance Event File is rolled over, we were failing to close the resource before attempting to compress it. Fixed that. NIFI-4439: Addressed threading bug that can occur when rolling over provenance
Date Mon, 16 Oct 2017 21:18:16 GMT
Repository: nifi
Updated Branches:
  refs/heads/master ba8f17bac -> b950eed1a


NIFI-4439: This closes #2190. When a Provenance Event File is rolled over, we were failing
to close the resource before attempting to compress it. Fixed that.
NIFI-4439: Addressed threading bug that can occur when rolling over provenance record writer

Signed-off-by: joewitt <joewitt@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b950eed1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b950eed1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b950eed1

Branch: refs/heads/master
Commit: b950eed1a5a3c7c604ecc6c46edc397fda597cfe
Parents: ba8f17b
Author: Mark Payne <markap14@hotmail.com>
Authored: Tue Oct 3 09:44:54 2017 -0400
Committer: joewitt <joewitt@apache.org>
Committed: Mon Oct 16 17:18:01 2017 -0400

----------------------------------------------------------------------
 .../store/WriteAheadStorePartition.java         | 59 +++++++++++---------
 1 file changed, 34 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b950eed1/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
index fde76f5..6c5cc8d 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -253,38 +253,47 @@ public class WriteAheadStorePartition implements EventStorePartition
{
         final long nextEventId = idGenerator.get();
         final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov");
         final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile,
idGenerator, false, true);
-        final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(),
config.getMaxEventFileCount());
-        final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
 
-        if (updated) {
-            updatedWriter.writeHeader(nextEventId);
+        // Synchronize on the writer to ensure that no other thread is able to obtain the
writer and start writing events to it until after it has
+        // been fully initialized (i.e., the header has been written, etc.)
+        synchronized (updatedWriter) {
+            final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(),
config.getMaxEventFileCount());
+            final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
 
-            synchronized (minEventIdToPathMap) {
-                minEventIdToPathMap.put(nextEventId, updatedEventFile);
-            }
+            if (updated) {
+                if (lease != null) {
+                    lease.close();
+                }
 
-            if (config.isCompressOnRollover() && lease != null && lease.getWriter()
!= null) {
-                boolean offered = false;
-                while (!offered && !closed) {
-                    try {
-                        offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
-                    } catch (final InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                        throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile()
+ " for compression");
+                updatedWriter.writeHeader(nextEventId);
+
+                synchronized (minEventIdToPathMap) {
+                    minEventIdToPathMap.put(nextEventId, updatedEventFile);
+                }
+
+                if (config.isCompressOnRollover() && lease != null && lease.getWriter()
!= null) {
+                    boolean offered = false;
+                    while (!offered && !closed) {
+                        try {
+                            offered = filesToCompress.offer(lease.getWriter().getFile(),
1, TimeUnit.SECONDS);
+                        } catch (final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            throw new IOException("Interrupted while waiting to enqueue "
+ lease.getWriter().getFile() + " for compression");
+                        }
                     }
                 }
-            }
 
-            return true;
-        } else {
-            try {
-                updatedWriter.close();
-            } catch (final Exception e) {
-                logger.warn("Failed to close Record Writer {}; some resources may not be
cleaned up properly.", updatedWriter, e);
-            }
+                return true;
+            } else {
+                try {
+                    updatedWriter.close();
+                } catch (final Exception e) {
+                    logger.warn("Failed to close Record Writer {}; some resources may not
be cleaned up properly.", updatedWriter, e);
+                }
 
-            updatedEventFile.delete();
-            return false;
+                updatedEventFile.delete();
+                return false;
+            }
         }
     }
 


Mime
View raw message