nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [08/12] incubator-nifi git commit: NIFI-388: Initial implementation of prov repo; not yet finished but pushing so that the code is not lost
Date Fri, 27 Feb 2015 20:47:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
index 4cce231..0753e9e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
@@ -19,28 +19,39 @@ package org.apache.nifi.provenance.journaling.query;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.nifi.provenance.AsyncLineageSubmission;
 import org.apache.nifi.provenance.AsyncQuerySubmission;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.StoredProvenanceEvent;
 import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+import org.apache.nifi.provenance.journaling.index.IndexAction;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
 import org.apache.nifi.provenance.journaling.index.QueryUtils;
 import org.apache.nifi.provenance.journaling.index.SearchResult;
+import org.apache.nifi.provenance.journaling.index.VoidIndexAction;
 import org.apache.nifi.provenance.journaling.journals.JournalReader;
 import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
-import org.apache.nifi.provenance.journaling.partition.Partition;
-import org.apache.nifi.provenance.journaling.partition.PartitionManager;
-import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction;
 import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
 import org.apache.nifi.provenance.journaling.toc.TocReader;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.LineageComputationType;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QuerySubmission;
 import org.slf4j.Logger;
@@ -50,14 +61,17 @@ public class StandardQueryManager implements QueryManager {
     private static final Logger logger = LoggerFactory.getLogger(StandardQueryManager.class);
     
     private final int maxConcurrentQueries;
+    private final IndexManager indexManager;
+    private final ExecutorService executor;
     private final JournalingRepositoryConfig config;
-    private final PartitionManager partitionManager;
     private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
     
-    public StandardQueryManager(final PartitionManager partitionManager, final JournalingRepositoryConfig config, final int maxConcurrentQueries) {
+    public StandardQueryManager(final IndexManager indexManager, final ExecutorService executor, final JournalingRepositoryConfig config, final int maxConcurrentQueries) {
         this.config = config;
         this.maxConcurrentQueries = maxConcurrentQueries;
-        this.partitionManager = partitionManager;
+        this.indexManager = indexManager;
+        this.executor = executor;
     }
     
     @Override
@@ -74,12 +88,66 @@ public class StandardQueryManager implements QueryManager {
         if (query.getSearchTerms().isEmpty() && query.getStartDate() == null && query.getEndDate() == null) {
             final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1);
 
+            // empty query. Just get the latest events.
+            final Runnable runnable = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        logger.debug("Fetching latest events from Provenance repo");
+                        final long indexStartNanos = System.nanoTime();
+                        
+                        // Query each index for the latest events.
+                        final Set<List<JournaledStorageLocation>> locationSet = indexManager.withEachIndex(new IndexAction<List<JournaledStorageLocation>>() {
+                            @Override
+                            public List<JournaledStorageLocation> perform(final EventIndexSearcher searcher) throws IOException {
+                                return searcher.getLatestEvents(query.getMaxResults());
+                            }
+                        });
+                        final long indexMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStartNanos);
+                        final long retrievalStartNanos = System.nanoTime();
+                        
+                        final List<JournaledStorageLocation> orderedLocations = new ArrayList<>();
+                        for ( final List<JournaledStorageLocation> locations : locationSet ) {
+                            orderedLocations.addAll(locations);
+                        }
+                        
+                        Collections.sort(orderedLocations, new Comparator<JournaledStorageLocation>() {
+                            @Override
+                            public int compare(final JournaledStorageLocation o1, final JournaledStorageLocation o2) {
+                                return Long.compare(o1.getEventId(), o2.getEventId());
+                            }
+                        });
+
+                        final List<JournaledStorageLocation> locationsToKeep;
+                        if ( orderedLocations.size() > query.getMaxResults() ) {
+                            locationsToKeep = orderedLocations.subList(0, query.getMaxResults());
+                        } else {
+                            locationsToKeep = orderedLocations;
+                        }
+                        
+                        final List<StoredProvenanceEvent> matchingRecords = getEvents(locationsToKeep, new AtomicInteger(locationsToKeep.size()));
+                        
+                        final long totalNumEvents = indexManager.getNumberOfEvents();
+                        final long retrievalMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStartNanos);
+                        logger.debug("Updated query result with {} matching records; total number of events = {}; index search took {} millis, event retrieval took {} millis", matchingRecords.size(), totalNumEvents, indexMillis, retrievalMillis);
+                        result.getResult().update(matchingRecords, totalNumEvents);
+                    } catch (final Exception e) {
+                        result.getResult().setError("Failed to obtain latest events in repository due to " + e);
+                        logger.error("Failed to obtain latest events in repository due to {}", e.toString());
+                        if ( logger.isDebugEnabled() ) {
+                            logger.error("", e);
+                        }
+                    }
+                }
+            };
+            
+            executor.submit(runnable);
             querySubmissionMap.put(query.getIdentifier(), result);
             return result;
         }
 
         final AtomicInteger retrievalCount = new AtomicInteger(query.getMaxResults());
