nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [2/2] nifi git commit: NIFI-2452: This closes #771. Ensure that we keep track of how many references we have to each lucene searcher and only close the underlying index reader if there are no references to the searcher. Also updated to prefer newer pro
Date Wed, 03 Aug 2016 15:05:41 GMT
NIFI-2452: This closes #771.  Ensure that we keep track of how many references we have to each
lucene searcher and only close the underlying index reader if there are no references to the
searcher.  Also updated to prefer newer provenance events over older provenance events, and
calculate FlowFile lineage based on an event id instead of a FlowFile UUID, as it's much more
efficient


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/16348b07
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/16348b07
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/16348b07

Branch: refs/heads/master
Commit: 16348b071d361697f517397b2da2ea32c5599c11
Parents: e9b87dd
Author: Mark Payne <markap14@hotmail.com>
Authored: Tue Aug 2 09:56:05 2016 -0400
Committer: joewitt <joewitt@apache.org>
Committed: Wed Aug 3 08:05:26 2016 -0700

----------------------------------------------------------------------
 .../nifi/provenance/ProvenanceRepository.java   |  23 +++
 .../provenance/MockProvenanceRepository.java    |   5 +
 .../heartbeat/AbstractHeartbeatMonitor.java     |  28 ++--
 .../apache/nifi/controller/FlowController.java  | 139 ++++++++-----------
 .../src/main/resources/conf/logback.xml         |   1 +
 .../apache/nifi/web/api/ProvenanceResource.java |   4 +-
 .../nifi/web/controller/ControllerFacade.java   |   6 +-
 .../js/nf/provenance/nf-provenance-lineage.js   |   3 +-
 .../PersistentProvenanceRepository.java         |  23 +++
 .../nifi/provenance/lucene/DocsReader.java      |   7 +-
 .../nifi/provenance/lucene/IndexManager.java    |   3 +
 .../TestPersistentProvenanceRepository.java     |   3 +-
 .../VolatileProvenanceRepository.java           |  16 +++
 13 files changed, 160 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
index 7ac1a65..6a5954a 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
@@ -107,6 +107,29 @@ public interface ProvenanceRepository extends ProvenanceEventRepository
{
     ComputeLineageSubmission submitLineageComputation(String flowFileUuid, NiFiUser user);
 
     /**
+     * Submits a Lineage Computation to be completed and returns the
+     * {@link ComputeLineageSubmission} that indicates the status of the request and
+     * the results, if the computation is complete. If the given user does not
+     * have authorization to view one of the events in the lineage, a placeholder
+     * event will be used instead that provides none of the event details except
+     * for the identifier of the component that emitted the Provenance Event. It is
+     * necessary to include this node in the lineage view so that the lineage makes
+     * sense, rather than showing disconnected graphs when the user is not authorized
+     * for all components' provenance events.
+     *
+     * This method is preferred to {@link #submitLineageComputation(String, NiFiUser)} because
+     * it is much more efficient, but the latter may be used if a particular Event ID is not
+     * available.
+     *
+     * @param eventId the numeric ID of the event that the lineage is for
+     * @param user the NiFi User to authorize events against
+     *
+     * @return a {@link ComputeLineageSubmission} object that can be used to
+     *         check if the computing is complete and if so get the results
+     */
+    ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user);
+
+    /**
      * @param lineageIdentifier identifier of lineage to compute
      * @param user the user who is retrieving the lineage submission
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
index a13a338..9bc5f0e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
@@ -100,6 +100,11 @@ public class MockProvenanceRepository implements ProvenanceRepository
{
     }
 
     @Override
+    public ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user)
{
+        throw new UnsupportedOperationException("MockProvenanceRepository does not support
Lineage Computation");
+    }
+
+    @Override
     public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, NiFiUser
user) {
         throw new UnsupportedOperationException("MockProvenanceRepository does not support
Lineage Computation");
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 0bd84d6..22c9ab5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -203,16 +203,24 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor
{
             final DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode();
 
             // Determine whether or not the node should be allowed to be in the cluster still,
depending on its reason for disconnection.
-            if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT || disconnectionCode
== DisconnectionCode.UNABLE_TO_COMMUNICATE) {
-                clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat
from node previously "
-                    + "disconnected due to " + disconnectionCode + ". Issuing reconnection
request.");
-
-                clusterCoordinator.requestNodeConnect(nodeId, null);
-            } else {
-                // disconnected nodes should not heartbeat, so we need to issue a disconnection
request.
-                logger.info("Ignoring received heartbeat from disconnected node " + nodeId
+ ".  Issuing disconnection request.");
-                clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE,
DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString());
-                removeHeartbeat(nodeId);
+            switch (disconnectionCode) {
+                case LACK_OF_HEARTBEAT:
+                case UNABLE_TO_COMMUNICATE:
+                case NODE_SHUTDOWN:
+                case NOT_YET_CONNECTED:
+                case STARTUP_FAILURE: {
+                    // For these disconnection reasons a new heartbeat is answered with a reconnection request.
+                    clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously "
+                        + "disconnected due to " + disconnectionCode + ". Issuing reconnection request.");
+
+                    clusterCoordinator.requestNodeConnect(nodeId, null);
+                    // Must not fall through: the default case would immediately issue a disconnection
+                    // request (and drop the heartbeat) for the node we just asked to reconnect.
+                    break;
+                }
+                default: {
+                    // disconnected nodes should not heartbeat, so we need to issue a disconnection request.
+                    logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ".  Issuing disconnection request.");
+                    clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE,
+                        DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString());
+                    removeHeartbeat(nodeId);
+                    break;
+                }
             }
 
             return;

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index f6973a5..b7b32ad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,7 +16,38 @@
  */
 package org.apache.nifi.controller;
 
