nifi-issues mailing list archives

From "Marc Parisi (Jira)" <j...@apache.org>
Subject [jira] [Created] (NIFI-6781) ThreadPoolRequestReplicator
Date Wed, 16 Oct 2019 16:47:00 GMT
Marc Parisi created NIFI-6781:
---------------------------------

             Summary: ThreadPoolRequestReplicator
                 Key: NIFI-6781
                 URL: https://issues.apache.org/jira/browse/NIFI-6781
             Project: Apache NiFi
          Issue Type: Improvement
          Components: Core Framework
            Reporter: Marc Parisi
            Assignee: Marc Parisi


I've noticed that request replication is also attempted against the local node. I tested a
simple change that removes the local node from the replication targets; however, I suspect
that either this is not a big deal, I've missed something, or the cluster state does not
include the local identifier. All tests allow for local instances with different ports,
implying that the pruning is potentially unnecessary or incorrect logic. I've therefore
created this as an "Improvement" while I dive further into the code to validate my change.
If anyone has an immediate answer regarding this code, I'm happy to close this as OBE.

 
{code:java}
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -217,11 +217,17 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             }
         }
 
-        final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED);
+        // get nodes that do not match this node.
+        final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED).stream().filter(x -> {
+            return clusterCoordinator.getLocalNodeIdentifier() == null || x != clusterCoordinator.getLocalNodeIdentifier();
+        }).collect(Collectors.toList());
+
         if (nodeIds == null || nodeIds.isEmpty()) {
             throw new NoConnectedNodesException();
         }
 
+        logger.debug("Attempting to replicate to {} nodes", nodeIds.size());
{code}
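
One thing worth checking while validating the change: in Java, `x != clusterCoordinator.getLocalNodeIdentifier()` compares object references, so it only excludes the local node if the cluster state holds the very same instance. In addition, `Collectors.toList()` never returns null, so the later `nodeIds == null` check can no longer fire, while calling `.stream()` on a null result from `stateMap.get(...)` would throw a NullPointerException instead. The following is a minimal, standalone sketch of the same filter using `equals()` and a null guard. `NodeId` and `withoutLocal` are hypothetical stand-ins for illustration only, not NiFi classes or methods, and the sketch assumes the real identifier type defines a suitable `equals()`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class LocalNodeFilterSketch {

    // Hypothetical stand-in for a node identifier; NOT NiFi's NodeIdentifier.
    static final class NodeId {
        final String host;
        final int port;

        NodeId(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof NodeId)) {
                return false;
            }
            NodeId other = (NodeId) o;
            return port == other.port && host.equals(other.host);
        }

        @Override
        public int hashCode() {
            return Objects.hash(host, port);
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    // Returns the connected nodes minus the local one.
    // A null list yields an empty result; a null local id disables the filter.
    static List<NodeId> withoutLocal(List<NodeId> connected, NodeId local) {
        if (connected == null) {
            return Collections.emptyList();
        }
        return connected.stream()
                .filter(node -> local == null || !node.equals(local))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        NodeId local = new NodeId("localhost", 8080);
        List<NodeId> connected = Arrays.asList(
                new NodeId("localhost", 8080),  // equal to local, but a different object
                new NodeId("localhost", 8081)); // same host, different port: kept
        System.out.println(withoutLocal(connected, local));
    }
}
```

With a reference comparison in place of `equals()`, the first entry in this example would survive the filter because it is a distinct object. The second entry mirrors the test setups mentioned above, where local instances differ only by port and should remain replication targets.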



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