-        final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, config.getPartitionCount()) {
+        final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, indexManager.getNumberOfIndices()) {
             @Override
             public void cancel() {
                 super.cancel();
@@ -89,56 +157,252 @@ public class StandardQueryManager implements QueryManager {
         
         querySubmissionMap.put(query.getIdentifier(), submission);
 
-        partitionManager.withEachPartition(new VoidPartitionAction() {
-            @SuppressWarnings({ "rawtypes", "unchecked" })
-            @Override
-            public void perform(final Partition partition) throws IOException {
-                logger.debug("Running {} against {}", query, partition);
-                
-                try (final EventIndexSearcher searcher = partition.newIndexSearcher()) {
-                    final SearchResult searchResult = searcher.search(query);
-                    logger.debug("{} has {} hits against {} over {} files", query, searchResult.getTotalCount(), partition, searchResult.getLocations().size());
-                    
-                    final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>();
-                    final Map<File, List<JournaledStorageLocation>> locationMap = QueryUtils.orderLocations((List) searchResult.getLocations(), config);
-                    for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : locationMap.entrySet() ) {
-                        final File journalFile = entry.getKey();
-                        final List<JournaledStorageLocation> locations = entry.getValue();
+        try {
+            indexManager.withEachIndex(new VoidIndexAction() {
+                @Override
+                public void perform(final EventIndexSearcher searcher) throws IOException {
+                    try {
+                        logger.debug("Running {} against {}", query, searcher);
                         
-                        if ( retrievalCount.get() <= 0 ) {
-                            break;
-                        }
+                        final long indexStart = System.nanoTime();
+                        final SearchResult searchResult = searcher.search(query);
+                        final long indexMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStart);
+                        logger.debug("{} has {} hits against {} over {} files", query, searchResult.getTotalCount(), searcher, searchResult.getLocations().size());
                         
-                        try (final JournalReader reader = new StandardJournalReader(journalFile);
-                             final TocReader tocReader = new StandardTocReader(QueryUtils.getTocFile(journalFile))) {
-                            
-                            for ( final JournaledStorageLocation location : locations ) {
-                                final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex());
-                                final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId());
-                                matchingRecords.add(new JournaledProvenanceEvent(event, location));
-                                
-                                final int recordsLeft = retrievalCount.decrementAndGet();
-                                if ( recordsLeft <= 0 ) {
-                                    break;
-                                }
-                            }
-                        }
+                        final long retrievalStart = System.nanoTime();
+                        final List<StoredProvenanceEvent> matchingRecords = getEvents(searchResult.getLocations(), retrievalCount);
+                        final long retrievalMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStart);
+                        
+                        logger.debug("Finished executing {} against {}; found {} total matches, retrieved {} of them; index search took {} millis, record retrieval took {} millis", 
+                                query, searcher, searchResult.getTotalCount(), matchingRecords.size(), indexMillis, retrievalMillis);
+                        submission.getResult().update(matchingRecords, searchResult.getTotalCount());
+                    } catch (final Throwable t) {
+                        submission.getResult().setError("Failed to execute query " + query + " against " + searcher + " due to " + t);
+                        throw t;
                     }
-                    
-                    logger.debug("Finished executing {} against {}", query, partition);
-                    submission.getResult().update(matchingRecords, searchResult.getTotalCount());
-                } catch (final Exception e) {
-                    submission.getResult().setError("Failed to query " + partition + " due to " + e.toString());
-                    throw e;
                 }
+            }, true);
+        } catch (final IOException ioe) {
+            // only set the error here if it's not already set because we have the least amount of information here
+            if ( submission.getResult().getError() == null ) {
+                submission.getResult().setError("Failed to execute query " + query + " due to " + ioe);
             }
-        }, true);
+        }
         
         return submission;
     }
 
+    
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private List<StoredProvenanceEvent> getEvents(final List<JournaledStorageLocation> allLocations, final AtomicInteger retrievalCount) throws IOException {
+        final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>();
+        final Map<File, List<JournaledStorageLocation>> locationMap = QueryUtils.orderLocations((List) allLocations, config);
+        for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : locationMap.entrySet() ) {
+            final File journalFile = entry.getKey();
+            final List<JournaledStorageLocation> locations = entry.getValue();
+            
+            if ( retrievalCount.get() <= 0 ) {
+                break;
+            }
+            
+            try (final JournalReader reader = new StandardJournalReader(journalFile);
+                 final TocReader tocReader = new StandardTocReader(QueryUtils.getTocFile(journalFile))) {
+                
+                for ( final JournaledStorageLocation location : locations ) {
+                    final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex());
+                    final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId());
+                    matchingRecords.add(new JournaledProvenanceEvent(event, location));
+                    
+                    final int recordsLeft = retrievalCount.decrementAndGet();
+                    if ( recordsLeft <= 0 ) {
+                        break;
+                    }
+                }
+            }
+        }
+        
+        return matchingRecords;
+    }
+    
     @Override
     public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) {
         return querySubmissionMap.get(queryIdentifier);
     }
