nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [02/12] incubator-nifi git commit: implemented ability to persist and recover records
Date Fri, 27 Feb 2015 20:47:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventSerializer.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventSerializer.java
new file mode 100644
index 0000000..fae427e
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventSerializer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+public class StandardEventSerializer implements Serializer {
+    public static final String CODEC_NAME = "StandardProvCodec";
+    
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+    
+    @Override
+    public String getCodecName() {
+        return CODEC_NAME;
+    }
+    
+    @Override
+    public void serialize(final ProvenanceEventRecord event, final DataOutputStream out) throws IOException {
+        final ProvenanceEventType recordType = event.getEventType();
+
+        out.writeUTF(event.getEventType().name());
+        out.writeLong(event.getEventTime());
+        out.writeLong(event.getFlowFileEntryDate());
+        out.writeLong(event.getEventDuration());
+
+        writeUUIDs(out, event.getLineageIdentifiers());
+        out.writeLong(event.getLineageStartDate());
+
+        writeNullableString(out, event.getComponentId());
+        writeNullableString(out, event.getComponentType());
+        writeUUID(out, event.getFlowFileUuid());
+        writeNullableString(out, event.getDetails());
+
+        // Write FlowFile attributes
+        final Map<String, String> attrs = event.getPreviousAttributes();
+        out.writeInt(attrs.size());
+        for (final Map.Entry<String, String> entry : attrs.entrySet()) {
+            writeLongString(out, entry.getKey());
+            writeLongString(out, entry.getValue());
+        }
+
+        final Map<String, String> attrUpdates = event.getUpdatedAttributes();
+        out.writeInt(attrUpdates.size());
+        for (final Map.Entry<String, String> entry : attrUpdates.entrySet()) {
+            writeLongString(out, entry.getKey());
+            writeLongNullableString(out, entry.getValue());
+        }
+
+        // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. 
+        if (event.getContentClaimSection() != null && event.getContentClaimContainer() != null && event.getContentClaimIdentifier() != null) {
+            out.writeBoolean(true);
+            out.writeUTF(event.getContentClaimContainer());
+            out.writeUTF(event.getContentClaimSection());
+            out.writeUTF(event.getContentClaimIdentifier());
+            if (event.getContentClaimOffset() == null) {
+                out.writeLong(0L);
+            } else {
+                out.writeLong(event.getContentClaimOffset());
+            }
+            out.writeLong(event.getFileSize());
+        } else {
+            out.writeBoolean(false);
+        }
+
+        // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
+        if (event.getPreviousContentClaimSection() != null && event.getPreviousContentClaimContainer() != null && event.getPreviousContentClaimIdentifier() != null) {
+            out.writeBoolean(true);
+            out.writeUTF(event.getPreviousContentClaimContainer());
+            out.writeUTF(event.getPreviousContentClaimSection());
+            out.writeUTF(event.getPreviousContentClaimIdentifier());
+            if (event.getPreviousContentClaimOffset() == null) {
+                out.writeLong(0L);
+            } else {
+                out.writeLong(event.getPreviousContentClaimOffset());
+            }
+
+            if (event.getPreviousFileSize() == null) {
+                out.writeLong(0L);
+            } else {
+                out.writeLong(event.getPreviousFileSize());
+            }
+        } else {
+            out.writeBoolean(false);
+        }
+
+        // write out the identifier of the destination queue.
+        writeNullableString(out, event.getSourceQueueIdentifier());
+
+        // Write type-specific info
+        if (recordType == ProvenanceEventType.FORK || recordType == ProvenanceEventType.JOIN || recordType == ProvenanceEventType.CLONE || recordType == ProvenanceEventType.REPLAY) {
+            writeUUIDs(out, event.getParentUuids());
+            writeUUIDs(out, event.getChildUuids());
+        } else if (recordType == ProvenanceEventType.RECEIVE) {
+            writeNullableString(out, event.getTransitUri());
+            writeNullableString(out, event.getSourceSystemFlowFileIdentifier());
+        } else if (recordType == ProvenanceEventType.SEND) {
+            writeNullableString(out, event.getTransitUri());
+        } else if (recordType == ProvenanceEventType.ADDINFO) {
+            writeNullableString(out, event.getAlternateIdentifierUri());
+        } else if (recordType == ProvenanceEventType.ROUTE) {
+            writeNullableString(out, event.getRelationship());
+        }
+    }
+
+    private void writeNullableString(final DataOutputStream out, final String toWrite) throws IOException {
+        if (toWrite == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            out.writeUTF(toWrite);
+        }
+    }
+
+    private void writeLongNullableString(final DataOutputStream out, final String toWrite) throws IOException {
+        if (toWrite == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            writeLongString(out, toWrite);
+        }
+    }
+
+    private void writeLongString(final DataOutputStream out, final String value) throws IOException {
+        final byte[] bytes = value.getBytes("UTF-8");
+        out.writeInt(bytes.length);
+        out.write(bytes);
+    }
+    
+    static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
+        final UUID uuidObj = UUID.fromString(uuid);
+        out.writeLong(uuidObj.getMostSignificantBits());
+        out.writeLong(uuidObj.getLeastSignificantBits());
+    }
+
+    static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
+        if (list == null) {
+            out.writeInt(0);
+        } else {
+            out.writeInt(list.size());
+            for (final String value : list) {
+                writeUUID(out, value);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
new file mode 100644
index 0000000..535d1dd
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Reads provenance events back out of a journal.
+ */
+public interface JournalReader extends Closeable {
+
+    /**
+     * Retrieves a specific event from the journal, given the offset of the Block that contains it and the
+     * ID of the event.
+     *
+     * @param blockOffset the byte offset into the journal at which the Block containing the event begins
+     * @param eventId the ID of the event to retrieve
+     * @return the event with the given ID
+     * @throws IOException if the journal cannot be read; implementations may also throw it when no event
+     *             with the given ID can be found
+     */
+    ProvenanceEventRecord getEvent(long blockOffset, long eventId) throws IOException;
+
+    /**
+     * Retrieves the next event in the journal.
+     *
+     * @return the next event, or <code>null</code> if no more events exist
+     * @throws IOException if the journal cannot be read
+     */
+    ProvenanceEventRecord nextEvent() throws IOException;
+
+    /**
+     * Returns the current byte offset into the Journal from which the next event (if any) will be read.
+     *
+     * @return the byte offset of the next event to be read
+     */
+    long getPosition();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalWriter.java
new file mode 100644
index 0000000..5108f49
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Responsible for writing events to an append-only journal, or write-ahead log. Events are written in "Blocks."
+ * These Blocks are used so that if we are compressing data, we can compress individual Blocks. This allows us
+ * to store a "Block Index" so that we can quickly look up the start of a Block when reading the data to quickly
+ * obtain the data that we need.
+ */
+public interface JournalWriter extends Closeable {
+
+    /**
+     * Returns the identifier of this journal. The identifier is unique per 'section' of the repository.
+     *
+     * @return the identifier of this journal
+     */
+    long getJournalId();
+
+    /**
+     * Writes the given events to the journal and assigns the events sequential IDs, starting with the
+     * ID given.
+     *
+     * @param events the events to write; IDs are assigned in the collection's iteration order
+     * @param firstEventId the ID to assign to the first event; each subsequent event receives the next ID
+     * @throws IOException if the events cannot be written
+     */
+    void write(Collection<ProvenanceEventRecord> events, long firstEventId) throws IOException;
+
+    /**
+     * Returns the File that the Journal is writing to.
+     *
+     * @return the journal file
+     */
+    File getJournalFile();
+
+    /**
+     * Synchronizes changes to the underlying file system.
+     *
+     * @throws IOException if the changes cannot be synchronized
+     */
+    void sync() throws IOException;
+
+    /**
+     * Returns the size of the journal.
+     *
+     * @return the size of the journal, in bytes
+     */
+    long getSize();
+
+    /**
+     * Returns the number of events that have been written to this journal.
+     *
+     * @return the number of events written so far
+     */
+    int getEventCount();
+
+    /**
+     * Returns the amount of time that has elapsed since the point at which the writer was created.
+     *
+     * @param timeUnit the unit in which to express the age
+     * @return the age of this writer, in the given unit
+     */
+    long getAge(TimeUnit timeUnit);
+
+    /**
+     * Marks the end of a Block in the output file. If the previous Block has been finished and no new
+     * Block has been started, this method will return silently without doing anything.
+     *
+     * @throws IOException if the end of the Block cannot be written
+     */
+    void finishBlock() throws IOException;
+
+    /**
+     * Starts a new Block in the output file. If a Block has already been started, this method throws
+     * an IllegalStateException.
+     *
+     * @throws IOException if the start of the Block cannot be written
+     */
+    void beginNewBlock() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
new file mode 100644
index 0000000..82ef39b
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.io.Deserializer;
+import org.apache.nifi.provenance.journaling.io.Deserializers;
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Standard implementation of {@link JournalReader}. This reader reads data that is written
+ * in the format specified by {@link StandardJournalWriter}.
+ * <p>
+ * This class keeps mutable stream state and performs no synchronization.
+ */
+public class StandardJournalReader implements JournalReader {
+    private static final Logger logger = LoggerFactory.getLogger(StandardJournalReader.class);
+
+    private final File file;
+
+    // Counts raw bytes read from the journal file (header plus possibly-compressed Blocks).
+    private ByteCountingInputStream compressedStream;
+    // Stream that events are deserialized from; the same object as compressedStream when the journal is not compressed.
+    private ByteCountingInputStream decompressedStream;
+
+    private Deserializer deserializer;
+    private int serializationVersion;
+    private boolean compressed;
+
+    // ID of the most recently read event, or -1 if none has been read since the streams were (re)opened.
+    private long lastEventIdRead = -1L;
+
+    /**
+     * Opens the given journal file and reads its header.
+     *
+     * @param file the journal file to read
+     * @throws IOException if the file cannot be opened or its header cannot be read
+     */
+    public StandardJournalReader(final File file) throws IOException {
+        this.file = file;
+        resetStreams();
+    }
+
+    /**
+     * (Re)opens the journal file from the beginning, reads the header (codec name, serialization version,
+     * compression flag) and positions the reader at the first event. If anything fails after the file has
+     * been opened, the file is closed before the exception propagates so the handle is not leaked.
+     */
+    private void resetStreams() throws IOException {
+        final InputStream bufferedIn = new BufferedInputStream(new FileInputStream(file));
+        compressedStream = new ByteCountingInputStream(bufferedIn);
+        lastEventIdRead = -1L;
+        try {
+            final DataInputStream dis = new DataInputStream(compressedStream);
+            final String codecName = dis.readUTF();
+            serializationVersion = dis.readInt();
+            compressed = dis.readBoolean();
+            deserializer = Deserializers.getDeserializer(codecName);
+
+            resetDecompressedStream();
+        } catch (final IOException | RuntimeException e) {
+            try {
+                compressedStream.close();
+            } catch (final IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Points decompressedStream at the data starting at the current position of compressedStream. For a
+     * compressed journal, a new CompressionInputStream is created so that the next Block can be read.
+     */
+    private void resetDecompressedStream() throws IOException {
+        if ( compressed ) {
+            decompressedStream = new ByteCountingInputStream(new BufferedInputStream(new CompressionInputStream(compressedStream)), compressedStream.getBytesConsumed());
+        } else {
+            decompressedStream = compressedStream;
+        }
+    }
+
+    @Override
+    public ProvenanceEventRecord nextEvent() throws IOException {
+        return nextEvent(true);
+    }
+
+    @Override
+    public long getPosition() {
+        return decompressedStream.getBytesConsumed();
+    }
+
+    /**
+     * Peeks one byte using mark/reset to determine whether more data is available without consuming it.
+     * Relies on the given stream supporting mark/reset.
+     */
+    private boolean isData(final InputStream in) throws IOException {
+        in.mark(1);
+        final int b = in.read();
+        if ( b < 0 ) {
+            return false;
+        }
+        in.reset();
+
+        return true;
+    }
+
+    /**
+     * Reads the next event.
+     *
+     * @param spanBlocks if false, returns null at the end of the current decompressed data; if true and the
+     *            journal is compressed, continues into the next Block when the file has more data
+     * @return the next event, or null at the end of the journal or if End-of-File is reached mid-record
+     *         (logged as a warning)
+     */
+    ProvenanceEventRecord nextEvent(final boolean spanBlocks) throws IOException {
+        boolean isData = isData(decompressedStream);
+        if ( !isData ) {
+            if ( !spanBlocks ) {
+                return null;
+            }
+
+            // we are allowed to span blocks. We're out of data but if we are compressed, it could
+            // just mean that the block has ended.
+            if ( !compressed ) {
+                return null;
+            }
+
+            isData = isData(compressedStream);
+            if ( !isData ) {
+                return null;
+            }
+
+            // There is no data left in the decompressing stream, but there is in the underlying file stream.
+            // This means we've hit the end of our block. We will create a new CompressionInputStream
+            // so that we can continue reading.
+            resetDecompressedStream();
+        }
+
+        try {
+            final DataInputStream dis = new DataInputStream(decompressedStream);
+            final int eventLength = dis.readInt();
+
+            // Confine the deserializer to exactly this record's bytes.
+            final LimitingInputStream limitingInputStream = new LimitingInputStream(dis, eventLength);
+            final MinimumLengthInputStream minStream = new MinimumLengthInputStream(limitingInputStream, eventLength);
+            final ProvenanceEventRecord event = deserializer.deserialize(new DataInputStream(minStream), serializationVersion);
+            lastEventIdRead = event.getEventId();
+            return event;
+        } catch (final EOFException eof) {
+            logger.warn("{} Found unexpected End-of-File when reading from journal", this);
+            return null;
+        }
+    }
+
+    @Override
+    public ProvenanceEventRecord getEvent(final long blockOffset, final long eventId) throws IOException {
+        // If the requested event ID is not greater than the last event that we read, we need to reset to the
+        // beginning of the file. We do this because we know that the IDs are always increasing, so if we need an
+        // ID less than or equal to the previous ID, we have already read past it and have to go backward in the
+        // file. We can't do this with streams, so start the stream over.
+        if ( eventId <= lastEventIdRead ) {
+            close();
+            resetStreams();
+        }
+
+        final long bytesToSkip = blockOffset - compressedStream.getBytesConsumed();
+        if ( bytesToSkip > 0 ) {
+            StreamUtils.skip(compressedStream, bytesToSkip);
+            resetDecompressedStream();
+        }
+
+        ProvenanceEventRecord event;
+        while ((event = nextEvent()) != null) {
+            final long currentId = event.getEventId();
+            if ( currentId == eventId ) {
+                return event;
+            }
+            if ( currentId > eventId ) {
+                // IDs only increase within a journal, so the requested event cannot appear later.
+                break;
+            }
+        }
+
+        throw new IOException("Could not find event with ID " + eventId);
+    }
+
+    /**
+     * Closes the journal file. The decompressing wrapper is closed first; the underlying file stream is then
+     * closed explicitly, because for compressed journals closing the wrapper is not relied upon to close it.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if ( decompressedStream != null && decompressedStream != compressedStream ) {
+                decompressedStream.close();
+            }
+        } finally {
+            if ( compressedStream != null ) {
+                compressedStream.close();
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "StandardJournalReader[" + file + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
new file mode 100644
index 0000000..5a289fe
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.io.Serializer;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
+
+/**
+ * <p>
+ * Standard implementation of {@link JournalWriter}.
+ * </p>
+ * 
+ * <p>
+ * Writes out to a journal file using the format:
+ * 
+ * <pre>
+ * &lt;header&gt;
+ * &lt;begin block 1&gt;
+ * &lt;record 1&gt;
+ * &lt;record 2&gt;
+ * &lt;record 3&gt;
+ * &lt;end block 1&gt;
+ * &lt;begin block 2&gt;
+ * &lt;record 4&gt;
+ * &lt;record 5&gt;
+ * &lt;end block 2&gt;
+ * ...
+ * &lt;begin block N&gt;
+ * &lt;record N&gt;
+ * &lt;end block N&gt;
+ * </pre>
+ * 
+ * Where &lt;header&gt; is defined as:
+ * <pre>
+ *  String: serialization codec name (retrieved from serializer)
+ *      --> 2 bytes for length of string
+ *      --> N bytes for actual serialization codec name
+ *  int: serialization version
+ *  boolean: compressed: 1 -> compressed, 0 -> not compressed
+ * </pre>
+ * 
+ * And &lt;record&gt; is defined as:
+ * <pre>
+ * bytes 0-3: int: record length
+ * bytes 4-11: long: record id
+ * bytes 12-N: serialized event according to the applied {@link Serializer}
+ * </pre>
+ * </p>
+ * 
+ * <p>
+ * The structure of the &lt;begin block&gt; and &lt;end block&gt; elements depends on whether or not
+ * compression is enabled. If the journal is not compressed, these elements are 0 bytes.
+ * If the journal is compressed, these are the compression header and compression footer, respectively.
+ * </p>
+ * 
+ */
+public class StandardJournalWriter implements JournalWriter {
+    private final long journalId;
+    private final File journalFile;
+    private final boolean compressed;
+    private final Serializer serializer;
+    private final long creationTime = System.nanoTime();
+
+    private int eventCount;
+    private boolean blockStarted = false;
+    // True once close() has been called on the current CompressionOutputStream (i.e. its footer has been written).
+    private boolean compressedStreamClosed = false;
+
+    private final FileOutputStream fos;
+    private ByteCountingOutputStream uncompressedStream;
+    private OutputStream compressedStream;
+    private ByteCountingOutputStream out;
+
+    /**
+     * Creates the journal file, writes the header and prepares the first Block for writing.
+     * If the header cannot be written, the file is closed before the exception propagates.
+     *
+     * @param journalId the identifier of this journal
+     * @param journalFile the file to write to
+     * @param compressed whether each Block should be compressed
+     * @param serializer the serializer used to encode each event
+     * @throws IOException if the file cannot be created or the header cannot be written
+     */
+    public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException {
+        this.journalId = journalId;
+        this.journalFile = journalFile;
+        this.compressed = compressed;
+        this.serializer = serializer;
+        this.fos = new FileOutputStream(journalFile);
+
+        try {
+            uncompressedStream = new ByteCountingOutputStream(fos);
+            writeHeader(uncompressedStream);
+
+            if (compressed) {
+                compressedStream = new CompressionOutputStream(uncompressedStream);
+            } else {
+                compressedStream = fos;
+            }
+
+            this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten());
+        } catch (final IOException | RuntimeException e) {
+            try {
+                fos.close();
+            } catch (final IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Writes the journal header: codec name, serialization version and compression flag.
+     */
+    private void writeHeader(final OutputStream out) throws IOException {
+        final DataOutputStream dos = new DataOutputStream(out);
+        dos.writeUTF(serializer.getCodecName());
+        dos.writeInt(serializer.getVersion());
+        dos.writeBoolean(compressed);
+        dos.flush();
+    }
+
+    @Override
+    public long getJournalId() {
+        return journalId;
+    }
+
+    /**
+     * Finishes the current Block (if any). If compression is enabled, it also closes the current compression
+     * stream unless that stream has already been closed, and then closes the journal file.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            finishBlock();
+
+            // Writes may have occurred without a Block being started, or no Block was ever started. In both
+            // cases the current compression stream is still open and needs its footer written. Avoid closing
+            // a stream that finishBlock() has already closed.
+            if ( compressed && !compressedStreamClosed ) {
+                compressedStream.flush();
+                compressedStream.close();
+                compressedStreamClosed = true;
+            }
+        } finally {
+            // CompressionOutputStream.close() does not close the stream it wraps, so the file must be closed
+            // explicitly; otherwise the file handle would leak when compression is enabled.
+            fos.close();
+        }
+    }
+
+    /**
+     * Serializes each event into memory and then writes its record as length, ID and serialized bytes.
+     */
+    @Override
+    public void write(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final DataOutputStream serializerDos = new DataOutputStream(baos);
+
+        final BufferedOutputStream bos = new BufferedOutputStream(out);
+        final DataOutputStream outDos = new DataOutputStream(bos);
+
+        try {
+            long id = firstEventId;
+            for ( final ProvenanceEventRecord event : events ) {
+                serializer.serialize(event, serializerDos);
+                serializerDos.flush();
+
+                final int recordLength = 8 + baos.size();   // record length is length of ID (8 bytes) plus length of serialized record
+                outDos.writeInt(recordLength);
+                outDos.writeLong(id++);
+                baos.writeTo(outDos);
+                baos.reset();
+
+                eventCount++;
+            }
+        } finally {
+            outDos.flush();
+        }
+    }
+
+    @Override
+    public File getJournalFile() {
+        return journalFile;
+    }
+
+    @Override
+    public void sync() throws IOException {
+        fos.getFD().sync();
+    }
+
+    @Override
+    public long getSize() {
+        return out.getBytesWritten();
+    }
+
+    @Override
+    public int getEventCount() {
+        return eventCount;
+    }
+
+    @Override
+    public long getAge(final TimeUnit timeUnit) {
+        return timeUnit.convert(System.nanoTime() - creationTime, TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public void finishBlock() throws IOException {
+        if ( !blockStarted ) {
+            return;
+        }
+
+        blockStarted = false;
+        if ( !compressed ) {
+            return;
+        }
+
+        // Calling close() on CompressionOutputStream doesn't close the underlying stream -- it is designed
+        // such that calling close() will write out the Compression footer and become unusable but not
+        // close the underlying stream because the whole point of CompressionOutputStream as opposed to
+        // GZIPOutputStream is that with CompressionOutputStream we can concatenate many together on a single
+        // stream.
+        compressedStream.close();
+        compressedStreamClosed = true;
+    }
+
+    @Override
+    public void beginNewBlock() throws IOException {
+        if ( blockStarted ) {
+            throw new IllegalStateException("Block is already started");
+        }
+        blockStarted = true;
+
+        if ( !compressed ) {
+            return;
+        }
+        if ( !compressedStreamClosed ) {
+            // The current compression stream (e.g. the one created by the constructor) has not been closed yet,
+            // so it can hold this Block. Replacing it here would leave it without a footer.
+            return;
+        }
+
+        this.compressedStream = new CompressionOutputStream(uncompressedStream);
+        this.compressedStreamClosed = false;
+        this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten());
+    }
+
+    @Override
+    public String toString() {
+        return "Journal Writer for " + journalFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
new file mode 100644
index 0000000..51f84a2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+import org.apache.nifi.provenance.journaling.index.LuceneIndexSearcher;
+import org.apache.nifi.provenance.journaling.index.LuceneIndexWriter;
+import org.apache.nifi.provenance.journaling.index.QueryUtils;
+import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
+import org.apache.nifi.provenance.journaling.journals.JournalReader;
+import org.apache.nifi.provenance.journaling.journals.JournalWriter;
+import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
+import org.apache.nifi.provenance.journaling.journals.StandardJournalWriter;
+import org.apache.nifi.provenance.journaling.tasks.CompressionTask;
+import org.apache.nifi.provenance.journaling.toc.StandardTocWriter;
+import org.apache.nifi.provenance.journaling.toc.TocJournalReader;
+import org.apache.nifi.provenance.journaling.toc.TocWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Partition} that persists events to journal files (each accompanied by a Table of Contents file)
+ * and, unless the repository is read-only, indexes them with a LuceneIndexWriter so that they are searchable.
+ * All mutations of the active journal/TOC writers happen while holding the write lock.
+ */
+public class JournalingPartition implements Partition {
+    private static final Logger logger = LoggerFactory.getLogger(JournalingPartition.class);
+    private static final String JOURNAL_FILE_EXTENSION = ".journal";
+
+    // number of events handed to the index writer at a time when re-indexing a journal during restore()
+    private static final int REINDEX_BATCH_SIZE = 1000;
+
+    private final String containerName;
+    private final String sectionName;
+
+    private final File section;
+    private final File journalsDir;
+    private final JournalingRepositoryConfig config;
+    private final ExecutorService executor;
+    // null when the repository is read-only
+    private final LuceneIndexWriter indexWriter;
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
+    // writers for the journal currently being appended to; guarded by writeLock
+    private JournalWriter journalWriter;
+    private TocWriter tocWriter;
+    // value of journalWriter.getEventCount() when the most recently finished block ended
+    private int numEventsAtEndOfLastBlock = 0;
+    // volatile so that the getters can read these without acquiring a lock; only written under writeLock
+    // or during restore()
+    private volatile long maxEventId = -1L;
+    private volatile Long earliestEventTime = null;
+
+    /**
+     * Creates a partition whose journals live in the "journals" sub-directory of the given section directory
+     * and, unless the repository is read-only, whose index lives in the "index" sub-directory.
+     *
+     * @param containerName name of the container this partition belongs to
+     * @param sectionName name of this partition's section
+     * @param sectionDir directory holding this partition's journals and index
+     * @param config repository configuration
+     * @param executor executor used to run background tasks such as journal compression
+     * @throws IOException if the journals directory cannot be created or the index writer cannot be opened
+     */
+    public JournalingPartition(final String containerName, final String sectionName, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+        this.containerName = containerName;
+        this.sectionName = sectionName;
+        this.section = sectionDir;
+        this.journalsDir = new File(section, "journals");
+        this.config = config;
+        this.executor = executor;
+
+        if (!journalsDir.exists() && !journalsDir.mkdirs()) {
+            throw new IOException("Could not create directory " + journalsDir);
+        }
+
+        if (journalsDir.isFile()) {
+            throw new IOException("Could not create directory " + journalsDir + " because a file already exists with this name");
+        }
+
+        if (config.isReadOnly()) {
+            indexWriter = null;
+        } else {
+            final File indexDir = new File(section, "index");
+            indexWriter = new LuceneIndexWriter(indexDir, config);
+        }
+    }
+
+    /**
+     * Returns a new searcher over this partition's index. In read-only mode (no index writer) the searcher is
+     * opened directly on the index directory; otherwise it is obtained from the index writer.
+     */
+    @Override
+    public EventIndexSearcher newIndexSearcher() throws IOException {
+        if (config.isReadOnly()) {
+            return new LuceneIndexSearcher(new File(section, "index"));
+        }
+
+        return indexWriter.newIndexSearcher();
+    }
+
+    /**
+     * Returns the writer for the active journal, rolling over to a new journal first if necessary.
+     * MUST be called with the write lock held, because it may roll over.
+     *
+     * @param firstEventId ID of the first event about to be written; names the new journal if a rollover occurs
+     * @throws IllegalStateException if the repository is read-only
+     */
+    protected JournalWriter getJournalWriter(final long firstEventId) throws IOException {
+        if (config.isReadOnly()) {
+            throw new IllegalStateException("Cannot update repository because it is read-only");
+        }
+
+        if (isRolloverNecessary()) {
+            rollover(firstEventId);
+        }
+
+        return journalWriter;
+    }
+
+    /**
+     * Writes the given events to the active journal, indexes them, and returns them paired with their storage
+     * locations. Events are assigned consecutive IDs starting at {@code firstEventId}, in iteration order.
+     */
+    @Override
+    public List<JournaledProvenanceEvent> registerEvents(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
+        writeLock.lock();
+        try {
+            final JournalWriter writer = getJournalWriter(firstEventId);
+
+            // Once the current block holds more than the configured number of events, finish it, record where
+            // the next block starts in the TOC, and begin a new block.
+            if (!events.isEmpty()) {
+                final int eventsWritten = writer.getEventCount();
+                if (eventsWritten - numEventsAtEndOfLastBlock > config.getBlockSize()) {
+                    writer.finishBlock();
+                    tocWriter.addBlockOffset(writer.getSize());
+                    numEventsAtEndOfLastBlock = eventsWritten;
+                    writer.beginNewBlock();
+                }
+            }
+
+            writer.write(events, firstEventId);
+
+            final String journalId = String.valueOf(writer.getJournalId());
+            final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(events.size());
+            long id = firstEventId;
+            for (final ProvenanceEventRecord event : events) {
+                final JournaledStorageLocation location = new JournaledStorageLocation(containerName, sectionName,
+                        journalId, tocWriter.getCurrentBlockIndex(), id++);
+                storedEvents.add(new JournaledProvenanceEvent(event, location));
+            }
+
+            indexWriter.index(storedEvents);
+
+            if (config.isAlwaysSync()) {
+                writer.sync();
+            }
+
+            // maxEventId is only modified while the write lock is held, so a plain check-then-set is safe; it is
+            // volatile so that getMaxEventId() can read it without locking. After the loop, 'id' is one past the
+            // last ID assigned, so the largest ID stored by this call is id - 1.
+            if (!events.isEmpty()) {
+                final long lastEventId = id - 1;
+                if (lastEventId > maxEventId) {
+                    maxEventId = lastEventId;
+                }
+            }
+
+            if (earliestEventTime == null) {
+                Long earliest = null;
+                for (final ProvenanceEventRecord event : events) {
+                    if (earliest == null || event.getEventTime() < earliest) {
+                        earliest = event.getEventTime();
+                    }
+                }
+
+                earliestEventTime = earliest;
+            }
+
+            return storedEvents;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    // MUST be called with either the read lock or write lock held.
+    // Determines whether the journal writer and toc writer must be rolled over: there is no writer yet, the
+    // writer is older than the rollover period, or the journal has grown past the configured capacity.
+    private boolean isRolloverNecessary() {
+        if (journalWriter == null) {
+            return true;
+        }
+
+        final long ageSeconds = journalWriter.getAge(TimeUnit.SECONDS);
+        final long rolloverSeconds = config.getJournalRolloverPeriod(TimeUnit.SECONDS);
+        if (ageSeconds >= rolloverSeconds) {
+            return true;
+        }
+
+        return journalWriter.getSize() > config.getJournalCapacity();
+    }
+
+    // MUST be called with write lock held.
+    // Closes the current journal/TOC writers (if any), syncs the index, optionally schedules compression of the
+    // finished journal, and opens new writers for a journal named after firstEventId.
+    private void rollover(final long firstEventId) throws IOException {
+        if (journalWriter != null) {
+            journalWriter.finishBlock();
+            journalWriter.close();
+            tocWriter.close();
+            indexWriter.sync();
+
+            if (config.isCompressOnRollover()) {
+                final File finishedFile = journalWriter.getJournalFile();
+                final File finishedTocFile = tocWriter.getFile();
+                executor.submit(new CompressionTask(finishedFile, journalWriter.getJournalId(), finishedTocFile));
+            }
+        }
+
+        // create new writers and reset per-journal state
+        final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);
+        journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());
+        tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false);
+        numEventsAtEndOfLastBlock = 0;
+    }
+
+    /**
+     * Parses the Journal ID from the part of the file's name that precedes its first '.'.
+     *
+     * @return the parsed ID, 0 if the name contains no '.', or null if that part is not a valid long
+     */
+    private Long getJournalId(final File file) {
+        final String filename = file.getName();
+        final int dotIndex = filename.indexOf(".");
+        if (dotIndex < 0) {
+            return 0L;
+        }
+
+        try {
+            return Long.parseLong(filename.substring(0, dotIndex));
+        } catch (final NumberFormatException nfe) {
+            return null;
+        }
+    }
+
+    /**
+     * Recovers state after a restart: finishes or restarts compression that was interrupted at shutdown,
+     * determines the earliest event time from the oldest readable journal, and re-indexes the newest journal
+     * (the index is only synced on rollover, so its most recent events may be missing from the index).
+     * Does nothing for a read-only repository.
+     */
+    @Override
+    public void restore() throws IOException {
+        if (config.isReadOnly()) {
+            return;
+        }
+
+        final File[] children = journalsDir.listFiles();
+        if (children == null) {
+            return;
+        }
+
+        File latestJournal = null;
+        long latestJournalId = -1L;
+        final List<File> journalFiles = new ArrayList<>();
+
+        for (final File file : children) {
+            final String filename = file.getName();
+            if (!filename.contains(JOURNAL_FILE_EXTENSION)) {
+                continue;
+            }
+
+            // Files without a numeric Journal ID cannot be ordered or identified, so they are skipped.
+            final Long journalId = getJournalId(file);
+            if (journalId == null) {
+                logger.warn("{} During recovery, ignoring {} because its name does not begin with a Journal ID", this, file);
+                continue;
+            }
+
+            File journal = file;
+            if (config.isCompressOnRollover() && filename.endsWith(CompressionTask.FILE_EXTENSION)) {
+                journal = recoverCompressedJournal(file);
+                if (journal == null) {
+                    // this journal is represented by its uncompressed file, which is handled on its own iteration
+                    continue;
+                }
+            }
+
+            journalFiles.add(journal);
+            if (journalId > latestJournalId) {
+                latestJournal = journal;
+                latestJournalId = journalId;
+            }
+        }
+
+        // Journals are named after the ID of their first event, so sorting by Journal ID puts the oldest first.
+        // Every file in the list has a non-null Journal ID (checked above).
+        Collections.sort(journalFiles, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                return Long.compare(getJournalId(o1), getJournalId(o2));
+            }
+        });
+
+        final Long earliest = findEarliestEventTime(journalFiles);
+        if (earliest != null) {
+            this.earliestEventTime = earliest;
+        }
+
+        // Whatever was the last journal for this partition, we need to remove anything for that journal
+        // from the index and re-add them, and then sync the index. This allows us to avoid syncing
+        // the index each time (we sync only on rollover) but allows us to still ensure that we index
+        // all events.
+        if (latestJournal != null) {
+            try {
+                reindex(latestJournal);
+            } catch (final EOFException eof) {
+                logger.warn("{} Unexpectedly reached the end of {} while re-indexing it; some of its events may not be searchable", this, latestJournal);
+            }
+        }
+    }
+
+    /**
+     * Resolves a compressed journal file left behind when NiFi stopped while a CompressionTask was in progress.
+     *
+     * @param compressedFile a journal file whose name ends with CompressionTask.FILE_EXTENSION
+     * @return the file that now holds this journal and should be used during recovery, or null if the journal is
+     *         instead represented by its uncompressed file (which appears separately in the directory listing)
+     */
+    private File recoverCompressedJournal(final File compressedFile) {
+        final File uncompressedFile = new File(journalsDir, compressedFile.getName().replace(CompressionTask.FILE_EXTENSION, ""));
+        if (uncompressedFile.exists()) {
+            // Both the compressed and uncompressed versions exist, so compression had not completed when we shut
+            // down. Re-submit the Compression Task; the uncompressed journal remains the usable copy.
+            // NOTE(review): the partial compressed file is not deleted here; presumably CompressionTask overwrites it.
+            final File tocFile = QueryUtils.getTocFile(uncompressedFile);
+            executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile));
+            return null;
+        }
+
+        // The compressed file exists but the uncompressed file does not. This means that we finished writing the
+        // compressed file and deleted the original journal, but shut down before renaming the compressed file to
+        // the original filename. Rename it now and then address the TOC file.
+        if (!CompressionTask.rename(compressedFile, uncompressedFile)) {
+            logger.warn("{} During recovery, failed to rename {} to {}", this, compressedFile, uncompressedFile);
+            return compressedFile;
+        }
+
+        // If a compressed TOC file exists, it is complete (as described above), so it replaces the uncompressed one.
+        final File tocFile = QueryUtils.getTocFile(uncompressedFile);
+        final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + CompressionTask.FILE_EXTENSION);
+        if (compressedTocFile.exists()) {
+            tocFile.delete();
+            if (!CompressionTask.rename(compressedTocFile, tocFile)) {
+                logger.warn("{} During recovery, failed to rename {} to {}", this, compressedTocFile, tocFile);
+            }
+        }
+
+        return uncompressedFile;
+    }
+
+    /**
+     * Returns the event time of the first event in the first readable, non-empty journal of the given list,
+     * or null if no such journal exists.
+     *
+     * @param sortedJournalFiles journal files ordered from oldest to newest
+     */
+    private Long findEarliestEventTime(final List<File> sortedJournalFiles) {
+        Long earliest = null;
+        for (final File journal : sortedJournalFiles) {
+            try (final JournalReader reader = new StandardJournalReader(journal)) {
+                final ProvenanceEventRecord record = reader.nextEvent();
+                if (record != null) {
+                    earliest = record.getEventTime();
+                }
+            } catch (final IOException ioe) {
+                logger.warn("{} Failed to read the first event from {} due to {}", this, journal, ioe);
+                if (logger.isDebugEnabled()) {
+                    logger.warn("", ioe);
+                }
+            }
+
+            if (earliest != null) {
+                break;
+            }
+        }
+
+        return earliest;
+    }
+
+    /**
+     * Removes all of the given journal's events from the index, re-indexes every event that can be read from it
+     * (in batches of REINDEX_BATCH_SIZE), syncs the index, and records the last event ID read as maxEventId.
+     * If the journal ends unexpectedly (EOFException), the events read before that point are still indexed.
+     */
+    private void reindex(final File journalFile) throws IOException {
+        final String journalId = String.valueOf(getJournalId(journalFile));
+        try (final TocJournalReader reader = new TocJournalReader(containerName, sectionName, journalId, journalFile)) {
+            indexWriter.delete(containerName, sectionName, journalId);
+
+            long maxId = -1L;
+            final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(REINDEX_BATCH_SIZE);
+            try {
+                JournaledProvenanceEvent event;
+                while ((event = reader.nextJournaledEvent()) != null) {
+                    storedEvents.add(event);
+                    maxId = event.getEventId();
+
+                    if (storedEvents.size() >= REINDEX_BATCH_SIZE) {
+                        indexWriter.index(storedEvents);
+                        storedEvents.clear();
+                    }
+                }
+            } catch (final EOFException eof) {
+                // Likely a journal that was not completely written before shutdown; keep what was read so far.
+                logger.warn("{} Reached unexpected end of {} while re-indexing; indexing the events read before that point", this, journalFile);
+            }
+
+            if (!storedEvents.isEmpty()) {
+                indexWriter.index(storedEvents);
+            }
+
+            indexWriter.sync();
+            this.maxEventId = maxId;
+        }
+    }
+
+    /**
+     * Returns up to maxRecords storage locations of events with an ID of at least minEventId. Uses
+     * {@link #newIndexSearcher()} so that this also works when the repository is read-only.
+     */
+    @Override
+    public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxRecords) throws IOException {
+        try (final EventIndexSearcher searcher = newIndexSearcher()) {
+            return searcher.getEvents(minEventId, maxRecords);
+        }
+    }
+
+    /**
+     * Finishes the current block and closes the journal, TOC and index writers, logging (rather than
+     * propagating) any failure. Holds the write lock so that it cannot interleave with registerEvents().
+     */
+    @Override
+    public void shutdown() {
+        writeLock.lock();
+        try {
+            if (journalWriter != null) {
+                try {
+                    journalWriter.finishBlock();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to finish writing Block to {} due to {}", journalWriter, ioe);
+                    if (logger.isDebugEnabled()) {
+                        logger.warn("", ioe);
+                    }
+                }
+
+                try {
+                    journalWriter.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close {} due to {}", journalWriter, ioe);
+                    if (logger.isDebugEnabled()) {
+                        logger.warn("", ioe);
+                    }
+                }
+
+                try {
+                    tocWriter.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close {} due to {}", tocWriter, ioe);
+                    if (logger.isDebugEnabled()) {
+                        logger.warn("", ioe);
+                    }
+                }
+            }
+
+            if (indexWriter != null) {
+                try {
+                    indexWriter.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close {} due to {}", indexWriter, ioe);
+                    if (logger.isDebugEnabled()) {
+                        logger.warn("", ioe);
+                    }
+                }
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the largest event ID stored in this partition, or -1 if none is known.
+     */
+    @Override
+    public long getMaxEventId() {
+        return maxEventId;
+    }
+
+    /**
+     * Returns the event time of the earliest known event in this partition, or null if none is known.
+     */
+    @Override
+    public Long getEarliestEventTime() throws IOException {
+        return earliestEventTime;
+    }
+
+    @Override
+    public String toString() {
+        return "JournalingPartition[container=" + containerName + ", section=" + sectionName + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
new file mode 100644
index 0000000..e77c8d5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+
+
+/**
+ * Represents a single Partition of the Journaling Provenance Repository. The repository is split into multiple
+ * partitions in order to provide higher throughput.
+ *
+ * Implementations of this interface MUST be threadsafe.
+ */
+public interface Partition {
+
+    /**
+     * Returns a new EventIndexSearcher that can be used to search the events in this partition.
+     * The searcher holds resources, so the caller should close it when finished.
+     *
+     * @return a new searcher over this partition's events
+     * @throws IOException if the searcher cannot be opened
+     */
+    EventIndexSearcher newIndexSearcher() throws IOException;
+
+    /**
+     * Registers the given events with this partition. This includes persisting the events and indexing
+     * them so that they are searchable.
+     *
+     * @param events the events to register
+     * @param firstEventId the ID of the first of the given events (the remaining events are presumably
+     *            numbered consecutively in iteration order; confirm against implementations)
+     * @return the registered events, each paired with the location at which it was stored
+     * @throws IOException if the events cannot be persisted or indexed
+     */
+    List<JournaledProvenanceEvent> registerEvents(Collection<ProvenanceEventRecord> events, long firstEventId) throws IOException;
+
+    /**
+     * Restores state after a restart of NiFi.
+     *
+     * @throws IOException if the partition's persisted state cannot be read or recovered
+     */
+    void restore() throws IOException;
+
+    /**
+     * Shuts down the Partition so that it can no longer be used.
+     */
+    void shutdown();
+
+    /**
+     * Returns the largest event ID stored in this partition.
+     *
+     * @return the largest stored event ID
+     */
+    long getMaxEventId();
+
+    /**
+     * Returns the locations of events whose ID is at least {@code minEventId}, choosing the events with the
+     * smallest such IDs, and returning no more than {@code maxRecords} locations.
+     *
+     * @param minEventId the smallest event ID to return
+     * @param maxRecords the maximum number of locations to return
+     * @return the storage locations of the matching events
+     * @throws IOException if the index cannot be searched
+     */
+    List<JournaledStorageLocation> getEvents(long minEventId, int maxRecords) throws IOException;
+
+    /**
+     * Returns the timestamp of the earliest event in this Partition, or <code>null</code> if the Partition
+     * contains no events.
+     *
+     * @return the earliest event's timestamp, or <code>null</code> if there are no events
+     * @throws IOException if the timestamp cannot be determined
+     */
+    Long getEarliestEventTime() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionAction.java
new file mode 100644
index 0000000..8c680f5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionAction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.IOException;
+
+/**
+ * An action that is performed against a single {@link Partition} and produces a result.
+ *
+ * @param <T> the type of result produced by the action
+ */
+public interface PartitionAction<T> {
+    /**
+     * Performs this action against the given partition.
+     *
+     * @param partition the partition to act upon
+     * @return the result of the action
+     * @throws IOException if the action fails
+     */
+    T perform(Partition partition) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
new file mode 100644
index 0000000..edbf75b
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.IOException;
+import java.util.Set;
+
+
+/**
+ * The PartitionManager is responsible for accessing and maintaining the Partitions so that they
+ * are written to efficiently and in a thread-safe manner.
+ */
+public interface PartitionManager {
+
+    /**
+     * Performs the given action against one of the partitions.
+     *
+     * @param action the action to perform
+     * @param writeAction specifies whether or not the action writes to the repository
+     * @return the result returned by the action
+     * @throws IOException if performing the action fails
+     */
+    <T> T withPartition(PartitionAction<T> action, boolean writeAction) throws IOException;
+
+    /**
+     * Performs the given action against one of the partitions.
+     *
+     * @param action the action to perform
+     * @param writeAction specifies whether or not the action writes to the repository
+     * @throws IOException if performing the action fails
+     */
+    void withPartition(VoidPartitionAction action, boolean writeAction) throws IOException;
+
+    /**
+     * Performs the given Action on each partition and returns the set of results. Because the results are
+     * returned as a Set, equal results from different partitions appear only once.
+     *
+     * @param action the action to perform
+     * @return the distinct results produced by the action across all partitions
+     * @throws IOException if performing the action fails
+     */
+    <T> Set<T> withEachPartition(PartitionAction<T> action) throws IOException;
+
+    /**
+     * Performs the given Action on each partition, optionally waiting for the action to complete.
+     *
+     * @param action the action to perform
+     * @param async if <code>true</code>, will perform the action asynchronously; if <code>false</code>, will
+     *            wait for the action to complete before returning
+     */
+    void withEachPartition(VoidPartitionAction action, boolean async);
+
+    /**
+     * Shuts down this PartitionManager.
+     */
+    void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
new file mode 100644
index 0000000..4ac0fc6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.util.Tuple;
+
+/**
+ * PartitionManager that hands out partitions from a blocking queue, so that each partition is used by at
+ * most one {@code withPartition} caller at a time. A partition whose write action fails with an IOException
+ * is not returned to the queue ("blacklisted") until NiFi is restarted.
+ */
+public class QueuingPartitionManager implements PartitionManager {
+
+    private final JournalingRepositoryConfig config;
+    // Partitions that are currently free to be handed out by withPartition
+    private final BlockingQueue<Partition> partitionQueue;
+    // Every partition this manager created, whether currently queued, in use, or blacklisted
+    private final JournalingPartition[] partitionArray;
+    private final ExecutorService executor;
+    private volatile boolean shutdown = false;
+
+    // Number of partitions permanently withheld from partitionQueue after a failed write
+    private final AtomicInteger blacklistedCount = new AtomicInteger(0);
+
+    /**
+     * Creates the configured number of partitions, assigning them to the configured containers in
+     * round-robin order. Partition {@code i} is placed in the directory {@code <container dir>/i}.
+     *
+     * @param config repository configuration supplying the partition count and the containers
+     * @param executor executor handed to each partition and used to run per-partition actions
+     * @throws IOException if a partition cannot be created
+     * @throws IllegalArgumentException if partitions are requested but no containers are configured
+     */
+    public QueuingPartitionManager(final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+        this.config = config;
+        this.executor = executor;
+        this.partitionQueue = new LinkedBlockingQueue<>(config.getPartitionCount());
+        this.partitionArray = new JournalingPartition[config.getPartitionCount()];
+
+        final List<Tuple<String, File>> containerTuples = new ArrayList<>(config.getContainers().size());
+        for ( final Map.Entry<String, File> entry : config.getContainers().entrySet() ) {
+            containerTuples.add(new Tuple<>(entry.getKey(), entry.getValue()));
+        }
+
+        // Without this check the modulo below would fail with an unhelpful "/ by zero"
+        if ( containerTuples.isEmpty() && config.getPartitionCount() > 0 ) {
+            throw new IllegalArgumentException("Cannot create " + config.getPartitionCount()
+                + " partitions for the Journaling Provenance Repository because no containers are configured");
+        }
+
+        for (int i=0; i < config.getPartitionCount(); i++) {
+            final Tuple<String, File> tuple = containerTuples.get(i % containerTuples.size());
+            final File section = new File(tuple.getValue(), String.valueOf(i));
+
+            final JournalingPartition partition = new JournalingPartition(tuple.getKey(), String.valueOf(i), section, config, executor);
+            partitionQueue.offer(partition);
+            partitionArray[i] = partition;
+        }
+    }
+
+    /**
+     * Marks this manager as shut down, so that callers waiting for a partition fail, and shuts down
+     * every partition it created.
+     */
+    @Override
+    public void shutdown() {
+        this.shutdown = true;
+
+        for ( final Partition partition : partitionArray ) {
+            partition.shutdown();
+        }
+    }
+
+    /**
+     * Takes the next free partition from the queue, waiting as long as necessary.
+     *
+     * @return a partition that the caller now exclusively holds until it is released
+     * @throws RuntimeException if the repository is shutting down, every partition has been
+     *             blacklisted, or the calling thread is interrupted while waiting
+     */
+    private Partition nextPartition() {
+        Partition partition = null;
+
+        while (partition == null) {
+            if (shutdown) {
+                throw new RuntimeException("Journaling Provenance Repository is shutting down");
+            }
+
+            try {
+                // Bounded wait so that shutdown and total blacklisting are noticed promptly
+                partition = partitionQueue.poll(1, TimeUnit.SECONDS);
+            } catch (final InterruptedException ie) {
+                // Preserve the interrupt and stop waiting; looping would just spin on the interrupt
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while waiting for a Journaling Provenance Repository partition", ie);
+            }
+
+            if ( partition == null && blacklistedCount.get() >= config.getPartitionCount() ) {
+                throw new RuntimeException("Cannot persist to the Journal Provenance Repository because all partitions have been blacklisted due to write failures");
+            }
+        }
+
+        return partition;
+    }
+
+    /**
+     * Returns a partition obtained from {@link #nextPartition()}: it is re-queued unless a write to it failed,
+     * in which case it is withheld permanently.
+     */
+    private void releasePartition(final Partition partition, final boolean failedWrite) {
+        if ( failedWrite ) {
+            // We failed to write to this Partition. This partition will no longer be usable until NiFi is restarted!
+            blacklistedCount.incrementAndGet();
+        } else {
+            partitionQueue.offer(partition);
+        }
+    }
+
+    @Override
+    public <T> T withPartition(final PartitionAction<T> action, final boolean writeAction) throws IOException {
+        final Partition partition = nextPartition();
+
+        boolean failedWrite = false;
+        try {
+            return action.perform(partition);
+        } catch (final IOException e) {
+            failedWrite = writeAction;
+            throw e;
+        } finally {
+            releasePartition(partition, failedWrite);
+        }
+    }
+
+    @Override
+    public void withPartition(final VoidPartitionAction action, final boolean writeAction) throws IOException {
+        final Partition partition = nextPartition();
+
+        boolean failedWrite = false;
+        try {
+            action.perform(partition);
+        } catch (final IOException e) {
+            failedWrite = writeAction;
+            throw e;
+        } finally {
+            releasePartition(partition, failedWrite);
+        }
+    }
+
+    /**
+     * Runs the action against every partition concurrently on the executor and collects the results.
+     * Partitions are not taken from the queue, so this may run alongside {@code withPartition} callers.
+     */
+    @Override
+    public <T> Set<T> withEachPartition(final PartitionAction<T> action) throws IOException {
+        final Set<T> results = new HashSet<>(partitionArray.length);
+
+        // TODO: Do not use blacklisted partitions.
+        final Map<Partition, Future<T>> futures = new HashMap<>(partitionArray.length);
+        for ( final Partition partition : partitionArray ) {
+            final Callable<T> callable = new Callable<T>() {
+                @Override
+                public T call() throws Exception {
+                    return action.perform(partition);
+                }
+            };
+
+            final Future<T> future = executor.submit(callable);
+            futures.put(partition, future);
+        }
+
+        for ( final Map.Entry<Partition, Future<T>> entry : futures.entrySet() ) {
+            try {
+                final T result = entry.getValue().get();
+                results.add(result);
+            } catch (final ExecutionException ee) {
+                final Throwable cause = ee.getCause();
+                if ( cause instanceof IOException ) {
+                    throw (IOException) cause;
+                } else {
+                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+                }
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+
+        return results;
+    }
+
+    /**
+     * Submits the action for every partition to the executor. When {@code async} is false, waits for all of
+     * them and rethrows the first failure (in partition order) wrapped in a RuntimeException. When
+     * {@code async} is true, returns immediately and failures are only recorded in the discarded Futures.
+     */
+    @Override
+    public void withEachPartition(final VoidPartitionAction action, final boolean async) {
+        final List<Future<Void>> futures = new ArrayList<>(partitionArray.length);
+        for ( final Partition partition : partitionArray ) {
+            final Callable<Void> callable = new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    action.perform(partition);
+                    return null;
+                }
+            };
+
+            futures.add(executor.submit(callable));
+        }
+
+        if ( async ) {
+            return;
+        }
+
+        for (int i = 0; i < futures.size(); i++) {
+            try {
+                futures.get(i).get();
+            } catch (final ExecutionException ee) {
+                final Throwable cause = ee.getCause();
+                throw new RuntimeException("Failed to perform action against Partition " + partitionArray[i] + " due to " + cause, cause);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(ie);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/VoidPartitionAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/VoidPartitionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/VoidPartitionAction.java
new file mode 100644
index 0000000..beaa187
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/VoidPartitionAction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.partition;
+
+import java.io.IOException;
+
+/**
+ * An action to be performed against a single {@link Partition} that produces no result.
+ */
+public interface VoidPartitionAction {
+    /**
+     * Performs this action against the given partition.
+     *
+     * @param partition the partition to act upon
+     * @throws IOException if the action fails due to an I/O problem
+     */
+    void perform(Partition partition) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
new file mode 100644
index 0000000..c23a405
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.tasks;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
+import org.apache.nifi.provenance.journaling.journals.JournalReader;
+import org.apache.nifi.provenance.journaling.journals.JournalWriter;
+import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
+import org.apache.nifi.provenance.journaling.journals.StandardJournalWriter;
+import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
+import org.apache.nifi.provenance.journaling.toc.StandardTocWriter;
+import org.apache.nifi.provenance.journaling.toc.TocReader;
+import org.apache.nifi.provenance.journaling.toc.TocWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compresses a single journal file and its Table of Contents (TOC).
+ * <p>
+ * The events are copied into a compressed journal and a matching compressed TOC, both written next to
+ * the originals with {@link #FILE_EXTENSION} appended to their names. Once the copies are complete, the
+ * originals are deleted and the copies are renamed into their place.
+ */
+public class CompressionTask implements Runnable {
+    public static final String FILE_EXTENSION = ".compress";
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressionTask.class);
+
+    private final File journalFile;
+    private final long journalId;
+    private final File tocFile;
+
+    /**
+     * @param journalFile the journal to compress
+     * @param journalId the identifier passed to the writer of the compressed journal
+     * @param tocFile the Table of Contents that describes {@code journalFile}
+     */
+    public CompressionTask(final File journalFile, final long journalId, final File tocFile) {
+        this.journalFile = journalFile;
+        this.journalId = journalId;
+        this.tocFile = tocFile;
+    }
+
+    /**
+     * Copies every event from {@code reader} to {@code writer}. The first block offset from {@code tocReader}
+     * is written to {@code tocWriter} as-is. Afterwards, whenever the reader moves past the start of the next
+     * block recorded in {@code tocReader}, the writer's current size is recorded as the start of a new block.
+     * An unexpected end-of-file in the source journal is logged and treated as the end of the journal.
+     *
+     * @throws IOException if reading an event, writing an event, or writing the TOC fails
+     */
+    public void compress(final JournalReader reader, final JournalWriter writer, final TocReader tocReader, final TocWriter tocWriter) throws IOException {
+        ProvenanceEventRecord event;
+
+        int blockIndex = 0;
+        tocWriter.addBlockOffset(tocReader.getBlockOffset(blockIndex));
+        long nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1);
+
+        try {
+            while ((event = reader.nextEvent()) != null) {
+                // Check if we've gone beyond the offset of the next block. If so, write out a new block
+                // in the TOC. (StandardTocReader returns -1 when there is no such block.)
+                final long newPosition = reader.getPosition();
+                if ( newPosition > nextBlockOffset && nextBlockOffset > 0 ) {
+                    blockIndex++;
+                    tocWriter.addBlockOffset(writer.getSize());
+                    nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1);
+                }
+
+                // Write the event to the compressed writer
+                writer.write(Collections.singleton(event), event.getEventId());
+            }
+        } catch (final EOFException eof) {
+            logger.warn("Found unexpected End-of-File when compressing {}", reader);
+        }
+    }
+
+    /**
+     * Attempts to delete the given file up to 10 times, waiting a bit in between each failed
+     * iteration, in case another process (for example, a virus scanner) has the file locked.
+     * Stops retrying early if the thread is interrupted, leaving the interrupt flag set.
+     *
+     * @param file the file to delete
+     * @return true if the file was deleted or did not exist
+     */
+    private boolean delete(final File file) {
+        for (int i=0; i < 10; i++) {
+            if ( file.delete() || !file.exists() ) {
+                return true;
+            }
+
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Attempts to rename the given original file to the renamed file up to 20 times, waiting a bit
+     * in between each failed iteration, in case another process (for example, a virus scanner) has
+     * the file locked. Stops retrying early if the thread is interrupted, leaving the interrupt flag set.
+     *
+     * @param original the file to rename
+     * @param renamed the new name
+     * @return true if the rename succeeded
+     */
+    public static boolean rename(final File original, final File renamed) {
+        for (int i=0; i < 20; i++) {
+            if ( original.renameTo(renamed) ) {
+                return true;
+            }
+
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public void run() {
+        final File compressedFile = new File(journalFile.getParentFile(), journalFile.getName() + FILE_EXTENSION);
+        final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + FILE_EXTENSION);
+
+        // A leftover compressed file may be the only remaining copy of its events (for example, if an earlier
+        // attempt removed the original but could not rename the copy), so never overwrite or delete one.
+        for ( final File existing : new File[] { compressedFile, compressedTocFile } ) {
+            if ( existing.exists() ) {
+                logger.error("Cannot compress Journal File {} because {} already exists", journalFile, existing);
+                return;
+            }
+        }
+
+        try (final JournalReader journalReader = new StandardJournalReader(journalFile);
+            final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer());
+            final TocReader tocReader = new StandardTocReader(tocFile);
+            final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true)) {
+
+            compress(journalReader, compressedWriter, tocReader, compressedTocWriter);
+            compressedWriter.sync();
+        } catch (final IOException ioe) {
+            // The originals are untouched at this point. Remove the partial copies (created by this attempt,
+            // given the check above) so that they do not block a later attempt.
+            delete(compressedFile);
+            delete(compressedTocFile);
+            logger.error("Failed to compress Journal File {} due to {}", journalFile, ioe.toString());
+            if ( logger.isDebugEnabled() ) {
+                logger.error("", ioe);
+            }
+            return;
+        }
+
+        if ( !delete(journalFile) ) {
+            delete(compressedFile);
+            delete(compressedTocFile);
+            logger.error("Failed to remove Journal file {}; considering compression task a failure", journalFile);
+            return;
+        }
+
+        if ( !delete(tocFile) ) {
+            // The original journal is already gone, so the compressed copies now hold the only copy of these
+            // events. Deleting them here (as was previously done) would lose the data entirely.
+            logger.error("Failed to remove TOC file for {}; the compressed Journal and TOC files have been kept with the {} extension so that no events are lost", journalFile, FILE_EXTENSION);
+            return;
+        }
+
+        final boolean renamedJournal = rename(compressedFile, journalFile);
+        if ( !renamedJournal ) {
+            logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedFile, journalFile);
+        }
+
+        final boolean renamedToc = rename(compressedTocFile, tocFile);
+        if ( !renamedToc ) {
+            logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedTocFile, tocFile);
+        }
+
+        if ( renamedJournal && renamedToc ) {
+            logger.info("Successfully compressed Journal File {}", journalFile);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
new file mode 100644
index 0000000..995acf9
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Standard implementation of TocReader.
+ * 
+ * Expects the .toc file to be in the following format:
+ * 
+ * byte 0: version
+ * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ * 
+ * The whole file is loaded by the constructor; the reader keeps no file open afterwards.
+ */
+public class StandardTocReader implements TocReader {
+    // true when the header's compression flag byte was 1
+    private final boolean compressed;
+    // block offsets, indexed by block number
+    private final long[] offsets;
+
+    /**
+     * Loads the Table of Contents stored in {@code file}.
+     *
+     * @param file the .toc file to read
+     * @throws EOFException if the file ends before the two header bytes have been read
+     * @throws IOException if the compression flag is neither 0 nor 1, or the file cannot be read
+     */
+    public StandardTocReader(final File file) throws IOException {
+        try (final FileInputStream fileIn = new FileInputStream(file);
+             final DataInputStream in = new DataInputStream(fileIn)) {
+
+            // The version byte must be present but is not otherwise interpreted
+            readHeaderByte(in);
+
+            final int compressionFlag = readHeaderByte(in);
+            if ( compressionFlag != 0 && compressionFlag != 1 ) {
+                throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
+            }
+            compressed = (compressionFlag == 1);
+
+            // Everything after the 2-byte header is a run of 8-byte offsets; a trailing partial offset is ignored
+            final long headerLength = 2;
+            final int blockCount = (int) ((file.length() - headerLength) / 8);
+            final long[] blockOffsets = new long[blockCount];
+            for (int block = 0; block < blockCount; block++) {
+                blockOffsets[block] = in.readLong();
+            }
+            offsets = blockOffsets;
+        }
+    }
+
+    /**
+     * Reads a single header byte, failing with an EOFException if the stream is already exhausted.
+     */
+    private static int readHeaderByte(final DataInputStream in) throws IOException {
+        final int value = in.read();
+        if ( value < 0 ) {
+            throw new EOFException();
+        }
+        return value;
+    }
+
+    @Override
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    /**
+     * @return the offset of the given block, or -1 if the Table of Contents holds no block with that index
+     */
+    @Override
+    public long getBlockOffset(final int blockIndex) {
+        return blockIndex < offsets.length ? offsets[blockIndex] : -1L;
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Nothing to release: the file was fully read and closed by the constructor
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
new file mode 100644
index 0000000..6058282
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+
+/**
+ * Standard implementation of {@link TocWriter}.
+ * 
+ * Format of .toc file:
+ * byte 0: version
+ * byte 1: compressed: 0 -> not compressed, 1 -> compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ * 
+ * The header and every block offset are synced to disk as soon as they are written.
+ */
+public class StandardTocWriter implements TocWriter {
+    public static final byte VERSION = 1;
+
+    private final File file;
+    private final FileOutputStream fos;
+    // Unbuffered wrapper: each writeLong goes straight to fos so that the following sync persists it
+    private final DataOutputStream dos;
+    // Number of block offsets written so far
+    private int blockCount = 0;
+
+    /**
+     * Creates a StandardTocWriter that writes to the given file, creating its parent directory if needed,
+     * and writes and syncs the header.
+     *
+     * @param file the file to write to; must not already exist
+     * @param compressionFlag whether or not the journal is compressed
+     * @throws FileAlreadyExistsException if {@code file} already exists
+     * @throws IOException if the parent directory cannot be created or the header cannot be written
+     */
+    public StandardTocWriter(final File file, final boolean compressionFlag) throws IOException {
+        if ( file.exists() ) {
+            throw new FileAlreadyExistsException(file.getAbsolutePath());
+        }
+
+        // getParentFile() is null for a bare relative file name; there is nothing to create in that case
+        final File parent = file.getParentFile();
+        if ( parent != null && !parent.exists() && !parent.mkdirs() && !parent.isDirectory() ) {
+            throw new IOException("Could not create directory " + file.getParent());
+        }
+
+        this.file = file;
+        fos = new FileOutputStream(file);
+        dos = new DataOutputStream(fos);
+
+        try {
+            fos.write(VERSION);
+            fos.write(compressionFlag ? 1 : 0);
+            fos.flush();
+            fos.getFD().sync();
+        } catch (final IOException ioe) {
+            // Do not leak the file handle if the header cannot be written
+            try {
+                fos.close();
+            } catch (final IOException closeFailure) {
+                ioe.addSuppressed(closeFailure);
+            }
+            throw ioe;
+        }
+    }
+
+    /**
+     * Appends the offset of a new block and syncs it to disk. The new block becomes the current block.
+     */
+    @Override
+    public void addBlockOffset(final long offset) throws IOException {
+        dos.writeLong(offset);
+        dos.flush();
+
+        fos.getFD().sync();
+        blockCount++;
+    }
+
+    /**
+     * @return the index of the most recently added block, or 0 if no block offset has been added yet
+     */
+    @Override
+    public int getCurrentBlockIndex() {
+        return blockCount == 0 ? 0 : blockCount - 1;
+    }
+
+    @Override
+    public void close() throws IOException {
+        fos.close();
+    }
+
+    @Override
+    public File getFile() {
+        return file;
+    }
+
+    @Override
+    public String toString() {
+        return "TOC Writer for " + file;
+    }
+}


Mime
View raw message