-import com.sun.jersey.api.client.ClientHandlerException;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.collections4.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -183,10 +214,10 @@ import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
-import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
@@ -208,37 +239,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
-import static java.util.Objects.requireNonNull;
+import com.sun.jersey.api.client.ClientHandlerException;
 
 public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
 
@@ -303,7 +304,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
     private final Integer remoteInputHttpPort;
     private final Boolean isSiteToSiteSecure;
 
-    private ProcessGroup rootGroup;
+    private final AtomicReference<ProcessGroup> rootGroupRef = new AtomicReference<>();
     private final List<Connectable> startConnectablesAfterInitialization;
     private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
     private final LeaderElectionManager leaderElectionManager;
@@ -516,9 +517,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
 
         this.snippetManager = new SnippetManager();
 
-        rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(),
this, processScheduler,
+
+        final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(),
this, processScheduler,
             properties, encryptor, this, this.variableRegistry);
         rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
+        rootGroupRef.set(rootGroup);
         instanceId = ComponentIdGenerator.generateId().toString();
 
         controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler,
bulletinRepository, stateManagerProvider, this.variableRegistry);
@@ -1197,6 +1200,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
         return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(),
null, this, sslContext);
     }
 
+    public ProcessGroup getRootGroup() {
+        return rootGroupRef.get();
+    }
+
     /**
      * Verifies that no output port exists with the given id or name. If this
      * does not hold true, throws an IllegalStateException
@@ -1205,6 +1212,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
      * @throws IllegalStateException port already exists
      */
     private void verifyPortIdDoesNotExist(final String id) {
+        final ProcessGroup rootGroup = getRootGroup();
         Port port = rootGroup.findOutputPort(id);
         if (port != null) {
             throw new IllegalStateException("An Input Port already exists with ID " + id);
@@ -1220,12 +1228,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
      * Group.
      */
     public String getName() {
-        readLock.lock();
-        try {
-            return rootGroup.getName();
-        } finally {
-            readLock.unlock();
-        }
+        return getRootGroup().getName();
     }
 
     /**
@@ -1235,12 +1238,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
      * @param name of root group
      */
     public void setName(final String name) {
-        readLock.lock();
-        try {
-            rootGroup.setName(name);
-        } finally {
-            readLock.unlock();
-        }
+        getRootGroup().setName(name);
     }
 
     /**
@@ -1248,12 +1246,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
      * Root Group
      */
     public String getComments() {
-        readLock.lock();
-        try {
-            return rootGroup.getComments();
-        } finally {
-            readLock.unlock();
-        }
+        return getRootGroup().getComments();
     }
 
     /**
@@ -1263,12 +1256,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
      * the controller
      */
     public void setComments(final String comments) {
-        readLock.lock();
-        try {
-            rootGroup.setComments(comments);
-        } finally {
-            readLock.unlock();
-        }
+        getRootGroup().setComments(comments);
     }
 
     /**
@@ -1327,7 +1315,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
             }
 
             // Trigger any processors' methods marked with @OnShutdown to be called
-            rootGroup.shutdown();
+            getRootGroup().shutdown();
 
             stateManagerProvider.shutdown();
 
@@ -1495,12 +1483,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
      * @return the ID of the root group
      */
     public String getRootGroupId() {
-        readLock.lock();
-        try {
-            return rootGroup.getIdentifier();
-        } finally {
-            readLock.unlock();
-        }
+        return getRootGroup().getIdentifier();
     }
 
     /**
@@ -1519,14 +1502,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
 
         writeLock.lock();
         try {
-            rootGroup = group;
-
+            rootGroupRef.set(group);
             for (final RemoteSiteListener listener : externalSiteListeners) {
-                listener.setRootGroup(rootGroup);
+                listener.setRootGroup(group);
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
+            this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary()));
         } finally {
             writeLock.unlock();
         }
@@ -2198,14 +2180,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
      */
     public ProcessGroup getGroup(final String id) {
         requireNonNull(id);
-        final ProcessGroup root;
-        readLock.lock();
-        try {
-            root = rootGroup;
-        } finally {
-            readLock.unlock();
-        }
-
+        final ProcessGroup root = getRootGroup();
         final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
         return root == null ? null : root.findProcessGroup(searchId);
     }
@@ -3458,7 +3433,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
+            this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary()));
         } finally {
             writeLock.unlock();
         }