+    
+    @Override
+    public ComputeLineageSubmission retrieveLineageSubmission(final String lineageIdentifier) {
+        return lineageSubmissionMap.get(lineageIdentifier);
+    }
+    
+    @Override
+    public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid) {
+        return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
+    }
+    
+    private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) {
+        final AsyncLineageSubmission lineageSubmission = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexManager.getNumberOfIndices());
+
+        final AtomicInteger retrievalCount = new AtomicInteger(2000);
+        final Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    indexManager.withEachIndex(new VoidIndexAction() {
+                        @Override
+                        public void perform(EventIndexSearcher searcher) throws IOException {
+                            logger.debug("Obtaining lineage events for FlowFile UUIDs {} for {}", flowFileUuids, searcher);
+                            final long startNanos = System.nanoTime();
+                            
+                            final List<JournaledStorageLocation> locations = searcher.getEventsForFlowFiles(flowFileUuids, startTimestamp, endTimestamp);
+                            final List<StoredProvenanceEvent> matchingRecords = getEvents(locations, retrievalCount);
+                            lineageSubmission.getResult().update(matchingRecords);
+                            
+                            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                            logger.debug("Finished querying for lineage events; found {} events in {} millis", matchingRecords.size(), millis);
+                        }
+                    });
+                } catch (final IOException ioe) {
+                    lineageSubmission.getResult().setError("Failed to calculate FlowFile Lineage due to " + ioe);
+                    logger.error("Failed to calculate FlowFile Lineage due to {}", ioe.toString());
+                    if ( logger.isDebugEnabled() ) {
+                        logger.error("", ioe);
+                    }
+                }
+            }
+        };
+        
+        executor.submit(runnable);
+        lineageSubmissionMap.putIfAbsent(lineageSubmission.getLineageIdentifier(), lineageSubmission);
+        return lineageSubmission;
+    }
+
+    
+    @Override
+    public ComputeLineageSubmission submitExpandChildren(final ProvenanceEventRepository eventRepo, final long eventId) {
+        final Set<String> flowFileUuids = Collections.synchronizedSet(new HashSet<String>());
+        final AsyncLineageSubmission lineageSubmission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, flowFileUuids, indexManager.getNumberOfIndices());
+
+        final Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    logger.debug("Obtaining event with id {} in order to expand children", eventId);
+                    final StoredProvenanceEvent event = eventRepo.getEvent(eventId);
+                    if ( event == null ) {
+                        lineageSubmission.getResult().setError("Cannot expand children of event with ID " + eventId + " because that event cannot be found");
+                        logger.warn("Cannot expand children of event with ID {} because that event cannot be found", eventId);
+                        return;
+                    }
+                    
+                    logger.debug("Found event with id {}; searching for children", eventId);
+                    
+                    switch (event.getEventType()) {
+                        case CLONE:
+                        case FORK:
+                        case JOIN:
+                        case REPLAY:
+                            break;
+                        default:
+                            logger.warn("Cannot expand children of event with ID {} because event type is {}", eventId, event.getEventType());
+                            lineageSubmission.getResult().setError("Cannot expand children of event with ID " + eventId + 
+                                    " because that event is of type " + event.getEventType() + 
+                                    ", and that type does not support expansion of children");
+                            return;
+                    }
+                    
+                    final List<String> childUuids = event.getChildUuids();
+                    flowFileUuids.addAll(childUuids);
+                    
+                    final AtomicInteger retrievalCount = new AtomicInteger(100);
+                    indexManager.withEachIndex(new VoidIndexAction() {
+                        @Override
+                        public void perform(EventIndexSearcher searcher) throws IOException {
+                            final long startNanos = System.nanoTime();
+                            logger.debug("Finding children of event with id {} using {}", eventId, searcher);
+                            
+                            final List<JournaledStorageLocation> locations = searcher.getEventsForFlowFiles(flowFileUuids, event.getEventTime(), Long.MAX_VALUE);
+                            final List<StoredProvenanceEvent> matchingRecords = getEvents(locations, retrievalCount);
+                            lineageSubmission.getResult().update(matchingRecords);
+                            
+                            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                            logger.debug("Found {} children of event {} in {} millis", matchingRecords.size(), eventId, millis);
+                        }
+                    });
+                } catch (final IOException ioe) {
+                    
+                }
+            }
+        };
+        
+        executor.submit(runnable);
+        lineageSubmissionMap.putIfAbsent(lineageSubmission.getLineageIdentifier(), lineageSubmission);
+        return lineageSubmission;
+    }
+    
+    @Override
+    public ComputeLineageSubmission submitExpandParents(final ProvenanceEventRepository eventRepo, final long eventId) {
+        final Set<String> flowFileUuids = Collections.synchronizedSet(new HashSet<String>());
+        final AsyncLineageSubmission lineageSubmission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, flowFileUuids, indexManager.getNumberOfIndices());
+
+        final Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    logger.debug("Obtaining event with id {} in order to expand children", eventId);
+                    final StoredProvenanceEvent event = eventRepo.getEvent(eventId);
+                    if ( event == null ) {
+                        logger.warn("Cannot expand children of event with ID {} because that event cannot be found", eventId);
+                        lineageSubmission.getResult().setError("Cannot expand children of event with ID " + eventId + " because that event cannot be found");
+                        return;
+                    }
+                    
+                    logger.debug("Found event with id {}; searching for children", eventId);
+                    
+                    switch (event.getEventType()) {
+                        case CLONE:
+                        case FORK:
+                        case JOIN:
+                        case REPLAY:
+                            break;
+                        default:
+                            logger.warn("Cannot expand parents of event with ID {} because event type is {}", eventId, event.getEventType());
+                            lineageSubmission.getResult().setError("Cannot expand parents of event with ID " + eventId + 
+                                    " because that event is of type " + event.getEventType() + 
+                                    ", and that type does not support expansion of children");
+                            return;
+                    }
+                    
+                    final List<String> parentUuids = event.getParentUuids();
+                    flowFileUuids.addAll(parentUuids);
+                    
+                    final AtomicInteger retrievalCount = new AtomicInteger(100);
+                    indexManager.withEachIndex(new VoidIndexAction() {
+                        @Override
+                        public void perform(EventIndexSearcher searcher) throws IOException {
+                            final long startNanos = System.nanoTime();
+                            logger.debug("Finding parents of event with id {} using {}", eventId, searcher);
+                            
+                            final List<JournaledStorageLocation> locations = searcher.getEventsForFlowFiles(flowFileUuids, event.getLineageStartDate(), event.getEventTime());
+                            final List<StoredProvenanceEvent> matchingRecords = getEvents(locations, retrievalCount);
+                            lineageSubmission.getResult().update(matchingRecords);
+                            
+                            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                            logger.debug("Found {} parents of event {} in {} millis", matchingRecords.size(), eventId, millis);
+                        }
+                    });
+                } catch (final IOException ioe) {
+                    
+                }
+            }
+        };
+        
+        executor.submit(runnable);
+        lineageSubmissionMap.putIfAbsent(lineageSubmission.getLineageIdentifier(), lineageSubmission);
+        return lineageSubmission;
+    }
+    
+    
+    @Override
+    public void close() throws IOException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
index a6a487b..fc9fb46 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -18,8 +18,11 @@ package org.apache.nifi.provenance.journaling.tasks;
 
 import java.io.EOFException;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
