nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [10/13] nifi git commit: Merge branch 'develop' into NIFI-744
Date Thu, 06 Aug 2015 13:02:44 GMT
Merge branch 'develop' into NIFI-744

Conflicts:
	nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fe25ae09
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fe25ae09
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fe25ae09

Branch: refs/heads/NIFI-744
Commit: fe25ae093f09d569c9a34fb4374c4b98c0734aa6
Parents: 05d4b06 15d4902
Author: Mark Payne <markap14@hotmail.com>
Authored: Tue Aug 4 09:41:39 2015 -0400
Committer: Mark Payne <markap14@hotmail.com>
Committed: Tue Aug 4 09:41:39 2015 -0400

----------------------------------------------------------------------
 .../authorization/DownloadAuthorization.java    |   2 +-
 nifi/nifi-bootstrap/pom.xml                     |  37 +-
 .../nifi-hl7-query-language/pom.xml             |   6 +-
 .../org/wali/MinimalLockingWriteAheadLog.java   |  15 +-
 .../src/test/java/org/wali/DummyRecord.java     |   5 +
 .../test/java/org/wali/DummyRecordSerde.java    |   8 +
 .../wali/TestMinimalLockingWriteAheadLog.java   | 215 +++++++
 nifi/nifi-docs/pom.xml                          |  19 +-
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml |   4 -
 .../nifi-flume-bundle/nifi-flume-nar/pom.xml    |   3 +-
 .../nifi-flume-processors/pom.xml               |  32 +-
 nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml |  29 +-
 .../repository/StandardProcessSession.java      |   5 +-
 .../repository/io/LimitedInputStream.java       |   7 +-
 .../repository/io/TestLimitedInputStream.java   | 122 ++++
 .../repository/io/TestLimitedOutputStream.java  |  76 ---
 .../nifi-framework/nifi-site-to-site/pom.xml    |   2 +-
 .../src/main/webapp/WEB-INF/web.xml             |  11 +-
 .../src/main/webapp/js/nf/nf-common.js          |   2 +-
 .../nifi-geo-bundle/nifi-geo-nar/pom.xml        |   2 +-
 .../nifi-hdfs-processors/pom.xml                |   4 +-
 .../nifi-kafka-processors/pom.xml               |   1 -
 .../PersistentProvenanceRepository.java         | 170 ++++--
 .../nifi/provenance/StandardRecordReader.java   |   8 +-
 .../nifi/provenance/StandardRecordWriter.java   |  24 +-
 .../provenance/serialization/RecordWriter.java  |  12 +-
 .../TestPersistentProvenanceRepository.java     | 120 +++-
 .../nifi-twitter-processors/pom.xml             |   2 -
 .../nifi-standard-processors/pom.xml            | 553 +++++++++----------
 .../nifi-dbcp-service/pom.xml                   |  28 +-
 .../nifi-http-context-map/pom.xml               |   4 +-
 .../nifi-standard-services-api-nar/pom.xml      |  11 +-
 32 files changed, 1017 insertions(+), 522 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fe25ae09/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe25ae09/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 4408e3d,a1063f0..4d02d18
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@@ -1068,29 -1082,24 +1089,36 @@@ public class PersistentProvenanceReposi
              final int journalCountThreshold = configuration.getJournalCount() * 5;
              final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * 1.1D);
// do not exceed max capacity by more than 10%
  
+             // check if we need to apply backpressure.
+             // If we have too many journal files, or if the repo becomes too large, backpressure
is necessary. Without it,
+             // if the rate at which provenance events are registered exceeds the rate at
which we can compress/merge/index them,
+             // then eventually we will end up with all of the data stored in the 'journals'
directory and not yet indexed. This
+             // would mean that the data would never even be accessible. In order to prevent
this, if we exceed 110% of the configured
+             // max capacity for the repo, or if we have 5 sets of journal files waiting
to be merged, we will block here until
+             // that is no longer the case.
              if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold)
{
                  logger.warn("The rate of the dataflow is exceeding the provenance recording
rate. "
 -                        + "Slowing down flow to accomodate. Currently, there are {} journal
files ({} bytes) and "
 +                        + "Slowing down flow to accommodate. Currently, there are {} journal
files ({} bytes) and "
                          + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize,
journalCountThreshold, sizeThreshold);
                  eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The
rate of the dataflow is "
 -                        + "exceeding the provenance recording rate. Slowing down flow to
accomodate");
 +                        + "exceeding the provenance recording rate. Slowing down flow to
accommodate");
  
                  while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold)
{
 -                    try {
 -                        Thread.sleep(1000L);
 -                    } catch (final InterruptedException ie) {
 +                    if (repoSize > sizeThreshold) {
 +                        logger.debug("Provenance Repository has exceeded its size threshold;
will trigger purging of oldest events");
 +                        purgeOldEvents();
 +
 +                        journalFileCount = getJournalCount();
 +                        repoSize = getSize(getLogFiles(), 0L);
 +                        continue;
 +                    } else {
 +                        // if we are constrained by the number of journal files rather than
the size of the repo,
 +                        // then we will just sleep a bit because another thread is already
actively merging the journals,
 +                        // due to the runnable that we scheduled above
 +                        try {
 +                            Thread.sleep(100L);
 +                        } catch (final InterruptedException ie) {
 +                        }
                      }
  
                      logger.debug("Provenance Repository is still behind. Keeping flow slowed
down "
@@@ -1428,25 -1414,13 +1480,32 @@@
                                  recordToReaderMap.put(nextRecord, reader);
                              }
                          }
++
+                         indexWriter.commit();
+                     } catch (final Throwable t) {
+                         indexWriter.rollback();
+                         throw t;
 +                    } finally {
 +                        finishedAdding.set(true);
 +                        exec.shutdown();
 +                    }
 +
 +                    for (final Future<?> future : futures) {
 +                        try {
 +                            future.get();
 +                        } catch (final ExecutionException ee) {
 +                            final Throwable t = ee.getCause();
 +                            if (t instanceof RuntimeException) {
 +                                throw (RuntimeException) t;
 +                            }
 +
 +                            throw new RuntimeException(t);
 +                        } catch (final InterruptedException e) {
 +                            throw new RuntimeException("Thread interrupted");
 +                        }
                      }
+ 
+                     indexConfig.setMaxIdIndexed(maxId);
                  } finally {
                      indexManager.returnIndexWriter(indexingDirectory, indexWriter);
                  }


Mime
View raw message