nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject nifi git commit: NIFI-2890 Provenance Repository Corruption (1.x) * Corrected handling of corrupt journal file records that prevents instance startup and loss of records from corrupt files. Specifically, exception handling was expanded to cover failures
Date Tue, 14 Feb 2017 15:02:27 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 93ea34815 -> bd3e0438a


NIFI-2890 Provenance Repository Corruption (1.x)
* Corrected handling of corrupt journal file records that prevents instance startup and loss
of records from corrupt files.  Specifically, exception handling was expanded to cover failures
on records after the first the same as failures on the first record.
* Adjusted log messages  to reflect that the remainder or all of the journal will be skipped,
not just the current record.

This closes #1485.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bd3e0438
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bd3e0438
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bd3e0438

Branch: refs/heads/master
Commit: bd3e0438a32a7107d91d2e98e47dcf64af2021b4
Parents: 93ea348
Author: Joe Skora <jskora@apache.org>
Authored: Tue Feb 7 20:43:08 2017 +0000
Committer: Mark Payne <markap14@hotmail.com>
Committed: Tue Feb 14 10:02:16 2017 -0500

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         |  26 +-
 .../TestPersistentProvenanceRepository.java     | 394 +++++++++++++++++--
 2 files changed, 393 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bd3e0438/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 282fabc..3037e66 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -1688,16 +1688,19 @@ public class PersistentProvenanceRepository implements ProvenanceRepository
{
                 try {
                     record = reader.nextRecord();
                 } catch (final EOFException eof) {
+                    // record will be null and reader can no longer be used
                 } catch (final Exception e) {
-                    logger.warn("Failed to generate Provenance Event Record from Journal
due to " + e + "; it's possible that the record wasn't "
-                            + "completely written to the file. This record will be skipped.");
+                    logger.warn("Failed to generate Provenance Event Record from Journal
due to " + e + "; it's "
+                            + "possible that the record wasn't completely written to the
file. This journal will be "
+                            + "skipped.");
                     if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
 
                     if (eventReporter != null) {
-                        eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed
to read Provenance Event Record from Journal due to " + e
-                                + "; it's possible that hte record wasn't completely written
to the file. This record will be skipped.");
+                        eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed
to read Provenance Event "
+                                + "Record from Journal due to " + e + "; it's possible that
the record wasn't "
+                                + "completely written to the file. This journal will be skipped.");
                     }
                 }
 
