nifi-commits mailing list archives

From mcgil...@apache.org
Subject [22/50] [abbrv] incubator-nifi git commit: NIFI-527: Code cleanup
Date Tue, 28 Apr 2015 14:04:56 GMT
NIFI-527: Code cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3cd18b0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3cd18b0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3cd18b0b

Branch: refs/heads/NIFI-292
Commit: 3cd18b0babc5133e35a2771bc0d0acaf974c381f
Parents: 666de3d
Author: Mark Payne <markap14@hotmail.com>
Authored: Mon Apr 27 14:13:55 2015 -0400
Committer: Mark Payne <markap14@hotmail.com>
Committed: Mon Apr 27 14:13:55 2015 -0400

----------------------------------------------------------------------
 .../nifi/provenance/IndexConfiguration.java     |  12 +-
 .../PersistentProvenanceRepository.java         | 612 +++++++-------
 .../provenance/RepositoryConfiguration.java     | 106 +--
 .../nifi/provenance/StandardRecordReader.java   | 246 +++---
 .../nifi/provenance/StandardRecordWriter.java   | 138 ++--
 .../provenance/expiration/ExpirationAction.java |   6 +-
 .../provenance/lucene/DeleteIndexAction.java    |  12 +-
 .../nifi/provenance/lucene/DocsReader.java      |  79 +-
 .../nifi/provenance/lucene/IndexManager.java    | 820 +++++++++----------
 .../nifi/provenance/lucene/IndexSearch.java     |  38 +-
 .../nifi/provenance/lucene/IndexingAction.java  | 119 +--
 .../nifi/provenance/lucene/LineageQuery.java    |   6 +-
 .../nifi/provenance/lucene/LuceneUtil.java      |  38 +-
 .../provenance/rollover/CompressionAction.java  |  59 --
 .../provenance/rollover/RolloverAction.java     |  35 -
 .../provenance/serialization/RecordReader.java  |  57 +-
 .../provenance/serialization/RecordReaders.java | 136 +--
 .../provenance/serialization/RecordWriter.java  |  23 +-
 .../provenance/serialization/RecordWriters.java |   8 +-
 .../nifi/provenance/toc/StandardTocReader.java  |  44 +-
 .../nifi/provenance/toc/StandardTocWriter.java  |  35 +-
 .../apache/nifi/provenance/toc/TocReader.java   |  20 +-
 .../org/apache/nifi/provenance/toc/TocUtil.java |  27 +-
 .../apache/nifi/provenance/toc/TocWriter.java   |  16 +-
 .../TestPersistentProvenanceRepository.java     | 118 +--
 .../TestStandardRecordReaderWriter.java         | 162 ++--
 .../org/apache/nifi/provenance/TestUtil.java    |   2 +-
 .../provenance/toc/TestStandardTocReader.java   |  20 +-
 .../provenance/toc/TestStandardTocWriter.java   |   4 +-
 29 files changed, 1391 insertions(+), 1607 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index a5474d5..3beab65 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -92,7 +92,7 @@ public class IndexConfiguration {
             }
             return firstRecord.getEventTime();
         } catch (final FileNotFoundException | EOFException fnf) {
-            return null;	// file no longer exists or there's no record in this file
+            return null; // file no longer exists or there's no record in this file
         } catch (final IOException ioe) {
             logger.warn("Failed to read first entry in file {} due to {}", provenanceLogFile, ioe.toString());
             logger.warn("", ioe);
@@ -201,7 +201,8 @@ public class IndexConfiguration {
      * desired
      * @param endTime the end time of the query for which the indices are
      * desired
-     * @return
+     * @return the index directories that are applicable only for the given time
+     * span (times inclusive).
      */
     public List<File> getIndexDirectories(final Long startTime, final Long endTime) {
         if (startTime == null && endTime == null) {
@@ -252,7 +253,8 @@ public class IndexConfiguration {
      *
      * @param provenanceLogFile the provenance log file for which the index
      * directories are desired
-     * @return
+     * @return the index directories that are applicable only for the given
+     * event log
      */
     public List<File> getIndexDirectories(final File provenanceLogFile) {
         final List<File> dirs = new ArrayList<>();
@@ -334,9 +336,7 @@ public class IndexConfiguration {
     }
 
     /**
-     * Returns the amount of disk space in bytes used by all of the indices
-     *
-     * @return
+     * @return the amount of disk space in bytes used by all of the indices
      */
     public long getIndexSize() {
         lock.lock();

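For context on the @return wording restored above: getIndexDirectories(Long, Long) treats both bounds as inclusive, and either bound may be null to leave that side of the window open (the method short-circuits when both are null). A minimal caller sketch under those assumptions; the helper name indexDirsFor is hypothetical, and the null handling mirrors submitQuery() in the next file:

    // Hypothetical helper illustrating the inclusive, nullable time window.
    private List<File> indexDirsFor(final Query query, final IndexConfiguration indexConfig) {
        final Long start = query.getStartDate() == null ? null : query.getStartDate().getTime();
        final Long end = query.getEndDate() == null ? null : query.getEndDate().getTime();
        // Both bounds are inclusive; a null bound means no constraint on that side.
        return indexConfig.getIndexDirectories(start, end);
    }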
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 48cc164..fe89a5e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -139,7 +139,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
     private final List<ExpirationAction> expirationActions = new ArrayList<>();
 
-    private final IndexingAction indexingAction;
     private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
 
@@ -151,7 +150,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     private final AtomicBoolean initialized = new AtomicBoolean(false);
 
     private final AtomicBoolean repoDirty = new AtomicBoolean(false);
-    // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to 
+    // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to
     // read them. Since this is a very cheap operation to keep them, it's worth the tiny expense for the improved user experience.
     private final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000);
     private EventReporter eventReporter;
@@ -184,13 +183,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         this.indexManager = new IndexManager();
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
-        
-        final List<SearchableField> fields = configuration.getSearchableFields();
-        if (fields != null && !fields.isEmpty()) {
-            indexingAction = new IndexingAction(this, indexConfig);
-        } else {
-            indexingAction = null;
-        }
 
         scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread"));
         queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread"));
@@ -205,69 +197,69 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
     @Override
     public void initialize(final EventReporter eventReporter) throws IOException {
-    	writeLock.lock();
-    	try {
-	        if (initialized.getAndSet(true)) {
-	            return;
-	        }
-	
-	        this.eventReporter = eventReporter;
-	
-	        recover();
-	
-	        if (configuration.isAllowRollover()) {
-	            writers = createWriters(configuration, idGenerator.get());
-	        }
-	
-	        if (configuration.isAllowRollover()) {
-	            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-	                @Override
-	                public void run() {
-	                    // Check if we need to roll over
-	                    if (needToRollover()) {
-	                        // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
-	                        // confirm that we still need to.
-	                        writeLock.lock();
-	                        try {
-	                            logger.debug("Obtained write lock to perform periodic rollover");
-	
-	                            if (needToRollover()) {
-	                                try {
-	                                    rollover(false);
-	                                } catch (final Exception e) {
-	                                    logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
-	                                    logger.error("", e);
-	                                }
-	                            }
-	                        } finally {
-	                            writeLock.unlock();
-	                        }
-	                    }
-	                }
-	            }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
-	
-	            scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
-	            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-	                @Override
-	                public void run() {
-	                    try {
-	                        purgeOldEvents();
-	                    } catch (final Exception e) {
-	                        logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
-	                        if (logger.isDebugEnabled()) {
-	                            logger.error("", e);
-	                        }
-	                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
-	                    }
-	                }
-	            }, 1L, 1L, TimeUnit.MINUTES);
-	
-	            expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
-	            expirationActions.add(new FileRemovalAction());
-	        }
-    	} finally {
-    		writeLock.unlock();
-    	}
+        writeLock.lock();
+        try {
+            if (initialized.getAndSet(true)) {
+                return;
+            }
+
+            this.eventReporter = eventReporter;
+
+            recover();
+
+            if (configuration.isAllowRollover()) {
+                writers = createWriters(configuration, idGenerator.get());
+            }
+
+            if (configuration.isAllowRollover()) {
+                scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Check if we need to roll over
+                        if (needToRollover()) {
+                            // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
+                            // confirm that we still need to.
+                            writeLock.lock();
+                            try {
+                                logger.debug("Obtained write lock to perform periodic rollover");
+
+                                if (needToRollover()) {
+                                    try {
+                                        rollover(false);
+                                    } catch (final Exception e) {
+                                        logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
+                                        logger.error("", e);
+                                    }
+                                }
+                            } finally {
+                                writeLock.unlock();
+                            }
+                        }
+                    }
+                }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
+
+                scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
+                scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            purgeOldEvents();
+                        } catch (final Exception e) {
+                            logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
+                            if (logger.isDebugEnabled()) {
+                                logger.error("", e);
+                            }
+                            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
+                        }
+                    }
+                }, 1L, 1L, TimeUnit.MINUTES);
+
+                expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
+                expirationActions.add(new FileRemovalAction());
+            }
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     private static RepositoryConfiguration createRepositoryConfiguration() throws IOException {
@@ -489,28 +481,26 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 maxIdFile = file;
             }
 
-            if (firstId > maxIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) {
+            if (firstId > maxIndexedId) {
                 maxIndexedId = firstId - 1;
             }
 
-            if (firstId < minIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) {
+            if (firstId < minIndexedId) {
                 minIndexedId = firstId;
             }
         }
 
         if (maxIdFile != null) {
-            final boolean lastFileIndexed = indexingAction == null ? false : indexingAction.hasBeenPerformed(maxIdFile);
-
             // Determine the max ID in the last file.
             try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
-            	final long eventId = reader.getMaxEventId();
+                final long eventId = reader.getMaxEventId();
                 if (eventId > maxId) {
                     maxId = eventId;
                 }
 
                 // If the ID is greater than the max indexed id and this file was indexed, then
                 // update the max indexed id
-                if (eventId > maxIndexedId && lastFileIndexed) {
+                if (eventId > maxIndexedId) {
                     maxIndexedId = eventId;
                 }
             } catch (final IOException ioe) {
@@ -567,7 +557,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             // Read the records in the last file to find its max id
             if (greatestMinIdFile != null) {
                 try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) {
-                	maxId = recordReader.getMaxEventId();
+                    maxId = recordReader.getMaxEventId();
                 }
             }
 
@@ -604,11 +594,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             queryExecService.shutdownNow();
 
             indexManager.close();
-            
+
             if ( writers != null ) {
-	            for (final RecordWriter writer : writers) {
-	                writer.close();
-	            }
+                for (final RecordWriter writer : writers) {
+                    writer.close();
+                }
             }
         } finally {
             writeLock.unlock();
@@ -624,7 +614,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         readLock.lock();
         try {
             if (repoDirty.get()) {
-                logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. Will not attempt to persist more records until the repo has been rolled over.");
+                logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. "
+                        + "Will not attempt to persist more records until the repo has been rolled over.");
                 return;
             }
 
@@ -670,7 +661,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             } catch (final IOException ioe) {
                 logger.error("Failed to persist Provenance Event due to {}. Will not attempt to write to the Provenance Repository again until the repository has rolled over.", ioe.toString());
                 logger.error("", ioe);
-                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() + ". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() +
+                        ". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
 
                 // Switch from readLock to writeLock so that we can perform rollover
                 readLock.unlock();
@@ -735,9 +727,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     /**
      * Returns the size, in bytes, of the Repository storage
      *
-     * @param logFiles
-     * @param timeCutoff
-     * @return
+     * @param logFiles the log files to consider
+     * @param timeCutoff if a log file's last modified date is before timeCutoff, it will be skipped
+     * @return the total size of the given log files whose last modified date is at or after timeCutoff
      */
     public long getSize(final List<File> logFiles, final long timeCutoff) {
         long bytesUsed = 0L;
@@ -760,7 +752,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     /**
      * Purges old events from the repository
      *
-     * @throws IOException
+     * @throws IOException if unable to purge old events due to an I/O problem
      */
     void purgeOldEvents() throws IOException {
         while (!recoveryFinished.get()) {
@@ -858,12 +850,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
                 removed.add(baseName);
             } catch (final FileNotFoundException fnf) {
-                logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not perform additional Expiration Actions on this file", currentAction, file);
+                logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not "
+                        + "perform additional Expiration Actions on this file", currentAction, file);
                 removed.add(baseName);
             } catch (final Throwable t) {
-                logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional Expiration Actions on this file at this time", currentAction, file, t.toString());
+                logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional "
+                        + "Expiration Actions on this file at this time", currentAction, file, t.toString());
                 logger.warn("", t);
-                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions on this file at this time");
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction +
+                        " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " +
+                        "on this file at this time");
             }
         }
 
@@ -906,24 +902,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
     // made protected for testing purposes
     protected int getJournalCount() {
-    	// determine how many 'journals' we have in the journals directories
+        // determine how many 'journals' we have in the journals directories
         int journalFileCount = 0;
         for ( final File storageDir : configuration.getStorageDirectories() ) {
-        	final File journalsDir = new File(storageDir, "journals");
-        	final File[] journalFiles = journalsDir.listFiles();
-        	if ( journalFiles != null ) {
-        		journalFileCount += journalFiles.length;
-        	}
+            final File journalsDir = new File(storageDir, "journals");
+            final File[] journalFiles = journalsDir.listFiles();
+            if ( journalFiles != null ) {
+                journalFileCount += journalFiles.length;
+            }
         }
-        
+
         return journalFileCount;
     }
-    
+
     /**
      * MUST be called with the write lock held
      *
-     * @param force
-     * @throws IOException
+     * @param force if true, will force a rollover regardless of whether or not data has been written
+     * @throws IOException if unable to complete rollover
      */
     private void rollover(final boolean force) throws IOException {
         if (!configuration.isAllowRollover()) {
@@ -938,44 +934,44 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 final File writerFile = writer.getFile();
                 journalsToMerge.add(writerFile);
                 try {
-                	writer.close();
+                    writer.close();
                 } catch (final IOException ioe) {
-                	logger.warn("Failed to close {} due to {}", writer, ioe.toString());
-                	if ( logger.isDebugEnabled() ) {
-                		logger.warn("", ioe);
-                	}
+                    logger.warn("Failed to close {} due to {}", writer, ioe.toString());
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
                 }
             }
             if ( logger.isDebugEnabled() ) {
-            	logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
+                logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
             int journalFileCount = getJournalCount();
             final int journalCountThreshold = configuration.getJournalCount() * 5;
             if ( journalFileCount > journalCountThreshold ) {
-            	logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
-            			+ "Slowing down flow to accomodate. Currently, there are {} journal files and "
-            			+ "threshold for blocking is {}", journalFileCount, journalCountThreshold);
-            	eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
-            			+ "exceeding the provenance recording rate. Slowing down flow to accomodate");
-            	
-            	while (journalFileCount > journalCountThreshold) {
-            		try {
-            			Thread.sleep(1000L);
-            		} catch (final InterruptedException ie) {
-            		}
-            		
-                	logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
-                			+ "to accomodate. Currently, there are {} journal files and "
-                			+ "threshold for blocking is {}", journalFileCount, journalCountThreshold);
-
-            		journalFileCount = getJournalCount();
-            	}
-            	
-            	logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of "
-            			+ "journal files to be rolled over is {}", journalFileCount);
-            }
-            
+                logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+                        + "Slowing down flow to accommodate. Currently, there are {} journal files and "
+                        + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
+                eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
+                        + "exceeding the provenance recording rate. Slowing down flow to accommodate");
+
+                while (journalFileCount > journalCountThreshold) {
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {
+                    }
+
+                    logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
+                            + "to accommodate. Currently, there are {} journal files and "
+                            + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
+
+                    journalFileCount = getJournalCount();
+                }
+
+                logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
+                        + "journal files to be rolled over is {}", journalFileCount);
+            }
+
             writers = createWriters(configuration, idGenerator.get());
             streamStartTime.set(System.currentTimeMillis());
             recordsWrittenSinceRollover.getAndSet(0);
@@ -989,24 +985,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             final Runnable rolloverRunnable = new Runnable() {
                 @Override
                 public void run() {
-                	try {
-	                    final File fileRolledOver;
-	
-	                    try {
-	                        fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
-	                        repoDirty.set(false);
-	                    } catch (final IOException ioe) {
-	                        repoDirty.set(true);
-	                        logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
-	                        logger.error("", ioe);
-	                        return;
-	                    }
-	
-	                    if (fileRolledOver == null) {
-	                        return;
-	                    }
-	                    File file = fileRolledOver;
-	
+                    try {
+                        final File fileRolledOver;
+
+                        try {
+                            fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
+                            repoDirty.set(false);
+                        } catch (final IOException ioe) {
+                            repoDirty.set(true);
+                            logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+                            logger.error("", ioe);
+                            return;
+                        }
+
+                        if (fileRolledOver == null) {
+                            return;
+                        }
+                        File file = fileRolledOver;
+
                         // update our map of id to Path
                         // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
                         // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
@@ -1021,24 +1017,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                         } finally {
                             writeLock.unlock();
                         }
-	
-	                    logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
-	                    rolloverCompletions.getAndIncrement();
-	                    
-	                    // We have finished successfully. Cancel the future so that we don't run anymore
-	                    Future<?> future;
-	                    while ((future = futureReference.get()) == null) {
-	                    	try {
-	                    		Thread.sleep(10L);
-	                    	} catch (final InterruptedException ie) {
-	                    	}
-	                    }
-	                    
-	                    future.cancel(false);
-	                } catch (final Throwable t) {
-	                	logger.error("Failed to rollover Provenance repository due to {}", t.toString());
-	                	logger.error("", t);
-	                }
+
+                        logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+                        rolloverCompletions.getAndIncrement();
+
+                        // We have finished successfully. Cancel the future so that we don't run anymore
+                        Future<?> future;
+                        while ((future = futureReference.get()) == null) {
+                            try {
+                                Thread.sleep(10L);
+                            } catch (final InterruptedException ie) {
+                            }
+                        }
+
+                        future.cancel(false);
+                    } catch (final Throwable t) {
+                        logger.error("Failed to rollover Provenance repository due to {}", t.toString());
+                        logger.error("", t);
+                    }
                 }
             };
 
@@ -1074,10 +1070,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             for (final File journalFile : journalFiles) {
-            	if ( journalFile.isDirectory() ) {
-            		continue;
-            	}
-            	
+                if ( journalFile.isDirectory() ) {
+                    continue;
+                }
+
                 final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
                 List<File> files = journalMap.get(basename);
                 if (files == null) {
@@ -1120,83 +1116,84 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         return mergedFile;
     }
 
-    File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
-    	logger.debug("Merging {} to {}", journalFiles, mergedFile);
-    	if ( this.closed ) {
-    		logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
-    		return null;
-    	}
-    	
+    File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter,
+            final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
+        logger.debug("Merging {} to {}", journalFiles, mergedFile);
+        if ( this.closed ) {
+            logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
+            return null;
+        }
+
         if (journalFiles.isEmpty()) {
             return null;
         }
 
         Collections.sort(journalFiles, new Comparator<File>() {
-			@Override
-			public int compare(final File o1, final File o2) {
-				final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
-				final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
-
-				try {
-					final int journalIndex1 = Integer.parseInt(suffix1);
-					final int journalIndex2 = Integer.parseInt(suffix2);
-					return Integer.compare(journalIndex1, journalIndex2);
-				} catch (final NumberFormatException nfe) {
-					return o1.getName().compareTo(o2.getName());
-				}
-			}
+            @Override
+            public int compare(final File o1, final File o2) {
+                final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
+                final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
+
+                try {
+                    final int journalIndex1 = Integer.parseInt(suffix1);
+                    final int journalIndex2 = Integer.parseInt(suffix2);
+                    return Integer.compare(journalIndex1, journalIndex2);
+                } catch (final NumberFormatException nfe) {
+                    return o1.getName().compareTo(o2.getName());
+                }
+            }
         });
-        
+
         final String firstJournalFile = journalFiles.get(0).getName();
         final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, ".");
         final boolean allPartialFiles = firstFileSuffix.equals("0");
-        
+
         // check if we have all of the "partial" files for the journal.
         if (allPartialFiles) {
-        	if ( mergedFile.exists() ) {
-        		// we have all "partial" files and there is already a merged file. Delete the data from the index
-        		// because the merge file may not be fully merged. We will re-merge.
-        		logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
-        				+ "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
-        		
-        		final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
-        		try {
-        			deleteAction.execute(mergedFile);
-        		} catch (final Exception e) {
-        			logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
-        			if ( logger.isDebugEnabled() ) {
-        				logger.warn("", e);
-        			}
-        		}
-
-        		// Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on
-        		// a different Storage Directory than the original, we need to ensure that we delete both the partially merged
-        		// file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events.
-        		if ( !mergedFile.delete() ) {
-        			logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal "
-        					+ "file not being able to be displayed. This file should be deleted manually.", mergedFile);
-        		}
-        		
-        		final File tocFile = TocUtil.getTocFile(mergedFile);
-        		if ( tocFile.exists() && !tocFile.delete() ) {
-        			logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
-        					+ "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile);
-        		}
-        	}
+            if ( mergedFile.exists() ) {
+                // we have all "partial" files and there is already a merged file. Delete the data from the index
+                // because the merge file may not be fully merged. We will re-merge.
+                logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
+                        + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.", mergedFile);
+
+                final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
+                try {
+                    deleteAction.execute(mergedFile);
+                } catch (final Exception e) {
+                    logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", e);
+                    }
+                }
+
+                // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on
+                // a different Storage Directory than the original, we need to ensure that we delete both the partially merged
+                // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events.
+                if ( !mergedFile.delete() ) {
+                    logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal "
+                            + "file not being able to be displayed. This file should be deleted manually.", mergedFile);
+                }
+
+                final File tocFile = TocUtil.getTocFile(mergedFile);
+                if ( tocFile.exists() && !tocFile.delete() ) {
+                    logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
+                            + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile);
+                }
+            }
         } else {
-        	logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
-        			+ "but it did not; assuming that the files were already merged but only some finished deletion "
-        			+ "before restart. Deleting remaining partial journal files.", journalFiles);
-        	
-        	for ( final File file : journalFiles ) {
-        		if ( !file.delete() && file.exists() ) {
-        			logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
-        		}
-        	}
-        	
-        	return null;
-        }
-        
+            logger.warn("Cannot merge journal files {} because the first file was expected to end with extension '.0' "
+                    + "but it did not; assuming that the files were already merged but only some finished deletion "
+                    + "before restart. Deleting remaining partial journal files.", journalFiles);
+
+            for ( final File file : journalFiles ) {
+                if ( !file.delete() && file.exists() ) {
+                    logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
+                }
+            }
+
+            return null;
+        }
+
         final long startNanos = System.nanoTime();
 
         // Map each journal to a RecordReader
@@ -1241,12 +1238,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     record = reader.nextRecord();
                 } catch (final EOFException eof) {
                 } catch (final Exception e) {
-                    logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't completely written to the file. This record will be skipped.");
+                    logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't "
+                            + "completely written to the file. This record will be skipped.");
                     if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
 
-                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + "; it's possible that hte record wasn't completely written to the file. This record will be skipped.");
+                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e +
+                            "; it's possible that the record wasn't completely written to the file. This record will be skipped.");
                 }
 
                 if (record == null) {
@@ -1261,47 +1260,47 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
                 writer.writeHeader();
 
-                final IndexingAction indexingAction = new IndexingAction(this, indexConfig);
-                
+                final IndexingAction indexingAction = new IndexingAction(this);
+
                 final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile);
                 final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
                 try {
-                	long maxId = 0L;
-                	
-	                while (!recordToReaderMap.isEmpty()) {
-	                    final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-	                    final StandardProvenanceEventRecord record = entry.getKey();
-	                    final RecordReader reader = entry.getValue();
-	
-	                    writer.writeRecord(record, record.getEventId());
-	                    final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
-	                    
-	                    indexingAction.index(record, indexWriter, blockIndex);
-	                    maxId = record.getEventId();
-	                    
-	                    ringBuffer.add(record);
-	                    records++;
-	
-	                    // Remove this entry from the map
-	                    recordToReaderMap.remove(record);
-	
-	                    // Get the next entry from this reader and add it to the map
-	                    StandardProvenanceEventRecord nextRecord = null;
-	
-	                    try {
-	                        nextRecord = reader.nextRecord();
-	                    } catch (final EOFException eof) {
-	                    }
-	
-	                    if (nextRecord != null) {
-	                        recordToReaderMap.put(nextRecord, reader);
-	                    }
-	                }
-	                
-	                indexWriter.commit();
-	                indexConfig.setMaxIdIndexed(maxId);
+                    long maxId = 0L;
+
+                    while (!recordToReaderMap.isEmpty()) {
+                        final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+                        final StandardProvenanceEventRecord record = entry.getKey();
+                        final RecordReader reader = entry.getValue();
+
+                        writer.writeRecord(record, record.getEventId());
+                        final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+
+                        indexingAction.index(record, indexWriter, blockIndex);
+                        maxId = record.getEventId();
+
+                        ringBuffer.add(record);
+                        records++;
+
+                        // Remove this entry from the map
+                        recordToReaderMap.remove(record);
+
+                        // Get the next entry from this reader and add it to the map
+                        StandardProvenanceEventRecord nextRecord = null;
+
+                        try {
+                            nextRecord = reader.nextRecord();
+                        } catch (final EOFException eof) {
+                        }
+
+                        if (nextRecord != null) {
+                            recordToReaderMap.put(nextRecord, reader);
+                        }
+                    }
+
+                    indexWriter.commit();
+                    indexConfig.setMaxIdIndexed(maxId);
                 } finally {
-                	indexManager.returnIndexWriter(indexingDirectory, indexWriter);
+                    indexManager.returnIndexWriter(indexingDirectory, indexWriter);
                 }
             }
         } finally {
@@ -1319,7 +1318,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
                 eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
             }
-            
+
             final File tocFile = TocUtil.getTocFile(journalFile);
             if (!tocFile.delete() && tocFile.exists()) {
                 logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath());
@@ -1374,7 +1373,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     public QuerySubmission submitQuery(final Query query) {
         final int numQueries = querySubmissionMap.size();
         if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
-            throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
+            throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not "
+                    + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
         }
 
         if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
@@ -1416,7 +1416,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         final AtomicInteger retrievalCount = new AtomicInteger(0);
         final List<File> indexDirectories = indexConfig.getIndexDirectories(
                 query.getStartDate() == null ? null : query.getStartDate().getTime(),
-                query.getEndDate() == null ? null : query.getEndDate().getTime());
+                        query.getEndDate() == null ? null : query.getEndDate().getTime());
         final AsyncQuerySubmission result = new AsyncQuerySubmission(query, indexDirectories.size());
         querySubmissionMap.put(query.getIdentifier(), result);
 
@@ -1432,11 +1432,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     }
 
     /**
-     * REMOVE-ME: This is for testing only and can be removed.
+     * This is for testing and debugging purposes only; it is not used in normal operation
      *
-     * @param luceneQuery
-     * @return
-     * @throws IOException
+     * @param luceneQuery the lucene query to execute
+     * @return an Iterator of ProvenanceEventRecord that match the query
+     * @throws IOException if unable to perform the query
      */
     public Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
         final List<File> indexFiles = indexConfig.getIndexDirectories();
@@ -1601,7 +1601,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         return computeLineage(Collections.<String>singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
     }
 
-    private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp, final Long endTimestamp) throws IOException {
+    private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp,
+            final Long endTimestamp) throws IOException {
         final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId, startTimestamp, endTimestamp);
         final StandardLineageResult result = submission.getResult();
         while (!result.isFinished()) {
@@ -1623,7 +1624,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
     }
 
-    private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) {
+    private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType,
+            final Long eventId, final long startTimestamp, final long endTimestamp) {
         final List<File> indexDirs = indexConfig.getIndexDirectories(startTimestamp, endTimestamp);
         final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size());
         lineageSubmissionMap.put(result.getLineageIdentifier(), result);
@@ -1647,16 +1649,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             switch (event.getEventType()) {
-                case CLONE:
-                case FORK:
-                case JOIN:
-                case REPLAY:
-                    return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
-                default:
-                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
-                    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                    submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
-                    return submission;
+            case CLONE:
+            case FORK:
+            case JOIN:
+            case REPLAY:
+                return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
+            default:
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+                submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
+                return submission;
             }
         } catch (final IOException ioe) {
             final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
@@ -1684,17 +1686,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             switch (event.getEventType()) {
-                case JOIN:
-                case FORK:
-                case CLONE:
-                case REPLAY:
-                    return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
-                default: {
-                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
-                    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                    submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
-                    return submission;
-                }
+            case JOIN:
+            case FORK:
+            case CLONE:
+            case REPLAY:
+                return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
+            default: {
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+                lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+                submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
+                return submission;
+            }
             }
         } catch (final IOException ioe) {
             final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);

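A note on the latestRecords field whose comment is touched above: it holds the last 1,000 events in a RingBuffer so that the UI's initial request for recent events never has to read from disk. A stripped-down sketch of that fixed-capacity, overwrite-oldest structure (illustration only; NiFi's actual RingBuffer utility does more than this):

    // Illustration only: fixed-capacity buffer that overwrites its oldest entry.
    // Each add is an O(1) array write, the "very cheap operation" the comment cites.
    public class LatestRecordsBuffer<T> {
        private final Object[] slots;
        private int insertionPoint = 0;

        public LatestRecordsBuffer(final int capacity) {
            this.slots = new Object[capacity];  // e.g. 1000, matching latestRecords
        }

        public synchronized void add(final T value) {
            slots[insertionPoint] = value;      // overwrite the oldest slot
            insertionPoint = (insertionPoint + 1) % slots.length;
        }
    }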
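The largest block reindented above is the back-pressure loop in rollover(): once the journal backlog exceeds five times the configured journal count, the write path blocks and polls once per second until merging catches up. Reduced to its essentials (names taken from the diff; the interrupt handling is tightened here, since the committed loop swallows InterruptedException):

    // Back-pressure sketch: hold the caller until the journal backlog drains.
    int journalFileCount = getJournalCount();
    final int journalCountThreshold = configuration.getJournalCount() * 5;
    while (journalFileCount > journalCountThreshold) {
        try {
            Thread.sleep(1000L);                 // wait; merge threads make progress elsewhere
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();  // sketch choice: preserve interrupt status
        }
        journalFileCount = getJournalCount();    // re-check until below the threshold
    }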
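Also visible in the reindented rollover runnable: the merge task stashes its own Future in an AtomicReference and cancels itself after a successful run, spin-waiting briefly in case the run finishes before the scheduler has published the Future. The scheduling call itself falls outside the hunks shown, so the fixed-delay parameters below are assumed; the class is a self-contained sketch of the shape:

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch: a fixed-delay task that retries until it succeeds, then cancels itself.
    public class SelfCancellingTask {
        public static void main(final String[] args) {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
            final AtomicReference<Future<?>> futureRef = new AtomicReference<>();

            final Runnable work = new Runnable() {
                @Override
                public void run() {
                    if (!attemptWork()) {
                        return;                      // failed; scheduler retries after the delay
                    }
                    Future<?> future;
                    while ((future = futureRef.get()) == null) {
                        try {
                            Thread.sleep(10L);       // Future may not be published yet
                        } catch (final InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    future.cancel(false);            // success: stop future runs
                    exec.shutdown();                 // sketch only: let the JVM exit
                }
            };

            futureRef.set(exec.scheduleWithFixedDelay(work, 0L, 1L, TimeUnit.SECONDS));
        }

        private static boolean attemptWork() {
            return true;                             // placeholder for the real merge work
        }
    }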
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 3951591..d0d147c 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -34,7 +34,7 @@ public class RepositoryConfiguration {
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int journalCount = 16;
     private int compressionBlockBytes = 1024 * 1024;
-    
+
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
@@ -50,19 +50,19 @@ public class RepositoryConfiguration {
         return allowRollover;
     }
 
-    
+
     public int getCompressionBlockBytes() {
-		return compressionBlockBytes;
-	}
+        return compressionBlockBytes;
+    }
 
-	public void setCompressionBlockBytes(int compressionBlockBytes) {
-		this.compressionBlockBytes = compressionBlockBytes;
-	}
+    public void setCompressionBlockBytes(int compressionBlockBytes) {
+        this.compressionBlockBytes = compressionBlockBytes;
+    }
 
-	/**
+    /**
      * Specifies where the repository will store data
      *
-     * @return
+     * @return the directories where provenance files will be stored
      */
     public List<File> getStorageDirectories() {
         return Collections.unmodifiableList(storageDirectories);
@@ -71,18 +71,15 @@ public class RepositoryConfiguration {
     /**
      * Specifies where the repository should store data
      *
-     * @param storageDirectory
+     * @param storageDirectory the directory to store provenance files
      */
     public void addStorageDirectory(final File storageDirectory) {
         this.storageDirectories.add(storageDirectory);
     }
 
     /**
-     * Returns the minimum amount of time that a given record will stay in the
-     * repository
-     *
-     * @param timeUnit
-     * @return
+     * @param timeUnit the desired time unit
+     * @return the max amount of time that a given record will stay in the repository
      */
     public long getMaxRecordLife(final TimeUnit timeUnit) {
         return timeUnit.convert(recordLifeMillis, TimeUnit.MILLISECONDS);
@@ -91,8 +88,8 @@ public class RepositoryConfiguration {
     /**
      * Specifies how long a record should stay in the repository
      *
-     * @param maxRecordLife
-     * @param timeUnit
+     * @param maxRecordLife the max amount of time to keep a record in the repo
+     * @param timeUnit the period of time used by maxRecordLife
      */
     public void setMaxRecordLife(final long maxRecordLife, final TimeUnit timeUnit) {
         this.recordLifeMillis = TimeUnit.MILLISECONDS.convert(maxRecordLife, timeUnit);
@@ -101,7 +98,7 @@ public class RepositoryConfiguration {
     /**
      * Returns the maximum amount of data to store in the repository (in bytes)
      *
-     * @return
+     * @return the maximum amount of disk space to use for the prov repo
      */
     public long getMaxStorageCapacity() {
         return storageCapacity;
@@ -109,107 +106,91 @@ public class RepositoryConfiguration {
 
     /**
      * Sets the maximum amount of data to store in the repository (in bytes)
-     * @param maxStorageCapacity
+     *
+     * @param maxStorageCapacity the maximum amount of disk space to use for the prov repo
      */
     public void setMaxStorageCapacity(final long maxStorageCapacity) {
         this.storageCapacity = maxStorageCapacity;
     }
 
     /**
-     * Returns the maximum amount of time to write to a single event file
-     *
-     * @param timeUnit
-     * @return
+     * @param timeUnit the desired time unit for the returned value
+     * @return the maximum amount of time that the repo will write to a single event file
      */
     public long getMaxEventFileLife(final TimeUnit timeUnit) {
         return timeUnit.convert(eventFileMillis, TimeUnit.MILLISECONDS);
     }
 
     /**
-     * Sets the maximum amount of time to write to a single event file
-     *
-     * @param maxEventFileTime
-     * @param timeUnit
+     * @param maxEventFileTime the max amount of time to write to a single event file
+     * @param timeUnit the units for the value supplied by maxEventFileTime
      */
     public void setMaxEventFileLife(final long maxEventFileTime, final TimeUnit timeUnit) {
         this.eventFileMillis = TimeUnit.MILLISECONDS.convert(maxEventFileTime, timeUnit);
     }
 
     /**
-     * Returns the maximum number of bytes (pre-compression) that will be
+     * @return the maximum number of bytes (pre-compression) that will be
      * written to a single event file before the file is rolled over
-     *
-     * @return
      */
     public long getMaxEventFileCapacity() {
         return eventFileBytes;
     }
 
     /**
-     * Sets the maximum number of bytes (pre-compression) that will be written
+     * @param maxEventFileBytes the maximum number of bytes (pre-compression) that will be written
      * to a single event file before the file is rolled over
-     *
-     * @param maxEventFileBytes
      */
     public void setMaxEventFileCapacity(final long maxEventFileBytes) {
         this.eventFileBytes = maxEventFileBytes;
     }
 
     /**
-     * Returns the fields that can be indexed
-     *
-     * @return
+     * @return the fields that should be indexed
      */
     public List<SearchableField> getSearchableFields() {
         return Collections.unmodifiableList(searchableFields);
     }
 
     /**
-     * Sets the fields to index
-     *
-     * @param searchableFields
+     * @param searchableFields the fields to index
      */
     public void setSearchableFields(final List<SearchableField> searchableFields) {
         this.searchableFields = new ArrayList<>(searchableFields);
     }
 
     /**
-     * Returns the FlowFile attributes that can be indexed
-     *
-     * @return
+     * @return the FlowFile attributes that should be indexed
      */
     public List<SearchableField> getSearchableAttributes() {
         return Collections.unmodifiableList(searchableAttributes);
     }
 
     /**
-     * Sets the FlowFile attributes to index
-     *
-     * @param searchableAttributes
+     * @param searchableAttributes the FlowFile attributes to index
      */
     public void setSearchableAttributes(final List<SearchableField> searchableAttributes) {
         this.searchableAttributes = new ArrayList<>(searchableAttributes);
     }
 
     /**
-     * Indicates whether or not event files will be compressed when they are
+     * @return whether or not event files will be compressed when they are
      * rolled over
-     *
-     * @return
      */
     public boolean isCompressOnRollover() {
         return compress;
     }
 
     /**
-     * Specifies whether or not to compress event files on rollover
-     *
-     * @param compress
+     * @param compress if true, the data will be compressed when rolled over
      */
     public void setCompressOnRollover(final boolean compress) {
         this.compress = compress;
     }
 
+    /**
+     * @return the number of threads to use to query the repo
+     */
     public int getQueryThreadPoolSize() {
         return queryThreadPoolSize;
     }
@@ -246,27 +227,23 @@ public class RepositoryConfiguration {
      * </li>
      * </ol>
      *
-     * @param bytes
+     * @param bytes the number of bytes to write to an index before beginning a new shard
      */
     public void setDesiredIndexSize(final long bytes) {
         this.desiredIndexBytes = bytes;
     }
 
     /**
-     * Returns the desired size of each index shard. See the
-     * {@Link #setDesiredIndexSize} method for an explanation of why we choose
+     * @return the desired size of each index shard. See the
+     * {@link #setDesiredIndexSize} method for an explanation of why we choose
      * to shard the index.
-     *
-     * @return
      */
     public long getDesiredIndexSize() {
         return desiredIndexBytes;
     }
 
     /**
-     * Sets the number of Journal files to use when persisting records.
-     *
-     * @param numJournals
+     * @param numJournals the number of Journal files to use when persisting records.
      */
     public void setJournalCount(final int numJournals) {
         if (numJournals < 1) {
@@ -277,19 +254,14 @@ public class RepositoryConfiguration {
     }
 
     /**
-     * Returns the number of Journal files that will be used when persisting
-     * records.
-     *
-     * @return
+     * @return the number of Journal files that will be used when persisting records.
      */
     public int getJournalCount() {
         return journalCount;
     }
 
     /**
-     * Specifies whether or not the Repository should sync all updates to disk.
-     *
-     * @return
+     * @return <code>true</code> if the repository will perform an 'fsync' for all updates to disk
      */
     public boolean isAlwaysSync() {
         return alwaysSync;
@@ -301,7 +273,7 @@ public class RepositoryConfiguration {
     * persisted across restarts, even if there is a power failure or a sudden
      * Operating System crash, but it can be very expensive.
      *
-     * @param alwaysSync
+     * @param alwaysSync whether or not to perform an 'fsync' for all updates to disk
      */
     public void setAlwaysSync(boolean alwaysSync) {
         this.alwaysSync = alwaysSync;
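The javadoc cleanup in this file doubles as a map of the repository's tuning surface. A minimal usage sketch of the configuration API, with illustrative values (the storage path and every number below are examples, not defaults):

    import java.io.File;
    import java.util.concurrent.TimeUnit;

    static RepositoryConfiguration exampleConfiguration() {
        final RepositoryConfiguration config = new RepositoryConfiguration();
        config.addStorageDirectory(new File("provenance_repository")); // illustrative path
        config.setMaxRecordLife(24, TimeUnit.HOURS);          // expire events after one day
        config.setMaxStorageCapacity(1024L * 1024L * 1024L);  // cap total storage at 1 GB
        config.setMaxEventFileLife(5, TimeUnit.MINUTES);      // roll event files every 5 minutes...
        config.setMaxEventFileCapacity(10L * 1024L * 1024L);  // ...or at 10 MB pre-compression
        config.setCompressOnRollover(true);                   // gzip event files on rollover
        config.setDesiredIndexSize(500L * 1024L * 1024L);     // shard the index around 500 MB
        config.setJournalCount(16);                           // spread writes across 16 journals
        config.setAlwaysSync(false);                          // skip per-update fsync for throughput
        return config;
    }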

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 9bbf195..ca0d5ed 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -39,40 +39,40 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRecordReader implements RecordReader {
-	private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
-	
-	private final ByteCountingInputStream rawInputStream;
+    private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+
+    private final ByteCountingInputStream rawInputStream;
     private final String filename;
     private final int serializationVersion;
     private final boolean compressed;
     private final TocReader tocReader;
     private final int headerLength;
-    
+
     private DataInputStream dis;
     private ByteCountingInputStream byteCountingIn;
 
     public StandardRecordReader(final InputStream in, final String filename) throws IOException {
-    	this(in, filename, null);
+        this(in, filename, null);
     }
-    
+
     public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException {
-    	logger.trace("Creating RecordReader for {}", filename);
-    	
-    	rawInputStream = new ByteCountingInputStream(in);
+        logger.trace("Creating RecordReader for {}", filename);
+
+        rawInputStream = new ByteCountingInputStream(in);
 
         final InputStream limitedStream;
         if ( tocReader == null ) {
-        	limitedStream = rawInputStream;
+            limitedStream = rawInputStream;
         } else {
-        	final long offset1 = tocReader.getBlockOffset(1);
-        	if ( offset1 < 0 ) {
-        		limitedStream = rawInputStream;
-        	} else {
-        		limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
-        	}
-        }
-        
-    	final InputStream readableStream;
+            final long offset1 = tocReader.getBlockOffset(1);
+            if ( offset1 < 0 ) {
+                limitedStream = rawInputStream;
+            } else {
+                limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
+            }
+        }
+
+        final InputStream readableStream;
         if (filename.endsWith(".gz")) {
             readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
             compressed = true;
@@ -83,11 +83,11 @@ public class StandardRecordReader implements RecordReader {
 
         byteCountingIn = new ByteCountingInputStream(readableStream);
         dis = new DataInputStream(byteCountingIn);
-        
+
         final String repoClassName = dis.readUTF();
         final int serializationVersion = dis.readInt();
-        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4;	// 2 bytes for string length, 4 for integer.
-        
+        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
+
         if (serializationVersion < 1 || serializationVersion > 8) {
             throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8");
         }
@@ -99,52 +99,52 @@ public class StandardRecordReader implements RecordReader {
 
     @Override
     public void skipToBlock(final int blockIndex) throws IOException {
-    	if ( tocReader == null ) {
-    		throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
-    	}
-    	
-    	if ( blockIndex < 0 ) {
-    		throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
-    	}
-    	
-    	if ( blockIndex == getBlockIndex() ) {
-    		return;
-    	}
-    	
-    	final long offset = tocReader.getBlockOffset(blockIndex);
-    	if ( offset < 0 ) {
-    		throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
-    	}
-    	
-    	final long curOffset = rawInputStream.getBytesConsumed();
-    	
-    	final long bytesToSkip = offset - curOffset;
-    	if ( bytesToSkip >= 0 ) {
-	    	try {
-	    		StreamUtils.skip(rawInputStream, bytesToSkip);
-	    		logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
-	    	} catch (final IOException e) {
-	    		throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
-	    	}
-	
-	    	resetStreamForNextBlock();
-    	}
+        if ( tocReader == null ) {
+            throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
+        }
+
+        if ( blockIndex < 0 ) {
+            throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
+        }
+
+        if ( blockIndex == getBlockIndex() ) {
+            return;
+        }
+
+        final long offset = tocReader.getBlockOffset(blockIndex);
+        if ( offset < 0 ) {
+            throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
+        }
+
+        final long curOffset = rawInputStream.getBytesConsumed();
+
+        final long bytesToSkip = offset - curOffset;
+        if ( bytesToSkip >= 0 ) {
+            try {
+                StreamUtils.skip(rawInputStream, bytesToSkip);
+                logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
+            } catch (final IOException e) {
+                throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
+            }
+
+            resetStreamForNextBlock();
+        }
     }
-    
+
     private void resetStreamForNextBlock() throws IOException {
-    	final InputStream limitedStream;
+        final InputStream limitedStream;
         if ( tocReader == null ) {
-        	limitedStream = rawInputStream;
+            limitedStream = rawInputStream;
         } else {
-        	final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
-        	if ( offset < 0 ) {
-        		limitedStream = rawInputStream;
-        	} else {
-        		limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
-        	}
-        }
-    	
-    	final InputStream readableStream;
+            final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
+            if ( offset < 0 ) {
+                limitedStream = rawInputStream;
+            } else {
+                limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
+            }
+        }
+
+        final InputStream readableStream;
         if (compressed) {
             readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
         } else {
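skipToBlock() reduces to simple offset arithmetic: the Table-of-Contents maps a block index to an absolute byte offset in the raw (compressed) stream, the reader skips forward by the difference from its current position, and resetStreamForNextBlock() re-wraps the stream so decompression restarts cleanly at the block boundary. A worked sketch with hypothetical TOC offsets:

    // hypothetical offsets a TocReader might return for blocks 0..2
    final long[] blockOffsets = { 0L, 4_096L, 9_200L };

    final long curOffset = 1_024L;   // rawInputStream.getBytesConsumed()
    final int blockIndex = 2;
    final long bytesToSkip = blockOffsets[blockIndex] - curOffset;  // 9200 - 1024 = 8176

    // StreamUtils.skip(rawInputStream, bytesToSkip) lands exactly on the block
    // boundary; each block is an independently decompressible unit, so the reader
    // can re-wrap the stream (GZIP or plain) and continue reading from there.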
@@ -154,32 +154,32 @@ public class StandardRecordReader implements RecordReader {
         byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
         dis = new DataInputStream(byteCountingIn);
     }
-    
-    
+
+
     @Override
     public TocReader getTocReader() {
-    	return tocReader;
+        return tocReader;
     }
-    
+
     @Override
     public boolean isBlockIndexAvailable() {
-    	return tocReader != null;
+        return tocReader != null;
     }
-    
+
     @Override
     public int getBlockIndex() {
-    	if ( tocReader == null ) {
-    		throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
-    	}
-    	
-    	return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+        if ( tocReader == null ) {
+            throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
+        }
+
+        return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
     }
-    
+
     @Override
     public long getBytesConsumed() {
-    	return byteCountingIn.getBytesConsumed();
+        return byteCountingIn.getBytesConsumed();
     }
-    
+
     private StandardProvenanceEventRecord readPreVersion6Record() throws IOException {
         final long startOffset = byteCountingIn.getBytesConsumed();
 
@@ -374,17 +374,17 @@ public class StandardRecordReader implements RecordReader {
     }
 
     private String readUUID(final DataInputStream in) throws IOException {
-    	if ( serializationVersion < 8 ) {
-	        final long msb = in.readLong();
-	        final long lsb = in.readLong();
-	        return new UUID(msb, lsb).toString();
-    	} else {
-    		// before version 8, we serialized UUID's as two longs in order to
-    		// write less data. However, in version 8 we changed to just writing
-    		// out the string because it's extremely expensive to call UUID.fromString.
-    		// In the end, since we generally compress, the savings in minimal anyway.
-    		return in.readUTF();
-    	}
+        if ( serializationVersion < 8 ) {
+            final long msb = in.readLong();
+            final long lsb = in.readLong();
+            return new UUID(msb, lsb).toString();
+        } else {
+            // before version 8, we serialized UUIDs as two longs in order to
+            // write less data. However, in version 8 we changed to just writing
+            // out the string because it's extremely expensive to call UUID.fromString.
+            // In the end, since we generally compress, the savings is minimal anyway.
+            return in.readUTF();
+        }
     }
 
     private String readNullableString(final DataInputStream in) throws IOException {
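The comment in readUUID() records a format trade-off: versions before 8 encode each UUID as its two 64-bit halves (16 bytes), while version 8 writes the string form, which is larger on disk but spares the reader an expensive UUID.fromString call; compression recovers most of the size difference. A standalone sketch of the two encodings using only the JDK:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.UUID;

    static void writeBothEncodings(final DataOutputStream out, final UUID uuid) throws IOException {
        // pre-version-8: two longs (16 bytes); reading requires new UUID(msb, lsb).toString()
        out.writeLong(uuid.getMostSignificantBits());
        out.writeLong(uuid.getLeastSignificantBits());

        // version 8+: modified-UTF string (2-byte length + 36 chars); the reader
        // gets the String back directly, with no UUID parsing at all
        out.writeUTF(uuid.toString());
    }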
@@ -416,53 +416,53 @@ public class StandardRecordReader implements RecordReader {
         byteCountingIn.mark(1);
         int nextByte = byteCountingIn.read();
         byteCountingIn.reset();
-        
+
         if ( nextByte < 0 ) {
-        	try {
-        		resetStreamForNextBlock();
-        	} catch (final EOFException eof) {
-        		return false;
-        	}
-        	
+            try {
+                resetStreamForNextBlock();
+            } catch (final EOFException eof) {
+                return false;
+            }
+
             byteCountingIn.mark(1);
             nextByte = byteCountingIn.read();
             byteCountingIn.reset();
         }
-        
+
         return (nextByte >= 0);
     }
-    
+
     @Override
     public long getMaxEventId() throws IOException {
-    	if ( tocReader != null ) {
-    		final long lastBlockOffset = tocReader.getLastBlockOffset();
-    		skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
-    	}
-    	
-    	ProvenanceEventRecord record;
-    	ProvenanceEventRecord lastRecord = null;
-    	try {
-	    	while ((record = nextRecord()) != null) {
-	    		lastRecord = record;
-	    	}
-    	} catch (final EOFException eof) {
-    		// This can happen if we stop NIFi while the record is being written.
-    		// This is OK, we just ignore this record. The session will not have been
-    		// committed, so we can just process the FlowFile again.
-    	}
-    	
-    	return (lastRecord == null) ? -1L : lastRecord.getEventId();
+        if ( tocReader != null ) {
+            final long lastBlockOffset = tocReader.getLastBlockOffset();
+            skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+        }
+
+        ProvenanceEventRecord record;
+        ProvenanceEventRecord lastRecord = null;
+        try {
+            while ((record = nextRecord()) != null) {
+                lastRecord = record;
+            }
+        } catch (final EOFException eof) {
+            // This can happen if we stop NiFi while the record is being written.
+            // This is OK, we just ignore this record. The session will not have been
+            // committed, so we can just process the FlowFile again.
+        }
+
+        return (lastRecord == null) ? -1L : lastRecord.getEventId();
     }
 
     @Override
     public void close() throws IOException {
-    	logger.trace("Closing Record Reader for {}", filename);
-    	
+        logger.trace("Closing Record Reader for {}", filename);
+
         dis.close();
         rawInputStream.close();
-        
+
         if ( tocReader != null ) {
-        	tocReader.close();
+            tocReader.close();
         }
     }
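The end-of-data check earlier in this hunk peeks a single byte with mark()/read()/reset() to decide whether more records are available, first in the current block and then, after resetting the stream, in the next one. The peek pattern in isolation (any InputStream whose markSupported() returns true will do):

    in.mark(1);                      // remember the current position
    final int nextByte = in.read();  // consume one byte, or -1 at end of stream
    in.reset();                      // un-read the peeked byte
    final boolean hasMoreData = nextByte >= 0;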
 
@@ -473,9 +473,9 @@ public class StandardRecordReader implements RecordReader {
 
     @Override
     public void skipTo(final long position) throws IOException {
-    	// we are subtracting headerLength from the number of bytes consumed because we used to 
-    	// consider the offset of the first record "0" - now we consider it whatever position it
-    	// it really is in the stream.
+        // we are subtracting headerLength from the number of bytes consumed because we used to
+        // consider the offset of the first record "0" - now we consider it whatever position
+        // it really is in the stream.
         final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
         if (currentPosition == position) {
             return;
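After a restart, recovery code can call getMaxEventId() to learn where event IDs left off; with a TOC the reader jumps straight to the last block, and without one it scans the whole file. A minimal usage sketch (the file name is illustrative, and passing null for the TocReader forces the full scan):

    import java.io.FileInputStream;
    import java.io.InputStream;

    try (final InputStream in = new FileInputStream("events.prov.gz")) {
        // the .gz suffix tells the reader to expect GZIP-compressed data
        final StandardRecordReader reader = new StandardRecordReader(in, "events.prov.gz", null);
        final long maxEventId = reader.getMaxEventId(); // -1 if the log holds no records
        reader.close(); // also closes the underlying streams
    }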

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index dbb2c48..3095f13 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -36,15 +36,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRecordWriter implements RecordWriter {
-	private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
-	
+    private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
+
     private final File file;
     private final FileOutputStream fos;
     private final ByteCountingOutputStream rawOutStream;
     private final TocWriter tocWriter;
     private final boolean compressed;
     private final int uncompressedBlockSize;
-    
+
     private DataOutputStream out;
     private ByteCountingOutputStream byteCountingOut;
     private long lastBlockOffset = 0L;
@@ -52,21 +52,21 @@ public class StandardRecordWriter implements RecordWriter {
 
     private final Lock lock = new ReentrantLock();
 
-    
+
     public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
-    	logger.trace("Creating Record Writer for {}", file.getName());
-    	
+        logger.trace("Creating Record Writer for {}", file.getName());
+
         this.file = file;
         this.compressed = compressed;
         this.fos = new FileOutputStream(file);
         rawOutStream = new ByteCountingOutputStream(fos);
         this.uncompressedBlockSize = uncompressedBlockSize;
-        
+
         this.tocWriter = writer;
     }
 
     static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
-    	out.writeUTF(uuid);
+        out.writeUTF(uuid);
     }
 
     static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@@ -85,49 +85,49 @@ public class StandardRecordWriter implements RecordWriter {
         return file;
     }
 
-	@Override
+    @Override
     public synchronized void writeHeader() throws IOException {
         lastBlockOffset = rawOutStream.getBytesWritten();
         resetWriteStream();
-        
+
         out.writeUTF(PersistentProvenanceRepository.class.getName());
         out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
         out.flush();
     }
-    
+
     private void resetWriteStream() throws IOException {
-    	if ( out != null ) {
-    		out.flush();
-    	}
-
-    	final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
-    	
-    	final OutputStream writableStream;
-    	if ( compressed ) {
-    		// because of the way that GZIPOutputStream works, we need to call close() on it in order for it
-    		// to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
-    		// the underlying OutputStream in a NonCloseableOutputStream
-    		if ( out != null ) {
-    			out.close();
-    		}
-
-        	if ( tocWriter != null ) {
-        		tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
-        	}
-
-    		writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
-    	} else {
-        	if ( tocWriter != null ) {
-        		tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
-        	}
-
-    		writableStream = new BufferedOutputStream(rawOutStream, 65536);
-    	}
-    	
+        if ( out != null ) {
+            out.flush();
+        }
+
+        final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+
+        final OutputStream writableStream;
+        if ( compressed ) {
+            // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+            // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+            // the underlying OutputStream in a NonCloseableOutputStream
+            if ( out != null ) {
+                out.close();
+            }
+
+            if ( tocWriter != null ) {
+                tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+            }
+
+            writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+        } else {
+            if ( tocWriter != null ) {
+                tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+            }
+
+            writableStream = new BufferedOutputStream(rawOutStream, 65536);
+        }
+
         this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
         this.out = new DataOutputStream(byteCountingOut);
     }
-    
+
 
     @Override
     public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException {
@@ -136,16 +136,16 @@ public class StandardRecordWriter implements RecordWriter {
 
         // add a new block to the TOC if needed.
         if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) {
-        	lastBlockOffset = startBytes;
-        	
-        	if ( compressed ) {
-        		// because of the way that GZIPOutputStream works, we need to call close() on it in order for it
-        		// to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
-        		// the underlying OutputStream in a NonCloseableOutputStream
-        		resetWriteStream();
-        	}
+            lastBlockOffset = startBytes;
+
+            if ( compressed ) {
+                // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+                // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+                // the underlying OutputStream in a NonCloseableOutputStream
+                resetWriteStream();
+            }
         }
-        
+
         out.writeLong(recordIdentifier);
         out.writeUTF(record.getEventType().name());
         out.writeLong(record.getEventTime());
@@ -175,7 +175,7 @@ public class StandardRecordWriter implements RecordWriter {
             writeLongNullableString(out, entry.getValue());
         }
 
-        // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. 
+        // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
         if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
             out.writeBoolean(true);
             out.writeUTF(record.getContentClaimContainer());
@@ -261,24 +261,24 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public synchronized void close() throws IOException {
-    	logger.trace("Closing Record Writer for {}", file.getName());
-    	
+        logger.trace("Closing Record Writer for {}", file.getName());
+
         lock();
         try {
-        	try {
-        		out.flush();
-        		out.close();
-        	} finally {
-        		rawOutStream.close();
-            
-	            if ( tocWriter != null ) {
-	            	tocWriter.close();
-	            }
-        	}
+            try {
+                out.flush();
+                out.close();
+            } finally {
+                rawOutStream.close();
+
+                if ( tocWriter != null ) {
+                    tocWriter.close();
+                }
+            }
         } finally {
             unlock();
         }
-        
+
     }
 
     @Override
@@ -308,14 +308,14 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public void sync() throws IOException {
-    	if ( tocWriter != null ) {
-    		tocWriter.sync();
-    	}
-    	fos.getFD().sync();
+        if ( tocWriter != null ) {
+            tocWriter.sync();
+        }
+        fos.getFD().sync();
     }
-    
+
     @Override
     public TocWriter getTocWriter() {
-    	return tocWriter;
+        return tocWriter;
     }
 }
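The comment repeated in resetWriteStream() and writeRecord() describes the one subtlety of the block format: GZIPOutputStream only emits its trailer on close(), so each block is finished by closing the compressor while a wrapper shields the underlying file stream from that close. A generic sketch of the pattern, with a FilterOutputStream subclass standing in for NiFi's NonCloseableOutputStream:

    import java.io.FileOutputStream;
    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPOutputStream;

    public class GzipBlockDemo {
        static class CloseShield extends FilterOutputStream {
            CloseShield(final OutputStream out) { super(out); }

            @Override
            public void close() throws IOException {
                flush(); // swallow close() so the underlying stream stays open
            }
        }

        public static void main(final String[] args) throws IOException {
            // each iteration emits one complete, independently decompressible GZIP member
            try (final OutputStream raw = new FileOutputStream("blocks.gz")) {
                for (int block = 0; block < 3; block++) {
                    final GZIPOutputStream gzip = new GZIPOutputStream(new CloseShield(raw), 1);
                    gzip.write(("block " + block + "\n").getBytes(StandardCharsets.UTF_8));
                    gzip.close(); // writes the GZIP trailer; 'raw' remains open
                }
            }
        }
    }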