@@ -34,7 +37,10 @@ import org.apache.nifi.provenance.journaling.toc.TocWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CompressionTask implements Runnable {
+/**
+ * Compresses a journal file and returns the new size of the journal
+ */
+public class CompressionTask implements Callable<Long> {
     public static final String FILE_EXTENSION = ".compress";
     
     private static final Logger logger = LoggerFactory.getLogger(CompressionTask.class);
@@ -56,7 +62,10 @@ public class CompressionTask implements Runnable {
         long blockOffset = tocReader.getBlockOffset(blockIndex);
         tocWriter.addBlockOffset(blockOffset);
         long nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1);
-        
+
+        // we write the events one at a time here so that we can ensure that when the block
+        // changes we are able to insert a new block into the TOC, as the blocks have to contain
+        // the same number of events, since the index just knows about the block index.
         try {
             while ((event = reader.nextEvent()) != null) {
                 // Check if we've gone beyond the offset of the next block. If so, write
@@ -97,7 +106,7 @@ public class CompressionTask implements Runnable {
             }
         }
         
-        return false;
+        return !file.exists();
     }
     
     /**
@@ -125,11 +134,24 @@ public class CompressionTask implements Runnable {
     }
     
     @Override
-    public void run() {
+    public Long call() {
+        final long startNanos = System.nanoTime();
+        final long preCompressionSize = journalFile.length();
+        
         try {
             final File compressedFile = new File(journalFile.getParentFile(), journalFile.getName() + FILE_EXTENSION);
             final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + FILE_EXTENSION);
 
+            if ( compressedFile.exists() && !compressedFile.delete() ) {
+                logger.error("Compressed file {} already exists and could not remove it; compression task failed", compressedFile);
+                return preCompressionSize;
+            }
+            
+            if ( compressedTocFile.exists() && !compressedTocFile.delete() ) {
+                logger.error("Compressed TOC file {} already exists and could not remove it; compression task failed", compressedTocFile);
+                return preCompressionSize;
+            }
+            
             try (final JournalReader journalReader = new StandardJournalReader(journalFile);
                 final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer());
                 final TocReader tocReader = new StandardTocReader(tocFile);
@@ -137,14 +159,19 @@ public class CompressionTask implements Runnable {
                 
                 compress(journalReader, compressedWriter, tocReader, compressedTocWriter);
                 compressedWriter.sync();
+            } catch (final FileNotFoundException fnfe) {
+                logger.info("Failed to compress Journal File {} because it has already been removed", journalFile);
+                return 0L;
             }
-
+            
+            final long postCompressionSize = compressedFile.length();
+            
             final boolean deletedJournal = delete(journalFile);
             if ( !deletedJournal ) {
                 delete(compressedFile);
                 delete(compressedTocFile);
                 logger.error("Failed to remove Journal file {}; considering compression task a failure", journalFile);
-                return;
+                return preCompressionSize;
             }
             
             final boolean deletedToc = delete(tocFile);
@@ -152,7 +179,7 @@ public class CompressionTask implements Runnable {
                 delete(compressedFile);
                 delete(compressedTocFile);
                 logger.error("Failed to remove TOC file for {}; considering compression task a failure", journalFile);
-                return;
+                return preCompressionSize;
             }
             
             final boolean renamedJournal = rename(compressedFile, journalFile);
@@ -165,12 +192,18 @@ public class CompressionTask implements Runnable {
                 logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedTocFile, tocFile);
             }
             
-            logger.info("Successfully compressed Journal File {}");
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            final double percent = (postCompressionSize / preCompressionSize) * 100D;
+            final String pct = String.format("%.2f", percent);
+            logger.info("Successfully compressed Journal File {} in {} millis; size changed from {} bytes to {} bytes ({}% of original size)", journalFile, millis, preCompressionSize, postCompressionSize, pct);
+            return postCompressionSize;
         } catch (final IOException ioe) {
             logger.error("Failed to compress Journal File {} due to {}", journalFile, ioe.toString());
             if ( logger.isDebugEnabled() ) {
                 logger.error("", ioe);
             }
+            
+            return preCompressionSize;
         }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
index 995acf9..ae4635f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
@@ -83,6 +83,14 @@ public class StandardTocReader implements TocReader {
     }
 
     @Override
+    public long getLastBlockOffset() {
+        if ( offsets.length == 0 ) {
+            return 0L;
+        }
+        return offsets[offsets.length - 1];
+    }
+    
+    @Override
     public void close() throws IOException {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
index fea6057..9ee07e0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
@@ -16,12 +16,17 @@
  */
 package org.apache.nifi.provenance.journaling.toc;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.FileAlreadyExistsException;
 
 /**
@@ -51,7 +56,24 @@ public class StandardTocWriter implements TocWriter {
      */
     public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
         if ( file.exists() ) {
-            throw new FileAlreadyExistsException(file.getAbsolutePath());
+            // Check if the header actually exists. If so, throw FileAlreadyExistsException
+            // If no data is in the file, we will just overwrite it.
+            try (final InputStream fis = new FileInputStream(file);
+                 final InputStream bis = new BufferedInputStream(fis);
+                 final DataInputStream dis = new DataInputStream(bis)) {
+                dis.read();
+                dis.read();
+
+                // we always add the first offset when the writer is created so we allow this to exist.
+                dis.readLong();
+                final int nextByte = dis.read();
+                
+                if ( nextByte > -1 ) {
+                    throw new FileAlreadyExistsException(file.getAbsolutePath());
+                }
+            } catch (final EOFException eof) {
+                // no real data. overwrite file.
+            }
         }
         
         if ( !file.getParentFile().exists() && !file.getParentFile().mkdirs() ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
index eca664e..c3f9df5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
@@ -33,13 +33,13 @@ public class TocJournalReader implements Closeable {
     
     private final String containerName;
     private final String sectionName;
-    private final String journalId;
+    private final Long journalId;
     
     private int blockIndex;
     private long nextBlockOffset;
     
     
-    public TocJournalReader(final String containerName, final String sectionName, final String journalId, final File journalFile) throws IOException {
+    public TocJournalReader(final String containerName, final String sectionName, final Long journalId, final File journalFile) throws IOException {
         this.containerName = containerName;
         this.sectionName = sectionName;
         this.journalId = journalId;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
index 9f6a264..18d7189 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
@@ -43,4 +43,9 @@ public interface TocReader extends Closeable {
      */
     long getBlockOffset(int blockIndex);
     
+    /**
+     * Returns the byte offset into the Journal File of the last Block in the given index
+     * @return
+     */
+    long getLastBlockOffset();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
index a547a8a..30e100a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
@@ -23,14 +23,23 @@ import static org.junit.Assert.assertNull;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.StoredProvenanceEvent;
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
 import org.apache.nifi.provenance.search.QuerySubmission;
@@ -93,6 +102,190 @@ public class TestJournalingProvenanceRepository {
     }
     
     
+    @Test
+    public void testStoreRestartAndRetrieve() throws IOException {
+        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+        final Map<String, File> containers = new HashMap<>();
+        containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+        containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+        config.setContainers(containers);
+        config.setPartitionCount(3);
+        
+        try {
+            try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+                repo.initialize(null);
+                final Map<String, String> attributes = new HashMap<>();
+                
+                for (int i=0; i < 10; i++) {
+                    attributes.put("i", String.valueOf(i));
+                    repo.registerEvent(TestUtil.generateEvent(i, attributes));
+                }
+                
+                assertEquals(10L, repo.getMaxEventId().longValue());
+            }
+    
+            try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+                repo.initialize(null);
+
+                assertEquals(10L, repo.getMaxEventId().longValue());
+
+                // retrieve records one at a time.
+                for (int i=0; i < 10; i++) {
+                    final StoredProvenanceEvent event = repo.getEvent(i);
+                    assertNotNull(event);
+                    assertEquals((long) i, event.getEventId());
+                    assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+                }
+                
+                final List<StoredProvenanceEvent> events = repo.getEvents(0, 1000);
+                assertNotNull(events);
+                assertEquals(10, events.size());
+                for (int i=0; i < 10; i++) {
+                    final StoredProvenanceEvent event = events.get(i);
+                    assertNotNull(event);
+                    assertEquals((long) i, event.getEventId());
+                    assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+                }
+            }
+        } finally {
+            for ( final File file : containers.values() ) {
+                if ( file.exists() ) {
+                    FileUtils.deleteFile(file, true);
+                }
+            }
+        }
+    }
+    
+    
+    @Test
+    public void testStoreRestartRetrieveAndExpireOnTime() throws IOException, InterruptedException {
+        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+        final Map<String, File> containers = new HashMap<>();
+        containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+        containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+        config.setContainers(containers);
+        config.setPartitionCount(3);
+        
+        try {
+            try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+                repo.initialize(null);
+                final Map<String, String> attributes = new HashMap<>();
+                
+                for (int i=0; i < 10; i++) {
+                    attributes.put("i", String.valueOf(i));
+                    repo.registerEvent(TestUtil.generateEvent(i, attributes));
+                }
+                
+                assertEquals(10L, repo.getMaxEventId().longValue());
+            }
+    
+            config.setExpirationFrequency(1, TimeUnit.SECONDS);
+            try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+                repo.initialize(null);
+
+                assertEquals(10L, repo.getMaxEventId().longValue());
+
+                // retrieve records one at a time.
+                for (int i=0; i < 10; i++) {
+                    final StoredProvenanceEvent event = repo.getEvent(i);
+                    assertNotNull(event);
+                    assertEquals((long) i, event.getEventId());
+                    assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+                }
+                
+                final List<StoredProvenanceEvent> events = repo.getEvents(0, 1000);
+                assertNotNull(events);
+                assertEquals(10, events.size());
+                for (int i=0; i < 10; i++) {
+                    final StoredProvenanceEvent event = events.get(i);
+                    assertNotNull(event);
+                    assertEquals((long) i, event.getEventId());
+                    assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+                }
+                
+                // wait a bit for the events to be expired
+                TimeUnit.SECONDS.sleep(2L);
+                
+                // retrieve records one at a time.
+                for (int i=0; i < 10; i++) {
+                    final StoredProvenanceEvent event = repo.getEvent(i);
+                    assertNull("Event " + i + " still exists", event);
+                }
+                
+                final List<StoredProvenanceEvent> allEvents = repo.getEvents(0, 1000);
+                assertNotNull(allEvents);
+                assertEquals(0, allEvents.size());
+            }
+        } finally {
+            for ( final File file : containers.values() ) {
+                if ( file.exists() ) {
+                    FileUtils.deleteFile(file, true);
+                }
+            }
+        }
+    }
+    
+    
+    @Test
+    public void testExpireOnSize() throws IOException, InterruptedException {
+        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+        final Map<String, File> containers = new HashMap<>();
+        containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+        containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+        config.setContainers(containers);
+        config.setPartitionCount(3);
+        config.setMaxStorageCapacity(1024L * 50);
+        config.setEventExpiration(2, TimeUnit.SECONDS);
+        config.setExpirationFrequency(1, TimeUnit.SECONDS);
+        config.setJournalRolloverPeriod(1, TimeUnit.SECONDS);
+        config.setCompressOnRollover(false);
+        
+        try {
+            try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+                repo.initialize(null);
+                final Map<String, String> attributes = new HashMap<>();
+                
+                final int numEventsToInsert = 1000;
+                for (int i=0; i < numEventsToInsert; i++) {
+                    attributes.put("i", String.valueOf(i));
+                    repo.registerEvent(TestUtil.generateEvent(i, attributes));
+                }
+
+                final List<StoredProvenanceEvent> eventsBeforeExpire = repo.getEvents(0, numEventsToInsert * 2);
+                assertNotNull(eventsBeforeExpire);
+                assertEquals(numEventsToInsert, eventsBeforeExpire.size());
+                
+                // wait a bit for expiration to occur
+                TimeUnit.SECONDS.sleep(3L);
+                
+                // generate an event for each partition to force a rollover of the journals
+                for (int i=0; i < config.getPartitionCount(); i++) {
+                    repo.registerEvent(TestUtil.generateEvent(100000L));
+                }
+
+                TimeUnit.SECONDS.sleep(1L);
+  
+                // retrieve records one at a time.
+                for (int i=0; i < numEventsToInsert; i++) {
+                    final StoredProvenanceEvent event = repo.getEvent(i);
+                    assertNull("Event " + i + " still exists", event);
+                }
+                
+                final List<StoredProvenanceEvent> eventsAfterExpire = repo.getEvents(0, numEventsToInsert * 2);
+                assertNotNull(eventsAfterExpire);
+                assertEquals(3, eventsAfterExpire.size());
+            }
+        } finally {
+            for ( final File file : containers.values() ) {
+                if ( file.exists() ) {
+                    FileUtils.deleteFile(file, true);
+                }
+            }
+        }
+    }
+    
+    
+    
     @Test(timeout=10000000)
     public void testSearchByUUID() throws IOException, InterruptedException {
         final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
@@ -141,4 +334,98 @@ public class TestJournalingProvenanceRepository {
         }
     }
     
+    
+    @Test(timeout=10000)
+    public void testReceiveDropLineage() throws IOException, InterruptedException {
+        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+        final Map<String, File> containers = new HashMap<>();
+        containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+        containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+        config.setContainers(containers);
+        
+        config.setPartitionCount(3);
+        config.setSearchableFields(Arrays.asList(new SearchableField[] {
+                SearchableFields.FlowFileUUID
+        }));
+        
+        try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+            repo.initialize(null);
+        
+            final String uuid = "00000000-0000-0000-0000-000000000001";
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("abc", "xyz");
+            attributes.put("uuid", uuid);
+            attributes.put("filename", "file-" + uuid);
+    
+            final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder()
+                .setEventType(ProvenanceEventType.RECEIVE)
+                .setFlowFileUUID(uuid)
+                .setComponentType("Unit Test")
+                .setComponentId(UUID.randomUUID().toString())
+                .setEventTime(System.currentTimeMillis())
+                .setFlowFileEntryDate(System.currentTimeMillis() - 1000L)
+                .setLineageStartDate(System.currentTimeMillis() - 2000L)
+                .setCurrentContentClaim(null, null, null, null, 0L)
+                .setAttributes(null, attributes == null ? Collections.<String, String>emptyMap() : attributes);
+
+            builder.setTransitUri("nifi://unit-test");
+            attributes.put("uuid", uuid);
+            builder.setComponentId("1234");
+            builder.setComponentType("dummy processor");
+    
+            // Add RECEIVE Event
+            repo.registerEvent(builder.build());
+    
+            builder.setEventTime(System.currentTimeMillis() + 1);
+            builder.setEventType(ProvenanceEventType.DROP);
+            builder.setTransitUri(null);
+            
+            // Add DROP event
+            repo.registerEvent(builder.build());
+            
+            // register unrelated even to make sure we don't get this one.
+            builder.setFlowFileUUID("00000000-0000-0000-0000-000000000002");
+            repo.registerEvent(builder.build());
+            
+            final ComputeLineageSubmission submission = repo.submitLineageComputation(uuid);
+            assertNotNull(submission);
+            
+            final ComputeLineageResult result = submission.getResult();
+            while ( !result.isFinished() ) {
+                Thread.sleep(50L);
+            }
+            
+            assertNull(result.getError());
+            
+            final List<LineageNode> nodes = result.getNodes();
+            assertEquals(3, nodes.size());  // RECEIVE, FlowFile node, DROP
+            
+            int receiveCount = 0;
+            int dropCount = 0;
+            int flowFileNodeCount = 0;
+            for ( final LineageNode node : nodes ) {
+                assertEquals(uuid, node.getFlowFileUuid());
+                
+                if ( LineageNodeType.PROVENANCE_EVENT_NODE.equals(node.getNodeType()) ) {
+                    final ProvenanceEventLineageNode eventNode = (ProvenanceEventLineageNode) node;
+                    if ( eventNode.getEventType() == ProvenanceEventType.RECEIVE ) {
+                        receiveCount++;
+                    } else if ( eventNode.getEventType() == ProvenanceEventType.DROP ) {
+                        dropCount++;
+                    }
+                } else {
+                    flowFileNodeCount++;
+                }
+            }
+            
+            assertEquals(1, receiveCount);
+            assertEquals(1, dropCount);
+            assertEquals(1, flowFileNodeCount);
+        } finally {
+            for ( final File file : containers.values() ) {
+                FileUtils.deleteFile(file, true);
+            }
+        }
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
index dfaeb1a..e611aaa 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
@@ -35,6 +35,7 @@ import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.SearchTerms;
 import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.file.FileUtils;
 import org.junit.Test;
 
 public class TestEventIndexWriter {
@@ -51,11 +52,9 @@ public class TestEventIndexWriter {
         
         final File indexDir = new File("target/" + UUID.randomUUID().toString());
         
-        final File journalFile = new File("target/" + UUID.randomUUID().toString());
         try (final LuceneIndexWriter indexWriter = new LuceneIndexWriter(indexDir, config)) {
-            
             final ProvenanceEventRecord event = TestUtil.generateEvent(23L);
-            final JournaledStorageLocation location = new JournaledStorageLocation("container", "section", "journalId", 2, 23L);
+            final JournaledStorageLocation location = new JournaledStorageLocation("container", "section", 1L, 2, 23L);
             final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location);
             indexWriter.index(Collections.singleton(storedEvent));
             
@@ -72,12 +71,12 @@ public class TestEventIndexWriter {
                 assertNotNull(found);
                 assertEquals("container", found.getContainerName());
                 assertEquals("section", found.getSectionName());
-                assertEquals("journalId", found.getJournalId());
+                assertEquals(1L, found.getJournalId().longValue());
                 assertEquals(2, found.getBlockIndex());
                 assertEquals(23L, found.getEventId());
             }
         } finally {
-            journalFile.delete();
+            FileUtils.deleteFile(indexDir, true);
         }
         
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
index d5eab8e..43efa7e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
@@ -17,11 +17,13 @@
 package org.apache.nifi.provenance.journaling.journals;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.util.Collections;
 import java.util.UUID;
@@ -32,11 +34,47 @@ import org.apache.nifi.provenance.journaling.TestUtil;
 import org.apache.nifi.provenance.journaling.io.StandardEventDeserializer;
 import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
 import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestStandardJournalWriter {
 
     @Test
+    public void testOverwriteEmptyFile() throws IOException {
+        final File journalFile = new File("target/" + UUID.randomUUID().toString());
+        try {
+            assertTrue( journalFile.createNewFile() );
+            
+            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+                
+            }
+        } finally {
+            FileUtils.deleteFile(journalFile, false);
+        }
+    }
+    
+    @Test
+    public void testDoNotOverwriteNonEmptyFile() throws IOException {
+        final File journalFile = new File("target/" + UUID.randomUUID().toString());
+        try {
+            assertTrue( journalFile.createNewFile() );
+            
+            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+                writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
+            }
+            
+            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+                Assert.fail("StandardJournalWriter attempted to overwrite existing file");
+            } catch (final FileAlreadyExistsException faee) {
+                // expected
+            }
+        } finally {
+            FileUtils.deleteFile(journalFile, false);
+        }
+    }
+    
+    @Test
     public void testOneBlockOneRecordWriteCompressed() throws IOException {
         final File journalFile = new File("target/" + UUID.randomUUID().toString());
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocWriter.java
new file mode 100644
index 0000000..3cddb5e
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.UUID;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestStandardTocWriter {
+    @Test
+    public void testOverwriteEmptyFile() throws IOException {
+        final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
+        try {
+            assertTrue( tocFile.createNewFile() );
+            
+            try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+            }
+        } finally {
+            FileUtils.deleteFile(tocFile, false);
+        }
+    }
+    
+    @Test
+    public void testDoNotOverwriteNonEmptyFile() throws IOException {
+        final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
+        try {
+            assertTrue( tocFile.createNewFile() );
+            
+            try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+                writer.addBlockOffset(0L);
+                writer.addBlockOffset(34L);
+            }
+            
+            try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+                Assert.fail("StandardTocWriter attempted to overwrite existing file");
+            } catch (final FileAlreadyExistsException faee) {
+                // expected
+            }
+        } finally {
+            FileUtils.deleteFile(tocFile, false);
+        }
+    }
+}


Mime
View raw message