@@ -3876,7 +3851,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
             this.connectionStatus = connectionStatus;
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
+            this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary()));
         } finally {
             rwLock.writeLock().unlock();
         }
@@ -4018,9 +3993,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
         // is set to the root group and otherwise assume that the ID is that of a component.
         final DataAuthorizable authorizable;
         if (rootGroupId.equals(componentId)) {
-            authorizable = new DataAuthorizable(rootGroup);
+            authorizable = new DataAuthorizable(getRootGroup());
         } else {
-            final Connectable connectable = rootGroup.findConnectable(componentId);
+            final Connectable connectable = getRootGroup().findConnectable(componentId);
 
             if (connectable == null) {
                 throw new ResourceNotFoundException("The component that generated this event
is no longer part of the data flow.");

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index f67a6cd..871265e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -98,6 +98,7 @@
     <logger name="org.apache.zookeeper.ZooKeeper" level="ERROR" />
 
     <logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF"
/>
+    <logger name="org.apache.curator.ConnectionState" level="OFF" />
     
     <!-- Logger for managing logging statements for nifi clusters. -->
     <logger name="org.apache.nifi.cluster" level="INFO"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java
index 24e303d..e618e8a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java
@@ -464,8 +464,8 @@ public class ProvenanceResource extends ApplicationResource {
                 break;
             case FLOWFILE:
                 // ensure the uuid has been specified
-                if (requestDto.getUuid() == null) {
-                    throw new IllegalArgumentException("The flowfile uuid must be specified
when the event type is FLOWFILE.");
+                if (requestDto.getUuid() == null && requestDto.getEventId() == null)
{
+                    throw new IllegalArgumentException("The flowfile uuid or event id must
be specified when the event type is FLOWFILE.");
                 }
                 break;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 2152c76..bed66ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -1013,7 +1013,11 @@ public class ControllerFacade implements Authorizable {
         // submit the event
         if (LineageRequestType.FLOWFILE.equals(requestDto.getLineageRequestType())) {
             // submit uuid
-            result = provenanceRepository.submitLineageComputation(requestDto.getUuid(),
NiFiUserUtils.getNiFiUser());
+            if (requestDto.getEventId() == null) {
+                result = provenanceRepository.submitLineageComputation(requestDto.getUuid(),
NiFiUserUtils.getNiFiUser());
+            } else {
+                result = provenanceRepository.submitLineageComputation(requestDto.getEventId(),
NiFiUserUtils.getNiFiUser());
+            }
         } else {
             // submit event... (parents or children)
             if (LineageRequestType.PARENTS.equals(requestDto.getLineageRequestType())) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js
index 8287bff..6b038c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js
@@ -1293,7 +1293,8 @@ nf.ng.ProvenanceLineage = function () {
             var lineageRequest = {
                 lineageRequestType: 'FLOWFILE',
                 uuid: flowFileUuid,
-                clusterNodeId: clusterNodeId
+                clusterNodeId: clusterNodeId,
+                eventId: eventId
             };
 
             // update the progress bar value

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 87b617f..f8bb667 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -35,6 +35,7 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
 import org.apache.nifi.provenance.expiration.FileRemovalAction;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
 import org.apache.nifi.provenance.lineage.FlowFileLineage;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageComputationType;
@@ -2170,6 +2171,28 @@ public class PersistentProvenanceRepository implements ProvenanceRepository
{
     }
 
     @Override
+    public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser
user) {
+        final ProvenanceEventRecord event;
+        try {
+            event = getEvent(eventId);
+        } catch (final Exception e) {
+            logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to
calculate data lineage due to: " + e, e);
+            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE,
eventId, Collections.<String> emptySet(), 1, user.getIdentity());
+            result.getResult().setError("Failed to retrieve Provenance Event with ID " +
eventId + ". See logs for more information.");
+            return result;
+        }
+
+        if (event == null) {
+            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE,
eventId, Collections.<String> emptySet(), 1, user.getIdentity());
+            result.getResult().setError("Could not find Provenance Event with ID " + eventId);
+            lineageSubmissionMap.put(result.getLineageIdentifier(), result);
+            return result;
+        }
+
+        return submitLineageComputation(Collections.singleton(event.getFlowFileUuid()), user,
LineageComputationType.FLOWFILE_LINEAGE, eventId, event.getLineageStartDate(), Long.MAX_VALUE);
+    }
+
+    @Override
     public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final
NiFiUser user) {
         return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE,
null, 0L, Long.MAX_VALUE);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index e448f27..ce62152 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -55,11 +55,12 @@ class DocsReader {
         }
 
         final long start = System.nanoTime();
-        final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
+        final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
+        final int numDocs = Math.min(scoreDocs.length, maxResults);
         final List<Document> docs = new ArrayList<>(numDocs);
 
-        for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
-            final int docId = scoreDoc.doc;
+        for (int i = numDocs - 1; i >= 0; i--) {
+            final int docId = scoreDocs[i].doc;
             final Document d = indexReader.document(docId);
             docs.add(d);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 07cd190..b93d3b7 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -234,6 +234,9 @@ public class IndexManager implements Closeable {
                 }
             }
 
+            // We found no cached Index Readers. Create a new one. To do this, we need to check
+            // if we have an Index Writer, and if so create a Reader based on the Index Writer.
+            // This will provide us a 'near real time' index reader.
             final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
             if ( writerCount == null ) {
                 final Directory directory = FSDirectory.open(absoluteFile);

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index b78dfcd..12f4a73 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -485,8 +485,7 @@ public class TestPersistentProvenanceRepository {
         assertTrue(newRecordSet.getMatchingEvents().isEmpty());
     }
 
-    // TODO: Switch to 10,000.
-    @Test(timeout = 1000000)
+    @Test(timeout = 10000)
     public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxRecordLife(30, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/nifi/blob/16348b07/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index 10026cf..79f7d9f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -181,6 +181,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
         return records.isEmpty() ? null : records.get(0);
     }
 
+    @Override
     public ProvenanceEventRecord getEvent(final long id) {
         final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() {
             @Override
@@ -192,6 +193,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
         return records.isEmpty() ? null : records.get(0);
     }
 
+    @Override
     public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) {
         final ProvenanceEventRecord event = getEvent(id);
         if (event == null) {
@@ -474,6 +476,20 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
     }
 
     @Override
+    public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) {
+        final ProvenanceEventRecord event = getEvent(eventId);
+        if (event == null) {
+            final String userId = user.getIdentity();
+            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, userId);
+            result.getResult().setError("Could not find event with ID " + eventId);
+            lineageSubmissionMap.put(result.getLineageIdentifier(), result);
+            return result;
+        }
+
+        return submitLineageComputation(event.getFlowFileUuid(), user);
+    }
+
+    @Override
     public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) {
         return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null);
     }


Mime
View raw message