nifi-commits mailing list archives

From marka...@apache.org
Subject [04/10] incubator-nifi git commit: NIFI-527: Merging develop
Date Mon, 27 Apr 2015 16:04:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
new file mode 100644
index 0000000..3943504
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexManager implements Closeable {
+	private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+	
+	private final Lock lock = new ReentrantLock();
+	private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+	private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+	
+	
+	public void removeIndex(final File indexDirectory) {
+		final File absoluteFile = indexDirectory.getAbsoluteFile();
+		logger.info("Removing index {}", indexDirectory);
+		
+		lock.lock();
+		try {
+			final IndexWriterCount count = writerCounts.remove(absoluteFile);
+			if ( count != null ) {
+				try {
+					count.close();
+				} catch (final IOException ioe) {
+					logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+					if ( logger.isDebugEnabled() ) {
+						logger.warn("", ioe);
+					}
+				}
+			}
+			
+			for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+				for ( final ActiveIndexSearcher searcher : searcherList ) {
+					try {
+						searcher.close();
+					} catch (final IOException ioe) {
+						logger.warn("Failed to close Index Searcher {} for {} due to {}", 
+								searcher.getSearcher(), absoluteFile, ioe);
+						if ( logger.isDebugEnabled() ) {
+							logger.warn("", ioe);
+						}
+					}
+				}
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+		final File absoluteFile = indexingDirectory.getAbsoluteFile();
+		logger.debug("Borrowing index writer for {}", indexingDirectory);
+		
+		lock.lock();
+		try {
+			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+			if ( writerCount == null ) {
+				final List<Closeable> closeables = new ArrayList<>();
+                final Directory directory = FSDirectory.open(indexingDirectory);
+                closeables.add(directory);
+                
+                try {
+                	final Analyzer analyzer = new StandardAnalyzer();
+                	closeables.add(analyzer);
+                	
+                    final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+                    config.setWriteLockTimeout(300000L);
+
+                    final IndexWriter indexWriter = new IndexWriter(directory, config);
+                    writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+                    logger.debug("Providing new index writer for {}", indexingDirectory);
+                } catch (final IOException ioe) {
+                	for ( final Closeable closeable : closeables ) {
+                		try {
+                			closeable.close();
+                		} catch (final IOException ioe2) {
+                			ioe.addSuppressed(ioe2);
+                		}
+                	}
+                	
+                	throw ioe;
+                }
+                
+                writerCounts.put(absoluteFile, writerCount);
+			} else {
+				logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+			}
+			
+			return writerCount.getWriter();
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+		final File absoluteFile = indexingDirectory.getAbsoluteFile();
+		logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+		
+		lock.lock();
+		try {
+			IndexWriterCount count = writerCounts.remove(absoluteFile);
+			
+			try {
+				if ( count == null ) {
+					logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+							+ "This could potentially lead to a resource leak", writer, indexingDirectory);
+					writer.close();
+				} else if ( count.getCount() <= 1 ) {
+					// we are finished with this writer.
+					logger.debug("Closing Index Writer for {}", indexingDirectory);
+					count.close();
+				} else {
+					// decrement the count.
+					logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+					writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+				}
+			} catch (final IOException ioe) {
+				logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+				if ( logger.isDebugEnabled() ) {
+					logger.warn("", ioe);
+				}
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	
+	public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+		final File absoluteFile = indexDir.getAbsoluteFile();
+		logger.debug("Borrowing index searcher for {}", indexDir);
+		
+		lock.lock();
+		try {
+			// check if we already have a reader cached.
+			List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+			if ( currentlyCached == null ) {
+				currentlyCached = new ArrayList<>();
+				activeSearchers.put(absoluteFile, currentlyCached);
+			} else {
+				// keep track of any searchers that have been closed so that we can remove them
+				// from our cache later.
+				final Set<ActiveIndexSearcher> expired = new HashSet<>();
+				
+				try {
+					for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+						if ( searcher.isCache() ) {
+							final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+							if ( refCount <= 0 ) {
+								// if refCount == 0, then the reader has been closed, so we need to discard the searcher
+								logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+									+ "removing cached searcher", absoluteFile, refCount);
+								expired.add(searcher);
+								continue;
+							}
+							
+							logger.debug("Providing previously cached index searcher for {}", indexDir);
+							return searcher.getSearcher();
+						}
+					}
+				} finally {
+					// if we have any expired index searchers, we need to close them and remove them
+					// from the cache so that we don't try to use them again later.
+					for ( final ActiveIndexSearcher searcher : expired ) {
+						try {
+							searcher.close();
+						} catch (final Exception e) {
+							logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
+						}
+						
+						currentlyCached.remove(searcher);
+					}
+				}
+			}
+			
+			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+			if ( writerCount == null ) {
+				final Directory directory = FSDirectory.open(absoluteFile);
+				logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+				
+				try {
+					final DirectoryReader directoryReader = DirectoryReader.open(directory);
+					final IndexSearcher searcher = new IndexSearcher(directoryReader);
+					
+					// we want to cache the searcher that we create, since it's just a reader.
+					final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+					currentlyCached.add(cached);
+					
+					return cached.getSearcher();
+				} catch (final IOException e) {
+					try {
+						directory.close();
+					} catch (final IOException ioe) {
+						e.addSuppressed(ioe);
+					}
+					
+					throw e;
+				}
+			} else {
+				logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+						+ "counter to {}", indexDir, writerCount.getCount() + 1);
+
+				// increment the writer count to ensure that it's kept open.
+				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+				
+				// create a new Index Searcher from the writer so that we don't have an issue with trying
+				// to read from a directory that's locked. If we get the "no segments* file found" with
+				// Lucene, this indicates that an IndexWriter already has the directory open.
+				final IndexWriter writer = writerCount.getWriter();
+				final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+				final IndexSearcher searcher = new IndexSearcher(directoryReader);
+				
+				// we don't want to cache this searcher because it's based on a writer, so we want to get
+				// new values the next time that we search.
+				final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+				
+				currentlyCached.add(activeSearcher);
+				return activeSearcher.getSearcher();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	
+	public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+		final File absoluteFile = indexDirectory.getAbsoluteFile();
+		logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+		
+		lock.lock();
+		try {
+			// check if we already have a reader cached.
+			List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+			if ( currentlyCached == null ) {
+				logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+						+ "result in a resource leak", indexDirectory);
+				return;
+			}
+			
+			final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+			while (itr.hasNext()) {
+				final ActiveIndexSearcher activeSearcher = itr.next();
+				if ( activeSearcher.getSearcher().equals(searcher) ) {
+					if ( activeSearcher.isCache() ) {
+						// the searcher is cached. Just leave it open.
+						logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+						return;
+					} else {
+						// searcher is not cached. It was created from a writer, and we want
+						// the newest updates the next time that we get a searcher, so we will
+						// go ahead and close this one out.
+						itr.remove();
+						
+						// decrement the writer count because we incremented it when creating the searcher
+						final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+						if ( writerCount != null ) {
+							if ( writerCount.getCount() <= 1 ) {
+								try {
+									logger.debug("Index searcher for {} is not cached. Writer count is "
+											+ "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+									
+									writerCount.close();
+								} catch (final IOException ioe) {
+									logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+									if ( logger.isDebugEnabled() ) {
+										logger.warn("", ioe);
+									}
+								}
+							} else {
+								logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+										+ "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+								
+								writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+									writerCount.getAnalyzer(), writerCount.getDirectory(), 
+									writerCount.getCount() - 1));
+							}
+						}
+
+						try {
+							logger.debug("Closing Index Searcher for {}", indexDirectory);
+							activeSearcher.close();
+						} catch (final IOException ioe) {
+							logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+							if ( logger.isDebugEnabled() ) {
+								logger.warn("", ioe);
+							}
+						}
+					}
+				}
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	@Override
+	public void close() throws IOException {
+		logger.debug("Closing Index Manager");
+		
+		lock.lock();
+		try {
+			IOException ioe = null;
+			
+			for ( final IndexWriterCount count : writerCounts.values() ) {
+				try {
+					count.close();
+				} catch (final IOException e) {
+					if ( ioe == null ) {
+						ioe = e;
+					} else {
+						ioe.addSuppressed(e);
+					}
+				}
+			}
+			
+			for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+				for (final ActiveIndexSearcher searcher : searcherList) {
+					try {
+						searcher.close();
+					} catch (final IOException e) {
+						if ( ioe == null ) {
+							ioe = e;
+						} else {
+							ioe.addSuppressed(e);
+						}
+					}
+				}
+			}
+			
+			if ( ioe != null ) {
+				throw ioe;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	
+	private static void close(final Closeable... closeables) throws IOException {
+		IOException ioe = null;
+		for ( final Closeable closeable : closeables ) {
+			if ( closeable == null ) {
+				continue;
+			}
+			
+			try {
+				closeable.close();
+			} catch (final IOException e) {
+				if ( ioe == null ) {
+					ioe = e;
+				} else {
+					ioe.addSuppressed(e);
+				}
+			}
+		}
+		
+		if ( ioe != null ) {
+			throw ioe;
+		}
+	}
+	
+	
+	private static class ActiveIndexSearcher implements Closeable {
+		private final IndexSearcher searcher;
+		private final DirectoryReader directoryReader;
+		private final Directory directory;
+		private final boolean cache;
+		
+		public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, 
+				Directory directory, final boolean cache) {
+			this.searcher = searcher;
+			this.directoryReader = directoryReader;
+			this.directory = directory;
+			this.cache = cache;
+		}
+
+		public boolean isCache() {
+			return cache;
+		}
+
+		public IndexSearcher getSearcher() {
+			return searcher;
+		}
+		
+		@Override
+		public void close() throws IOException {
+			IndexManager.close(directoryReader, directory);
+		}
+	}
+	
+	
+	private static class IndexWriterCount implements Closeable {
+		private final IndexWriter writer;
+		private final Analyzer analyzer;
+		private final Directory directory;
+		private final int count;
+		
+		public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+			this.writer = writer;
+			this.analyzer = analyzer;
+			this.directory = directory;
+			this.count = count;
+		}
+
+		public Analyzer getAnalyzer() {
+			return analyzer;
+		}
+
+		public Directory getDirectory() {
+			return directory;
+		}
+
+		public IndexWriter getWriter() {
+			return writer;
+		}
+
+		public int getCount() {
+			return count;
+		}
+
+		@Override
+		public void close() throws IOException {
+			IndexManager.close(writer, analyzer, directory);
+		}
+	}
+
+}
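
For illustration only (not part of this commit), a minimal sketch of how the reference-counted borrow/return API above is intended to be used by a caller; the index directory name is hypothetical:

    import java.io.File;
    import java.io.IOException;

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.nifi.provenance.lucene.IndexManager;

    public class IndexManagerUsageSketch {
        public static void main(final String[] args) throws IOException {
            final IndexManager indexManager = new IndexManager();
            final File indexDir = new File("index-001");   // hypothetical index directory

            // writers are shared and reference counted: borrow, use, and always return in a finally block
            final IndexWriter writer = indexManager.borrowIndexWriter(indexDir);
            try {
                // ... add documents to the shared writer ...
            } finally {
                indexManager.returnIndexWriter(indexDir, writer);
            }

            // searchers follow the same pattern; cached (reader-backed) searchers stay open on return,
            // while writer-backed searchers are closed and the writer count is decremented
            final IndexSearcher searcher = indexManager.borrowIndexSearcher(indexDir);
            try {
                // ... run Lucene queries against the searcher ...
            } finally {
                indexManager.returnIndexSearcher(indexDir, searcher);
            }

            indexManager.close();
        }
    }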

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index e2854c3..dcb6e08 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -17,31 +17,33 @@
 package org.apache.nifi.provenance.lucene;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.StandardQueryResult;
-
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexNotFoundException;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardQueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IndexSearch {
-
+	private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
     private final PersistentProvenanceRepository repository;
     private final File indexDirectory;
+    private final IndexManager indexManager;
 
-    public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory) {
+    public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) {
         this.repository = repo;
         this.indexDirectory = indexDirectory;
+        this.indexManager = indexManager;
     }
 
     public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException {
@@ -55,30 +57,57 @@ public class IndexSearch {
         final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1);
         final Set<ProvenanceEventRecord> matchingRecords;
 
-        try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) {
-            final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
-            if (provenanceQuery.getEndDate() == null) {
-                provenanceQuery.setEndDate(new Date());
-            }
-            final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
+        if (provenanceQuery.getEndDate() == null) {
+            provenanceQuery.setEndDate(new Date());
+        }
+        final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
 
-            TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+        final long start = System.nanoTime();
+        IndexSearcher searcher = null;
+        try {
+        	searcher = indexManager.borrowIndexSearcher(indexDirectory);
+            final long searchStartNanos = System.nanoTime();
+            final long openSearcherNanos = searchStartNanos - start;
+            
+            final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+            final long finishSearch = System.nanoTime();
+            final long searchNanos = finishSearch - searchStartNanos;
+            
+            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this, 
+            		TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+            
             if (topDocs.totalHits == 0) {
                 sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
                 return sqr;
             }
 
             final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
-            matchingRecords = docsReader.read(topDocs, directoryReader, repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-
+            matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
+            
+            final long readRecordsNanos = System.nanoTime() - finishSearch;
+            logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
+            
             sqr.update(matchingRecords, topDocs.totalHits);
             return sqr;
-        } catch (final IndexNotFoundException e) {
-            // nothing has been indexed yet.
+        } catch (final FileNotFoundException e) {
+            // nothing has been indexed yet, or the data has already aged off
+        	logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
+        	if ( logger.isDebugEnabled() ) {
+        		logger.warn("", e);
+        	}
+        	
             sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
             return sqr;
+        } finally {
+        	if ( searcher != null ) {
+        		indexManager.returnIndexSearcher(indexDirectory, searcher);
+        	}
         }
     }
 
+    
+    @Override
+    public String toString() {
+    	return "IndexSearcher[" + indexDirectory + "]";
+    }
 }
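
The updated IndexSearch constructor above now takes the shared IndexManager so that queries reuse pooled searchers rather than opening a new DirectoryReader per search. A hedged fragment of the implied call site (the repository, index directory, provenance query, and IndexManager instance are assumed to exist in the caller):

    // fragment only; 'repository', 'indexDirectory', 'provenanceQuery' and 'indexManager' are assumed to exist
    final IndexSearch indexSearch = new IndexSearch(repository, indexDirectory, indexManager);
    final StandardQueryResult result = indexSearch.search(provenanceQuery, new AtomicInteger(0));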

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index 214267a..5e87913 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -24,27 +24,27 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.IndexConfiguration;
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.provenance.SearchableFields;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.rollover.RolloverAction;
-import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.IntField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.IndexConfiguration;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.rollover.RolloverAction;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,15 +72,93 @@ public class IndexingAction implements RolloverAction {
         doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
     }
 
+    
+    public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
+        final Map<String, String> attributes = record.getAttributes();
+
+        final Document doc = new Document();
+        addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO);
+        addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
+        addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO);
+        addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO);
+        addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO);
+        addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO);
+        addField(doc, SearchableFields.Details, record.getDetails(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO);
+        addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO);
+
+        if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
+            addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
+        }
+
+        for (final SearchableField searchableField : attributeSearchableFields) {
+            addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO);
+        }
+
+        final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
+
+        // Index the fields that we always index (unless there's nothing else to index at all)
+        if (!doc.getFields().isEmpty()) {
+            doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
+            doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
+            doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
+            doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
+            
+            if ( blockIndex == null ) {
+            	doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
+            } else {
+	            doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
+	            doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
+            }
+            
+            for (final String lineageIdentifier : record.getLineageIdentifiers()) {
+                addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
+            }
+
+            // If the event is a FORK or JOIN, add the FlowFileUUID for all child/parent UUIDs.
+            if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
+                for (final String uuid : record.getChildUuids()) {
+                    if (!uuid.equals(record.getFlowFileUuid())) {
+                        addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
+                    }
+                }
+            } else if (record.getEventType() == ProvenanceEventType.JOIN) {
+                for (final String uuid : record.getParentUuids()) {
+                    if (!uuid.equals(record.getFlowFileUuid())) {
+                        addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
+                    }
+                }
+            } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
+                // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
+                // that the Source System uses to refer to the data.
+                final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
+                final String sourceFlowFileUUID;
+                final int lastColon = sourceIdentifier.lastIndexOf(":");
+                if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
+                    sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
+                } else {
+                    sourceFlowFileUUID = null;
+                }
+
+                if (sourceFlowFileUUID != null) {
+                    addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO);
+                }
+            }
+
+            indexWriter.addDocument(doc);
+        }
+    }
+    
     @Override
-    @SuppressWarnings("deprecation")
     public File execute(final File fileRolledOver) throws IOException {
         final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver);
         int indexCount = 0;
         long maxId = -1L;
 
         try (final Directory directory = FSDirectory.open(indexingDirectory);
-                final Analyzer analyzer = new StandardAnalyzer(LuceneUtil.LUCENE_VERSION)) {
+                final Analyzer analyzer = new StandardAnalyzer()) {
 
             final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
             config.setWriteLockTimeout(300000L);
@@ -89,6 +167,13 @@ public class IndexingAction implements RolloverAction {
                     final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
                 StandardProvenanceEventRecord record;
                 while (true) {
+                	final Integer blockIndex;
+                	if ( reader.isBlockIndexAvailable() ) {
+                		blockIndex = reader.getBlockIndex();
+                	} else {
+                		blockIndex = null;
+                	}
+                	
                     try {
                         record = reader.nextRecord();
                     } catch (final EOFException eof) {
@@ -104,76 +189,8 @@ public class IndexingAction implements RolloverAction {
 
                     maxId = record.getEventId();
 
-                    final Map<String, String> attributes = record.getAttributes();
-
-                    final Document doc = new Document();
-                    addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO);
-                    addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
-                    addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO);
-                    addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO);
-                    addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO);
-                    addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO);
-                    addField(doc, SearchableFields.Details, record.getDetails(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO);
-                    addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO);
-
-                    if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
-                        addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
-                    }
-
-                    for (final SearchableField searchableField : attributeSearchableFields) {
-                        addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO);
-                    }
-
-                    final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
-
-                    // Index the fields that we always index (unless there's nothing else to index at all)
-                    if (!doc.getFields().isEmpty()) {
-                        doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
-                        doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
-                        doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
-                        doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
-                        doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
-
-                        for (final String lineageIdentifier : record.getLineageIdentifiers()) {
-                            addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
-                        }
-
-                        // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs.
-                        if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
-                            for (final String uuid : record.getChildUuids()) {
-                                if (!uuid.equals(record.getFlowFileUuid())) {
-                                    addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
-                                }
-                            }
-                        } else if (record.getEventType() == ProvenanceEventType.JOIN) {
-                            for (final String uuid : record.getParentUuids()) {
-                                if (!uuid.equals(record.getFlowFileUuid())) {
-                                    addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
-                                }
-                            }
-                        } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
-                            // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
-                            // that the Source System uses to refer to the data.
-                            final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
-                            final String sourceFlowFileUUID;
-                            final int lastColon = sourceIdentifier.lastIndexOf(":");
-                            if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
-                                sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
-                            } else {
-                                sourceFlowFileUUID = null;
-                            }
-
-                            if (sourceFlowFileUUID != null) {
-                                addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO);
-                            }
-                        }
-
-                        indexWriter.addDocument(doc);
-                        indexCount++;
-                    }
+                    index(record, indexWriter, blockIndex);
+                    indexCount++;
                 }
 
                 indexWriter.commit();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index a7076d5..59dc10b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -27,8 +27,8 @@ import java.util.List;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.search.SearchTerm;
-
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanClause.Occur;
@@ -78,7 +78,16 @@ public class LuceneUtil {
         final String searchString = baseName + ".";
         for (final Path path : allProvenanceLogs) {
             if (path.toFile().getName().startsWith(searchString)) {
-                matchingFiles.add(path.toFile());
+            	final File file = path.toFile();
+            	if ( file.exists() ) {
+            		matchingFiles.add(file);
+            	} else {
+            		final File dir = file.getParentFile();
+            		final File gzFile = new File(dir, file.getName() + ".gz");
+            		if ( gzFile.exists() ) {
+            			matchingFiles.add(gzFile);
+            		}
+            	}
             }
         }
 
@@ -132,6 +141,19 @@ public class LuceneUtil {
                     return filenameComp;
                 }
 
+                final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX);
+                final IndexableField fileOffset2 = o2.getField(FieldNames.BLOCK_INDEX);
+                if ( fileOffset1 != null && fileOffset2 != null ) {
+                	final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
+                	if ( blockIndexResult != 0 ) {
+                		return blockIndexResult;
+                	}
+                	
+                	final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                	final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                	return Long.compare(eventId1, eventId2);
+                }
+                
                 final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 return Long.compare(offset1, offset2);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 862bc2b..8bdc88a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -20,12 +20,79 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocReader;
 
 public interface RecordReader extends Closeable {
 
+	/**
+	 * Returns the next record in the reader, or <code>null</code> if there is no more data available.
+	 * @return
+	 * @throws IOException
+	 */
     StandardProvenanceEventRecord nextRecord() throws IOException;
 
+    /**
+     * Skips the specified number of bytes
+     * @param bytesToSkip
+     * @throws IOException
+     */
     void skip(long bytesToSkip) throws IOException;
 
+    /**
+     * Skips to the specified byte offset in the underlying stream.
+     * @param position
+     * @throws IOException if the underlying stream throws IOException, or if the reader has already
+     * passed the specified byte offset
+     */
     void skipTo(long position) throws IOException;
+    
+    /**
+     * Skips to the specified compression block
+     * 
+     * @param blockIndex
+     * @throws IOException if the underlying stream throws IOException, or if the reader has already
+     * read past the specified compression block index
+     * @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it
+     */
+    void skipToBlock(int blockIndex) throws IOException;
+    
+    /**
+     * Returns the block index that the Reader is currently reading from.
+     * Note that the block index is incremented at the beginning of the {@link #nextRecord()}
+     * method. This means that this method will return the block from which the previous record was read, 
+     * if calling {@link #nextRecord()} continually, not the block from which the next record will be read.
+     * @return
+     */
+    int getBlockIndex();
+    
+    /**
+     * Returns <code>true</code> if the compression block index is available. It will be available
+     * if and only if the reader is created with a TableOfContents
+     * 
+     * @return
+     */
+    boolean isBlockIndexAvailable();
+    
+    /**
+     * Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists,
+     * <code>null</code> otherwise
+     * @return
+     */
+    TocReader getTocReader();
+    
+    /**
+     * Returns the number of bytes that have been consumed from the stream (read or skipped).
+     * @return
+     */
+    long getBytesConsumed();
+    
+    /**
+     * Returns the ID of the last event in this record reader, or -1 if the reader has no records or
+     * has already read through all records. Note: This method will consume the stream until the end,
+     * so no more records will be available on this reader after calling this method.
+     * 
+     * @return
+     * @throws IOException
+     */
+    long getMaxEventId() throws IOException;
 }
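
To make the expanded RecordReader contract above concrete, a small sketch (not part of the commit; the journal file name and block index are hypothetical) of reading a journal and using the block index when a Table of Contents is present:

    import java.io.File;
    import java.io.IOException;

    import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordReader;
    import org.apache.nifi.provenance.serialization.RecordReaders;

    public class RecordReaderUsageSketch {
        public static void main(final String[] args) throws IOException {
            final File journalFile = new File("1234567.prov.gz");   // hypothetical journal file

            try (final RecordReader reader = RecordReaders.newRecordReader(journalFile, null)) {
                // the block index is available only when the reader was created with a Table of Contents
                if (reader.isBlockIndexAvailable()) {
                    final int blockIndex = 0;   // hypothetical; in practice taken from a Lucene document's BLOCK_INDEX field
                    reader.skipToBlock(blockIndex);
                }

                StandardProvenanceEventRecord record;
                while ((record = reader.nextRecord()) != null) {
                    // ... process the event; getBlockIndex() reports the block the record was read from ...
                }
            }
        }
    }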

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 8f06995..dff281c 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.provenance.serialization;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -24,82 +23,90 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Path;
 import java.util.Collection;
-import java.util.zip.GZIPInputStream;
 
-import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.provenance.StandardRecordReader;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
 
 public class RecordReaders {
 
     public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
         final File originalFile = file;
-        
-        if (!file.exists()) {
-            if (provenanceLogFiles == null) {
-                throw new FileNotFoundException(file.toString());
-            }
-
-            final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
-            for (final Path path : provenanceLogFiles) {
-                if (path.toFile().getName().startsWith(baseName)) {
-                    file = path.toFile();
-                    break;
-                }
-            }
-        }
-
         InputStream fis = null;
-        if ( file.exists() ) {
-            try {
-                fis = new FileInputStream(file);
-            } catch (final FileNotFoundException fnfe) {
-                fis = null;
-            }
-        }
-        
-        openStream: while ( fis == null ) {
-            final File dir = file.getParentFile();
-            final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
-            
-            // depending on which rollover actions have occurred, we could have 3 possibilities for the
-            // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
-            // because most often we are compressing on rollover and most often we have already finished
-            // compressing by the time that we are querying the data.
-            for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) {
-                file = new File(dir, baseName + extension);
-                if ( file.exists() ) {
-                    try {
-                        fis = new FileInputStream(file);
-                        break openStream;
-                    } catch (final FileNotFoundException fnfe) {
-                        // file was modified by a RolloverAction after we verified that it exists but before we could
-                        // create an InputStream for it. Start over.
-                        fis = null;
-                        continue openStream;
-                    }
-                }
-            }
-            
-            break;
-        }
 
-        if ( fis == null ) {
-            throw new FileNotFoundException("Unable to locate file " + originalFile);
-        }
-        final InputStream readableStream;
-        if (file.getName().endsWith(".gz")) {
-            readableStream = new BufferedInputStream(new GZIPInputStream(fis));
-        } else {
-            readableStream = new BufferedInputStream(fis);
+        try {
+	        if (!file.exists()) {
+	            if (provenanceLogFiles != null) {
+		            final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
+		            for (final Path path : provenanceLogFiles) {
+		                if (path.toFile().getName().startsWith(baseName)) {
+		                    file = path.toFile();
+		                    break;
+		                }
+		            }
+	            }
+	        }
+	
+	        if ( file.exists() ) {
+	            try {
+	                fis = new FileInputStream(file);
+	            } catch (final FileNotFoundException fnfe) {
+	                fis = null;
+	            }
+	        }
+	        
+	        String filename = file.getName();
+	        openStream: while ( fis == null ) {
+	            final File dir = file.getParentFile();
+	            final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+	            
+	            // depending on which rollover actions have occurred, we could have 2 possibilities for the
+	            // filename that we need. The majority of the time, we will use the extension ".prov.gz"
+	            // because most often we are compressing on rollover and most often we have already finished
+	            // compressing by the time that we are querying the data.
+	            for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+	                file = new File(dir, baseName + extension);
+	                if ( file.exists() ) {
+	                    try {
+	                        fis = new FileInputStream(file);
+	                        filename = baseName + extension;
+	                        break openStream;
+	                    } catch (final FileNotFoundException fnfe) {
+	                        // file was modified by a RolloverAction after we verified that it exists but before we could
+	                        // create an InputStream for it. Start over.
+	                        fis = null;
+	                        continue openStream;
+	                    }
+	                }
+	            }
+	            
+	            break;
+	        }
+	
+	        if ( fis == null ) {
+	            throw new FileNotFoundException("Unable to locate file " + originalFile);
+	        }
+	
+	    	final File tocFile = TocUtil.getTocFile(file);
+	    	if ( tocFile.exists() ) {
+	    		final TocReader tocReader = new StandardTocReader(tocFile);
+	    		return new StandardRecordReader(fis, filename, tocReader);
+	    	} else {
+	    		return new StandardRecordReader(fis, filename);
+	    	}
+        } catch (final IOException ioe) {
+        	if ( fis != null ) {
+        		try {
+        			fis.close();
+        		} catch (final IOException inner) {
+        			ioe.addSuppressed(inner);
+        		}
+        	}
+        	
+        	throw ioe;
         }
-
-        final DataInputStream dis = new DataInputStream(readableStream);
-        @SuppressWarnings("unused")
-        final String repoClassName = dis.readUTF();
-        final int serializationVersion = dis.readInt();
-
-        return new StandardRecordReader(dis, serializationVersion, file.getName());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index de98ab9..58f4dc2 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
 
 public interface RecordWriter extends Closeable {
 
@@ -82,4 +83,9 @@ public interface RecordWriter extends Closeable {
      */
     void sync() throws IOException;
 
+    /**
+     * Returns the TOC Writer that is being used to write the Table of Contents for this journal
+     * @return
+     */
+    TocWriter getTocWriter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index 15349de..47b7c7e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -20,11 +20,20 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.StandardRecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
 
 public class RecordWriters {
+	private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024;	// 1 MB
 
-    public static RecordWriter newRecordWriter(final File file) throws IOException {
-        return new StandardRecordWriter(file);
+    public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
+    	return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+    }
+    
+    public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
+    	final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+        return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
     }
 
 }
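
For illustration, a hedged usage sketch of the new factory signature (the journal path and directory handling are hypothetical; the two boolean flags request compression and a Table of Contents, and the 1 MB default block size applies):

import java.io.File;
import java.io.IOException;

import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.RecordWriters;

public class RecordWritersExample {
    public static void main(final String[] args) throws IOException {
        final File journal = new File("target/journals/example.prov");   // hypothetical location
        journal.getParentFile().mkdirs();                                 // ensure the journal directory exists

        // compressed = true, createToc = true: a .toc file is created in a toc/ subdirectory next to the journal
        final RecordWriter writer = RecordWriters.newRecordWriter(journal, true, true);
        try {
            // provenance event records would be written here before closing
        } finally {
            writer.close();
        }
    }
}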

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
new file mode 100644
index 0000000..8944cec
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Standard implementation of TocReader.
+ * 
+ * Expects the .toc file to be in the following format:
+ * 
+ * byte 0: version
+ * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocReader implements TocReader {
+    private final boolean compressed;
+    private final long[] offsets;
+    
+    public StandardTocReader(final File file) throws IOException {
+        try (final FileInputStream fis = new FileInputStream(file);
+             final DataInputStream dis = new DataInputStream(fis)) {
+            
+            final int version = dis.read();
+            if ( version < 0 ) {
+                throw new EOFException();
+            }
+            
+            final int compressionFlag = dis.read();
+            if ( compressionFlag < 0 ) {
+                throw new EOFException();
+            }
+            
+            if ( compressionFlag == 0 ) {
+                compressed = false;
+            } else if ( compressionFlag == 1 ) {
+                compressed = true;
+            } else {
+                throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
+            }
+            
+            final int numBlocks = (int) ((file.length() - 2) / 8);
+            offsets = new long[numBlocks];
+            
+            for (int i=0; i < numBlocks; i++) {
+                offsets[i] = dis.readLong();
+            }
+        }
+    }
+    
+    @Override
+    public boolean isCompressed() {
+        return compressed;
+    }
+    
+    @Override
+    public long getBlockOffset(final int blockIndex) {
+        if ( blockIndex >= offsets.length ) {
+            return -1L;
+        }
+        return offsets[blockIndex];
+    }
+
+    @Override
+    public long getLastBlockOffset() {
+        if ( offsets.length == 0 ) {
+            return 0L;
+        }
+        return offsets[offsets.length - 1];
+    }
+    
+    @Override
+    public void close() throws IOException {
+    }
+
+	@Override
+	public int getBlockIndex(final long blockOffset) {
+		for (int i=0; i < offsets.length; i++) {
+			if ( offsets[i] > blockOffset ) {
+				return i-1;
+			}
+		}
+		
+		return offsets.length - 1;
+	}
+
+}
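
A short sketch, assuming the journal already has a TOC on disk and the event's block index is known, showing how the reader translates a Block Index back into a byte offset into the journal:

import java.io.File;
import java.io.IOException;

import org.apache.nifi.provenance.toc.StandardTocReader;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.provenance.toc.TocUtil;

public class TocLookupExample {
    // Returns the byte offset at which the given block starts, or -1 if the
    // block index is beyond the last block recorded in the Table of Contents.
    public static long offsetForBlock(final File journalFile, final int blockIndex) throws IOException {
        try (final TocReader tocReader = new StandardTocReader(TocUtil.getTocFile(journalFile))) {
            return tocReader.getBlockOffset(blockIndex);
        }
    }
}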

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
new file mode 100644
index 0000000..488f225
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Standard implementation of {@link TocWriter}.
+ * 
+ * Format of .toc file:
+ * byte 0: version
+ * byte 1: compressed: 0 -> not compressed, 1 -> compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocWriter implements TocWriter {
+	private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
+	
+    public static final byte VERSION = 1;
+    
+    private final File file;
+    private final FileOutputStream fos;
+    private final boolean alwaysSync;
+    private int index = -1;
+    
+    /**
+     * Creates a StandardTocWriter that writes to the given file.
+     * @param file the file to write to
+     * @param compressionFlag whether or not the journal is compressed
+     * @param alwaysSync whether or not to force a sync to disk after every update to the file
+     * @throws IOException if the file or its parent directory cannot be created or the header cannot be written
+     */
+    public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
+        final File tocDir = file.getParentFile();
+        if ( !tocDir.exists() ) {
+        	Files.createDirectories(tocDir.toPath());
+        }
+        
+        this.file = file;
+        fos = new FileOutputStream(file);
+        this.alwaysSync = alwaysSync;
+
+        final byte[] header = new byte[2];
+        header[0] = VERSION;
+        header[1] = (byte) (compressionFlag ? 1 : 0);
+        fos.write(header);
+        fos.flush();
+        
+        if ( alwaysSync ) {
+            sync();
+        }
+    }
+    
+    @Override
+    public void addBlockOffset(final long offset) throws IOException {
+        final BufferedOutputStream bos = new BufferedOutputStream(fos);
+        final DataOutputStream dos = new DataOutputStream(bos);
+        dos.writeLong(offset);
+        dos.flush();
+        index++;
+        logger.debug("Adding block {} at offset {}", index, offset);
+        
+        if ( alwaysSync ) {
+            sync();
+        }
+    }
+    
+    @Override
+    public void sync() throws IOException {
+    	fos.getFD().sync();
+    }
+    
+    @Override
+    public int getCurrentBlockIndex() {
+        return index;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (alwaysSync) {
+            fos.getFD().sync();
+        }
+        
+        fos.close();
+    }
+    
+    @Override
+    public File getFile() {
+        return file;
+    }
+    
+    @Override
+    public String toString() {
+        return "TOC Writer for " + file;
+    }
+}
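
To make the on-disk layout concrete, a small sanity sketch (output path hypothetical): after writing two block offsets, the .toc file should be 2 header bytes plus 8 bytes per offset, i.e. 18 bytes:

import java.io.File;
import java.io.IOException;

import org.apache.nifi.provenance.toc.StandardTocWriter;
import org.apache.nifi.provenance.toc.TocWriter;

public class TocFormatExample {
    public static void main(final String[] args) throws IOException {
        final File tocFile = new File("target/toc/example.toc");   // parent directory is created by the writer
        try (final TocWriter tocWriter = new StandardTocWriter(tocFile, true, false)) {
            tocWriter.addBlockOffset(0L);      // block 0 starts at the beginning of the journal
            tocWriter.addBlockOffset(4096L);   // block 1 starts 4 KB in
        }
        System.out.println(tocFile.length());  // expected: 2 + 2 * 8 = 18 bytes
    }
}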

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
new file mode 100644
index 0000000..7c197be
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+
+/**
+ * <p>
+ * Reads a Table of Contents (.toc file) for a corresponding Journal File. The Table of Contents maps
+ * a Block Index to the byte offset into the Journal File at which that Block begins. This allows a Block
+ * Index to be persisted for each event while the Journal is compressed later: we get good compression
+ * by compressing a large batch of events at once, and we can still look up an event in an uncompressed
+ * Journal via the Table of Contents, or in a compressed Journal by simply rewriting the TOC as the
+ * data is compressed.
+ * </p>
+ */
+public interface TocReader extends Closeable {
+
+    /**
+     * Indicates whether or not the corresponding Journal file is compressed
+     * @return true if the corresponding Journal file is compressed, false otherwise
+     */
+    boolean isCompressed();
+
+    /**
+     * Returns the byte offset into the Journal File for the Block with the given index.
+     * @param blockIndex the index of the Block whose offset is desired
+     * @return the byte offset of the Block, or -1 if the given index is beyond the last Block
+     */
+    long getBlockOffset(int blockIndex);
+
+    /**
+     * Returns the byte offset into the Journal File of the last Block in the Table of Contents
+     * @return the byte offset of the last Block, or 0 if no Blocks have been written
+     */
+    long getLastBlockOffset();
+
+    /**
+     * Returns the index of the Block that contains the given byte offset
+     * @param blockOffset the byte offset into the Journal File
+     * @return the index of the Block that contains the given offset
+     */
+    int getBlockIndex(long blockOffset);
+}
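
The complementary direction of the lookup shown earlier; a minimal sketch, assuming the .toc file already exists, that maps a byte offset in the journal back to the Block that contains it:

import java.io.File;
import java.io.IOException;

import org.apache.nifi.provenance.toc.StandardTocReader;
import org.apache.nifi.provenance.toc.TocReader;

public class BlockIndexLookupExample {
    public static int blockContaining(final File tocFile, final long journalOffset) throws IOException {
        try (final TocReader tocReader = new StandardTocReader(tocFile)) {
            return tocReader.getBlockIndex(journalOffset);
        }
    }
}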

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
new file mode 100644
index 0000000..c30ac98
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.File;
+
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+
+public class TocUtil {
+
+	/**
+	 * Returns the file that should be used as the Table of Contents for the given Journal File
+	 * @param journalFile the Journal File whose Table of Contents file is desired
+	 * @return the corresponding .toc file, located in the "toc" subdirectory of the Journal File's directory
+	 */
+	public static File getTocFile(final File journalFile) {
+    	final File tocDir = new File(journalFile.getParentFile(), "toc");
+    	final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
+    	final File tocFile = new File(tocDir, basename + ".toc");
+    	return tocFile;
+	}
+	
+}
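
The mapping performed by getTocFile is purely path-based; a quick illustration with a hypothetical journal location:

import java.io.File;

import org.apache.nifi.provenance.toc.TocUtil;

public class TocPathExample {
    public static void main(final String[] args) {
        final File journal = new File("/repo/journals/1234567890.prov");
        // prints /repo/journals/toc/1234567890.toc: same directory, "toc" subdirectory, ".toc" extension
        System.out.println(TocUtil.getTocFile(journal));
    }
}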

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
new file mode 100644
index 0000000..c678053
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Writes a .toc file
+ */
+public interface TocWriter extends Closeable {
+
+    /**
+     * Adds the given block offset as the next Block Offset in the Table of Contents
+     * @param offset the byte offset into the Journal File at which the next Block begins
+     * @throws IOException if unable to write the offset to the underlying file
+     */
+    void addBlockOffset(long offset) throws IOException;
+
+    /**
+     * Returns the index of the current Block
+     * @return the index of the Block whose offset was most recently added, or -1 if none has been added
+     */
+    int getCurrentBlockIndex();
+
+    /**
+     * Returns the file that is currently being written to
+     * @return the Table of Contents file that is being written
+     */
+    File getFile();
+
+    /**
+     * Synchronizes the data with the underlying storage device
+     * @throws IOException if unable to sync with the underlying storage device
+     */
+    void sync() throws IOException;
+}
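
A hedged sketch of how a journal writer might drive this interface; the threshold logic is an assumption based on the compressionBlockBytes parameter introduced in RecordWriters, not code from this commit. It starts a new Block whenever the configured block size has been exceeded since the last boundary:

import java.io.IOException;

import org.apache.nifi.provenance.toc.TocWriter;

public class BlockBoundaryTracker {
    private final TocWriter tocWriter;
    private final long blockSizeBytes;
    private long lastBlockOffset = 0L;

    public BlockBoundaryTracker(final TocWriter tocWriter, final long blockSizeBytes) throws IOException {
        this.tocWriter = tocWriter;
        this.blockSizeBytes = blockSizeBytes;
        tocWriter.addBlockOffset(0L);   // block 0 starts at the beginning of the journal
    }

    // Called after each event is written; bytesWrittenSoFar is the caller's running
    // count of bytes written to the journal.
    public void eventWritten(final long bytesWrittenSoFar) throws IOException {
        if (bytesWrittenSoFar - lastBlockOffset >= blockSizeBytes) {
            tocWriter.addBlockOffset(bytesWrittenSoFar);
            lastBlockOffset = bytesWrittenSoFar;
        }
    }
}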

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5be208b..25a363f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.provenance;
 
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -25,14 +26,14 @@ import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
@@ -45,7 +46,6 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.lineage.EventNode;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -59,8 +59,10 @@ import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -72,87 +74,47 @@ public class TestPersistentProvenanceRepository {
     public TestName name = new TestName();
 
     private PersistentProvenanceRepository repo;
+    private RepositoryConfiguration config;
     
     public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
 
     private RepositoryConfiguration createConfiguration() {
-        final RepositoryConfiguration config = new RepositoryConfiguration();
+        config = new RepositoryConfiguration();
         config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
-        config.setCompressOnRollover(false);
+        config.setCompressOnRollover(true);
         config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
+        config.setCompressionBlockBytes(100);
         return config;
     }
 
+    @BeforeClass
+    public static void setLogLevel() {
+    	System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+    }
+    
     @Before
     public void printTestName() {
         System.out.println("\n\n\n***********************  " + name.getMethodName() + "  *****************************");
     }
 
     @After
-    public void closeRepo() {
+    public void closeRepo() throws IOException {
         if (repo != null) {
             try {
                 repo.close();
             } catch (final IOException ioe) {
             }
         }
+        
+        // Delete all of the storage files. We do this in order to clean up the tons of files that
+        // we create but also to ensure that we have closed all of the file handles. If we leave any
+        // streams open, for instance, this will throw an IOException, causing our unit test to fail.
+        for ( final File storageDir : config.getStorageDirectories() ) {
+        	FileUtils.deleteFile(storageDir, true);
+        }
     }
 
-    private FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
-        final Map<String, String> attrCopy = new HashMap<>(attributes);
-
-        return new FlowFile() {
-            @Override
-            public long getId() {
-                return id;
-            }
-
-            @Override
-            public long getEntryDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public Set<String> getLineageIdentifiers() {
-                return new HashSet<String>();
-            }
-
-            @Override
-            public long getLineageStartDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public Long getLastQueueDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public boolean isPenalized() {
-                return false;
-            }
-
-            @Override
-            public String getAttribute(final String s) {
-                return attrCopy.get(s);
-            }
-
-            @Override
-            public long getSize() {
-                return fileSize;
-            }
-
-            @Override
-            public Map<String, String> getAttributes() {
-                return attrCopy;
-            }
-
-            @Override
-            public int compareTo(final FlowFile o) {
-                return 0;
-            }
-        };
-    }
+    
 
     private EventReporter getEventReporter() {
         return new EventReporter() {
@@ -261,6 +223,8 @@ public class TestPersistentProvenanceRepository {
             repo.registerEvent(record);
         }
 
+        Thread.sleep(1000L);
+        
         repo.close();
         Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
 
@@ -417,10 +381,10 @@ public class TestPersistentProvenanceRepository {
     @Test
     public void testIndexAndCompressOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
-        config.setMaxRecordLife(3, TimeUnit.SECONDS);
-        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L * 10);
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
-        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setMaxEventFileCapacity(1024L * 1024L * 10);
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
@@ -923,12 +887,16 @@ public class TestPersistentProvenanceRepository {
         final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
         secondRepo.initialize(getEventReporter());
 
-        final ProvenanceEventRecord event11 = builder.build();
-        secondRepo.registerEvent(event11);
-        secondRepo.waitForRollover();
-        final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
-        assertNotNull(event11Retrieved);
-        assertEquals(10, event11Retrieved.getEventId());
+        try {
+	        final ProvenanceEventRecord event11 = builder.build();
+	        secondRepo.registerEvent(event11);
+	        secondRepo.waitForRollover();
+	        final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
+	        assertNotNull(event11Retrieved);
+	        assertEquals(10, event11Retrieved.getEventId());
+        } finally {
+        	secondRepo.close();
+        }
     }
 
     @Test
@@ -998,6 +966,73 @@ public class TestPersistentProvenanceRepository {
         storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
         assertEquals(0, storageDirFiles.length);
     }
+    
+    
+    @Test
+    public void testBackPressure() throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileCapacity(1L);	// force rollover on each record.
+        config.setJournalCount(1);
+        
+        final AtomicInteger journalCountRef = new AtomicInteger(0);
+        
+    	repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+    		@Override
+    		protected int getJournalCount() {
+    			return journalCountRef.get();
+    		}
+    	};
+        repo.initialize(getEventReporter());
+
+    	final Map<String, String> attributes = new HashMap<>();
+    	final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", UUID.randomUUID().toString());
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        // ensure that we can register the events.
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            repo.registerEvent(builder.build());
+        }
+
+        // set number of journals to 6 so that we will block.
+        journalCountRef.set(6);
+
+        final AtomicLong threadNanos = new AtomicLong(0L);
+        final Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				final long start = System.nanoTime();
+		        builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
+		        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
+		        repo.registerEvent(builder.build());
+		        threadNanos.set(System.nanoTime() - start);
+			}
+        });
+        t.start();
+
+        Thread.sleep(1500L);
+        
+        journalCountRef.set(1);
+        t.join();
+        
+        final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
+        assertTrue(threadMillis > 1200);	// use 1200 to account for the fact that the timing is not exact
+        
+        builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
+        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
+        repo.registerEvent(builder.build());
+    }
+    
+    
+    // TODO: test EOF on merge
+    // TODO: Test journal with no records
 
     @Test
     public void testTextualQuery() throws InterruptedException, IOException, ParseException {