@@ -1834,6 +1837,21 @@ public class PersistentProvenanceRepository implements ProvenanceRepository
{
                             try {
                                 nextRecord = reader.nextRecord();
                             } catch (final EOFException eof) {
+                                // record will be null and reader can no longer be used
+                            } catch (final Exception e) {
+                                logger.warn("Failed to generate Provenance Event Record from
Journal due to " + e
+                                        + "; it's possible that the record wasn't completely
written to the file. "
+                                        + "The remainder of this journal will be skipped.");
+                                if (logger.isDebugEnabled()) {
+                                    logger.warn("", e);
+                                }
+
+                                if (eventReporter != null) {
+                                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY,
"Failed to read "
+                                            + "Provenance Event Record from Journal due to
" + e + "; it's possible "
+                                            + "that the record wasn't completely written
to the file. The remainder "
+                                            + "of this journal will be skipped.");
+                                }
                             }
 
                             if (nextRecord != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/bd3e0438/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 831584b..00f4617 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -25,11 +25,17 @@ import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,6 +43,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -74,16 +81,20 @@ import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.serialization.RecordWriters;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,14 +104,21 @@ public class TestPersistentProvenanceRepository {
     @Rule
     public TestName name = new TestName();
 
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder();
+
     private PersistentProvenanceRepository repo;
-    private RepositoryConfiguration config;
+    private static RepositoryConfiguration config;
 
     public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
     private EventReporter eventReporter;
     private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
 
-    private RepositoryConfiguration createConfiguration() {
+    private static int headerSize;
+    private static int recordSize;
+    private static int recordSize2;
+
+    private static RepositoryConfiguration createConfiguration() {
         config = new RepositoryConfiguration();
         config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
         config.setCompressOnRollover(true);
@@ -114,6 +132,43 @@ public class TestPersistentProvenanceRepository {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
     }
 
+    @BeforeClass
+    public static void findJournalSizes() throws IOException {
+        // determine header and record size
+
+        final Map<String, String> attributes = new HashMap<>();
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        final ProvenanceEventRecord record = builder.build();
+        builder.setComponentId("2345");
+        final ProvenanceEventRecord record2 = builder.build();
+
+        final File tempRecordFile = tempFolder.newFile("record.tmp");
+        System.out.println("findJournalSizes position 0 = " + tempRecordFile.length());
+
+        final RecordWriter writer = RecordWriters.newSchemaRecordWriter(tempRecordFile, false,
false);
+        writer.writeHeader(12345L);
+        writer.flush();
+        headerSize = Long.valueOf(tempRecordFile.length()).intValue();
+        writer.writeRecord(record, 12345L);
+        writer.flush();
+        recordSize = Long.valueOf(tempRecordFile.length()).intValue() - headerSize;
+        writer.writeRecord(record2, 23456L);
+        writer.flush();
+        recordSize2 = Long.valueOf(tempRecordFile.length()).intValue() - headerSize - recordSize;
+        writer.close();
+
+        System.out.println("headerSize =" + headerSize);
+        System.out.println("recordSize =" + recordSize);
+        System.out.println("recordSize2=" + recordSize2);
+    }
+
     @Before
     public void printTestName() {
         System.out.println("\n\n\n***********************  " + name.getMethodName() + " 
*****************************");
@@ -139,24 +194,29 @@ public class TestPersistentProvenanceRepository {
             }
         }
 
-        // Delete all of the storage files. We do this in order to clean up the tons of files
that
-        // we create but also to ensure that we have closed all of the file handles. If we
leave any
-        // streams open, for instance, this will throw an IOException, causing our unit test
to fail.
-        for (final File storageDir : config.getStorageDirectories()) {
-            int i;
-            for (i = 0; i < 3; i++) {
-                try {
-                    FileUtils.deleteFile(storageDir, true);
-                    break;
-                } catch (final IOException ioe) {
-                    // if there is a virus scanner, etc. running in the background we may
not be able to
-                    // delete the file. Wait a sec and try again.
-                    if (i == 2) {
-                        throw ioe;
-                    } else {
+        if (config != null) {
+            // Delete all of the storage files. We do this in order to clean up the tons
of files that
+            // we create but also to ensure that we have closed all of the file handles.
If we leave any
+            // streams open, for instance, this will throw an IOException, causing our unit
test to fail.
+            for (final File storageDir : config.getStorageDirectories()) {
+                if (storageDir.exists()) {
+                    int i;
+                    for (i = 0; i < 3; i++) {
                         try {
-                            Thread.sleep(1000L);
-                        } catch (final InterruptedException ie) {
+                            System.out.println("file: " + storageDir.toString() + " exists="
+ storageDir.exists());
+                            FileUtils.deleteFile(storageDir, true);
+                            break;
+                        } catch (final IOException ioe) {
+                            // if there is a virus scanner, etc. running in the background
we may not be able to
+                            // delete the file. Wait a sec and try again.
+                            if (i == 2) {
+                                throw ioe;
+                            } else {
+                                try {
+                                    Thread.sleep(1000L);
+                                } catch (final InterruptedException ie) {
+                                }
+                            }
                         }
                     }
                 }
@@ -242,6 +302,48 @@ public class TestPersistentProvenanceRepository {
         repo.close();
     }
 
+    private NiFiProperties properties = new NiFiProperties() {
+        @Override
+        public String getProperty(String key) {
+            if (key.equals(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER)) {
+                return "true";
+            } else if (key.equals(NiFiProperties.PROVENANCE_ROLLOVER_TIME)) {
+                return "2000 millis";
+            } else if (key.equals(NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + ".default"))
{
+                createConfiguration();
+                return config.getStorageDirectories().get(0).getAbsolutePath();
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public Set<String> getPropertyKeys() {
+            return new HashSet<>(Arrays.asList(
+                    NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER,
+                    NiFiProperties.PROVENANCE_ROLLOVER_TIME,
+                    NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + ".default"));
+        }
+    };
+
+    @Test
+    public void constructorNoArgs() {
+        TestablePersistentProvenanceRepository tppr = new TestablePersistentProvenanceRepository();
+        assertEquals(0, tppr.getRolloverCheckMillis());
+    }
+
+    @Test
+    public void constructorNiFiProperties() throws IOException {
+        TestablePersistentProvenanceRepository tppr = new TestablePersistentProvenanceRepository(properties);
+        assertEquals(10000, tppr.getRolloverCheckMillis());
+    }
+
+    @Test
+    public void constructorConfig() throws IOException {
+        RepositoryConfiguration configuration = createTestableRepositoryConfiguration(properties);
+        TestablePersistentProvenanceRepository tppr = new TestablePersistentProvenanceRepository(configuration,
20000);
+    }
+
     @Test
     public void testAddAndRecover() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
@@ -1589,10 +1691,6 @@ public class TestPersistentProvenanceRepository {
         Thread.sleep(3000L);
     }
 
-
-    // TODO: test EOF on merge
-    // TODO: Test journal with no records
-
     @Test
     public void testTextualQuery() throws InterruptedException, IOException, ParseException
{
         final RepositoryConfiguration config = createConfiguration();
@@ -1661,6 +1759,28 @@ public class TestPersistentProvenanceRepository {
         }
     }
 
+    private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException
{
+        File[] storagefiles = storageDir.listFiles();
+        long counter = 0;
+        assertNotNull(storagefiles);
+        for (final File file : storagefiles) {
+            if (file.isFile()) {
+                try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048))
{
+                    ProvenanceEventRecord r;
+                    ProvenanceEventRecord last = null;
+                    while ((r = reader.nextRecord()) != null) {
+                        if (exact) {
+                            assertTrue(counter++ == r.getEventId());
+                        } else {
+                            assertTrue(counter++ <= r.getEventId());
+                        }
+                    }
+                }
+            }
+        }
+        return counter;
+    }
+
     @Test
     public void testMergeJournals() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
@@ -1711,6 +1831,176 @@ public class TestPersistentProvenanceRepository {
         assertEquals(10000, counter);
     }
 
+    private void corruptJournalFile(final File journalFile, final int position,
+                                    final String original, final String replacement) throws
IOException {
+        final int journalLength = Long.valueOf(journalFile.length()).intValue();
+        final byte[] origBytes = original.getBytes();
+        final byte[] replBytes = replacement.getBytes();
+        FileInputStream journalIn = new FileInputStream(journalFile);
+        byte[] content = new byte[journalLength];
+        assertEquals(journalLength, journalIn.read(content, 0, journalLength));
+        journalIn.close();
+        assertEquals(original, new String(Arrays.copyOfRange(content, position, position
+ origBytes.length)));
+        System.arraycopy(replBytes, 0, content, position, replBytes.length);
+        FileOutputStream journalOut = new FileOutputStream(journalFile);
+        journalOut.write(content, 0, journalLength);
+        journalOut.flush();
+        journalOut.close();
+    }
+
+    @Test
+    public void testMergeJournalsBadFirstRecord() throws IOException, InterruptedException
{
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config,
DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter(), null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future> futures = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
+                }
+            }));
+        }
+
+        // wait for writers to finish and then corrupt the first record of the first journal
file
+        for (Future future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
+            }
+        }
+        RecordWriter firstWriter = testRepo.getWriters()[0];
+        corruptJournalFile(firstWriter.getFile(), headerSize + 15,"RECEIVE", "BADTYPE");
+
+        testRepo.recoverJournalFiles();
+
+        assertEquals("mergeJournals() should report a skipped journal", 1, reportedEvents.size());
+        assertEquals("mergeJournals() should report a skipped journal",
+                "Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException:
"
+                        + "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE;
it's possible "
+                        + "that the record wasn't completely written to the file. This journal
will be skipped.",
+                reportedEvents.get(reportedEvents.size() - 1).getMessage());
+
+        final File storageDir = config.getStorageDirectories().get(0);
+        assertTrue(checkJournalRecords(storageDir, false) < 10000);
+    }
+
+    @Test
+    public void testMergeJournalsBadRecordAfterFirst() throws IOException, InterruptedException
{
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config,
DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter(), null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future> futures = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
+                }
+            }));
+        }
+
+        // corrupt the first record of the first journal file
+        for (Future future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
+            }
+        }
+        RecordWriter firstWriter = testRepo.getWriters()[0];
+        corruptJournalFile(firstWriter.getFile(), headerSize + 15 + recordSize, "RECEIVE",
"BADTYPE");
+
+        testRepo.recoverJournalFiles();
+
+        assertEquals("mergeJournals should report a skipped journal", 1, reportedEvents.size());
+        assertEquals("mergeJournals should report a skipped journal",
+                "Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException:
"
+                        + "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE;
it's possible "
+                        + "that the record wasn't completely written to the file. The remainder
of this journal will "
+                        + "be skipped.",
+                reportedEvents.get(reportedEvents.size() - 1).getMessage());
+
+        final File storageDir = config.getStorageDirectories().get(0);
+        assertTrue(checkJournalRecords(storageDir, false) < 10000);
+    }
+
+    @Test
+    public void testMergeJournalsEmptyJournal() throws IOException, InterruptedException
{
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config,
DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter(), null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future> futures = new ArrayList<>();
+        for (int i = 0; i < config.getJournalCount() - 1; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
+                }
+            }));
+        }
+
+        // wait for writers to finish and then corrupt the first record of the first journal
file
+        for (Future future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
+            }
+        }
+
+        testRepo.recoverJournalFiles();
+
+        assertEquals("mergeJournals() should not error on empty journal", 0, reportedEvents.size());
+
+        final File storageDir = config.getStorageDirectories().get(0);
+        assertEquals(config.getJournalCount() - 1, checkJournalRecords(storageDir, true));
+    }
+
     @Test
     public void testRolloverRetry() throws IOException, InterruptedException {
         final AtomicInteger retryAmount = new AtomicInteger(0);
@@ -1967,4 +2257,62 @@ public class TestPersistentProvenanceRepository {
         };
     }
 
+    private static class TestablePersistentProvenanceRepository extends PersistentProvenanceRepository
{
+
+        TestablePersistentProvenanceRepository() {
+            super();
+        }
+
+        TestablePersistentProvenanceRepository(final NiFiProperties nifiProperties) throws
IOException {
+            super(nifiProperties);
+        }
+
+        TestablePersistentProvenanceRepository(final RepositoryConfiguration configuration,
final int rolloverCheckMillis) throws IOException {
+            super(configuration, rolloverCheckMillis);
+        }
+
+        RecordWriter[] getWriters() {
+            Class klass = PersistentProvenanceRepository.class;
+            Field writersField;
+            RecordWriter[] writers = null;
+            try {
+                writersField = klass.getDeclaredField("writers");
+                writersField.setAccessible(true);
+                writers = (RecordWriter[]) writersField.get(this);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                e.printStackTrace();
+            }
+            return writers;
+        }
+
+        int getRolloverCheckMillis() {
+            Class klass = PersistentProvenanceRepository.class;
+            java.lang.reflect.Field rolloverCheckMillisField;
+            int rolloverCheckMillis = -1;
+            try {
+                rolloverCheckMillisField = klass.getDeclaredField("rolloverCheckMillis");
+                rolloverCheckMillisField.setAccessible(true);
+                rolloverCheckMillis = (int) rolloverCheckMillisField.get(this);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                e.printStackTrace();
+            }
+            return rolloverCheckMillis;
+        }
+
+    }
+
+    private RepositoryConfiguration createTestableRepositoryConfiguration(final NiFiProperties
properties) {
+        Class klass = PersistentProvenanceRepository.class;
+        Method createRepositoryConfigurationMethod;
+        RepositoryConfiguration configuration = null;
+        try {
+            createRepositoryConfigurationMethod = klass.getDeclaredMethod("createRepositoryConfiguration",
NiFiProperties.class);
+            createRepositoryConfigurationMethod.setAccessible(true);
+            configuration = (RepositoryConfiguration)createRepositoryConfigurationMethod.invoke(null,
properties);
+        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException
e) {
+            e.printStackTrace();
+        }
+        return configuration;
+    }
+
 }


Mime
View raw message