nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject [nifi] branch master updated: NIFI-6736: Create thread on demand to handle incoming request from client for load balancing. This allows us to avoid situations where we don't have enough threads and we block on the server side, waiting for data, when clients are trying to send data in another connection
Date Wed, 02 Oct 2019 18:29:30 GMT
This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 99cf87c  NIFI-6736: Create thread on demand to handle incoming request from client for load balancing. This allows us to avoid situations where we don't have enough threads and we block on the server side, waiting for data, when clients are trying to send data in another connection
99cf87c is described below

commit 99cf87c330a2b27757cb188a4e806a46c31ecd1b
Author: Mark Payne <markap14@hotmail.com>
AuthorDate: Wed Oct 2 09:35:46 2019 -0400

    NIFI-6736: Create thread on demand to handle incoming request from client for load balancing. This allows us to avoid situations where we don't have enough threads and we block on the server side, waiting for data, when clients are trying to send data in another connection
    
    This closes #3784.
    
    Signed-off-by: Bryan Bende <bbende@apache.org>
---
 .../controller/queue/SwappablePriorityQueue.java   | 27 +++++++
 .../async/nio/NioAsyncLoadBalanceClient.java       | 45 ++++++-----
 .../clustered/partition/RemoteQueuePartition.java  |  9 ++-
 .../server/ConnectionLoadBalanceServer.java        | 90 ++++++++--------------
 .../clustered/server/LoadBalanceProtocol.java      |  8 +-
 .../server/StandardLoadBalanceProtocol.java        |  7 +-
 6 files changed, 99 insertions(+), 87 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index 8613130..b81bd3f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -410,6 +410,33 @@ public class SwappablePriorityQueue {
         return getFlowFileQueueSize().isEmpty();
     }
 
+    public boolean isFlowFileAvailable() {
+        if (isEmpty()) {
+            return false;
+        }
+
+        readLock.lock();
+        try {
+            // If we have data in the active or swap queue that is penalized, then we know that all FlowFiles
+            // are penalized. As a result, we can say that no FlowFile is available.
+            FlowFileRecord firstRecord = activeQueue.peek();
+            if (firstRecord == null && !swapQueue.isEmpty()) {
+                firstRecord = swapQueue.get(0);
+            }
+
+            if (firstRecord == null) {
+                // If the queue is not empty, then all data is swapped out. We don't actually know whether or not the swapped out data is penalized, so we assume
+                // that it is not penalized and is therefore available.
+                return !isEmpty();
+            }
+
+            // We do have a FlowFile that was retrieved from the active or swap queue. It is available if it is not penalized.
+            return !firstRecord.isPenalized();
+        } finally {
+            readLock.unlock("isFlowFileAvailable");
+        }
+    }
+
     public boolean isActiveQueueEmpty() {
         final FlowFileQueueSize queueSize = getFlowFileQueueSize();
         return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() ==
0;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index 855db8e..1257a3c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -385,38 +385,37 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient
{
         return selector != null && channel != null && channel.isConnected();
     }
 
-    private synchronized void establishConnection() throws IOException {
+    private void establishConnection() throws IOException {
         SocketChannel socketChannel = null;
 
         try {
-            selector = Selector.open();
-            socketChannel = createChannel();
+            final PeerChannel peerChannel;
+            synchronized (this) {
+                if (isConnectionEstablished()) {
+                    return;
+                }
 
-            socketChannel.configureBlocking(true);
+                selector = Selector.open();
+                socketChannel = createChannel();
+                socketChannel.configureBlocking(true);
 
-            channel = createPeerChannel(socketChannel, nodeIdentifier.toString());
-            channel.performHandshake();
+                peerChannel = createPeerChannel(socketChannel, nodeIdentifier.toString());
+                channel = peerChannel;
+            }
 
-            socketChannel.configureBlocking(false);
-            selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
-        } catch (Exception e) {
-            logger.error("Unable to connect to {} for load balancing", nodeIdentifier, e);
+            // Perform handshake outside of the synchronized block. We do this because if the server-side is not very responsive,
+            // the handshake may take some time. We don't want to block any other threads from interacting with this Client in
+            // the meantime, especially web threads that may be calling #register or #unregister.
+            peerChannel.performHandshake();
 
-            if (selector != null) {
-                try {
-                    selector.close();
-                } catch (final Exception e1) {
-                    e.addSuppressed(e1);
-                }
+            synchronized (this) {
+                socketChannel.configureBlocking(false);
+                selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
             }
+        } catch (Exception e) {
+            logger.error("Unable to connect to {} for load balancing", nodeIdentifier, e);
 
-            if (channel != null) {
-                try {
-                    channel.close();
-                } catch (final Exception e1) {
-                    e.addSuppressed(e1);
-                }
-            }
+            close();
 
             if (socketChannel != null) {
                 try {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index 106ab26..144a043 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
 /**
@@ -214,7 +215,13 @@ public class RemoteQueuePartition implements QueuePartition {
             }
         };
 
-        clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, priorityQueue::isEmpty,
this::getFlowFile,
+        // Consider the queue empty unless a FlowFile is available. This means that if the queue has only penalized FlowFiles, it will be considered empty.
+        // This is what we want for the purpose of load balancing the data. Otherwise, we would have a situation where we create a connection to the other node,
+        // determine that no FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over
+        // until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother
+        // creating the connection to send data.
+        final BooleanSupplier emptySupplier = () -> !priorityQueue.isFlowFileAvailable();
+        clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier,
this::getFlowFile,
             failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);
 
         running = true;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
index 93fc2d7..49094c5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.controller.queue.clustered.server;
 
-import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.slf4j.Logger;
@@ -25,34 +24,33 @@ import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocket;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class ConnectionLoadBalanceServer {
     private static final Logger logger = LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
+    private static final AtomicLong threadCounter = new AtomicLong(1L);
 
     private final String hostname;
     private final int port;
     private final SSLContext sslContext;
-    private final ExecutorService threadPool;
     private final LoadBalanceProtocol loadBalanceProtocol;
     private final int connectionTimeoutMillis;
-    private final int numThreads;
     private final EventReporter eventReporter;
 
-    private volatile Set<CommunicateAction> communicationActions = Collections.emptySet();
-    private final BlockingQueue<Socket> connectionQueue = new LinkedBlockingQueue<>();
+    private final List<CommunicateAction> communicationActions = Collections.synchronizedList(new
ArrayList<>());
 
     private volatile AcceptConnection acceptConnection;
     private volatile ServerSocket serverSocket;
@@ -65,10 +63,7 @@ public class ConnectionLoadBalanceServer {
         this.sslContext = sslContext;
         this.loadBalanceProtocol = loadBalanceProtocol;
         this.connectionTimeoutMillis = connectionTimeoutMillis;
-        this.numThreads = numThreads;
         this.eventReporter = eventReporter;
-
-        threadPool = new FlowEngine(numThreads, "Load Balance Server");
     }
 
     public void start() throws IOException {
@@ -88,15 +83,6 @@ public class ConnectionLoadBalanceServer {
                     "'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host'
properties as well as the 'nifi.security.*' properties", e);
         }
 
-        final Set<CommunicateAction> actions = new HashSet<>(numThreads);
-        for (int i=0; i < numThreads; i++) {
-            final CommunicateAction action = new CommunicateAction(loadBalanceProtocol);
-            actions.add(action);
-            threadPool.submit(action);
-        }
-
-        this.communicationActions = actions;
-
         acceptConnection = new AcceptConnection(serverSocket);
         final Thread receiveConnectionThread = new Thread(acceptConnection);
         receiveConnectionThread.setName("Receive Queue Load-Balancing Connections");
@@ -109,22 +95,15 @@ public class ConnectionLoadBalanceServer {
 
     public void stop() {
         stopped = false;
-        threadPool.shutdown();
 
         if (acceptConnection != null) {
             acceptConnection.stop();
         }
 
-        communicationActions.forEach(CommunicateAction::stop);
-
-        Socket socket;
-        while ((socket = connectionQueue.poll()) != null) {
-            try {
-                socket.close();
-                logger.info("{} Closed connection to {} on Server stop", this, socket.getRemoteSocketAddress());
-            } catch (final IOException ioe) {
-                logger.warn("Failed to properly close socket to " + socket.getRemoteSocketAddress(),
ioe);
-            }
+        final Iterator<CommunicateAction> itr = communicationActions.iterator();
+        while (itr.hasNext()) {
+            itr.next().stop();
+            itr.remove();
         }
     }
 
@@ -143,10 +122,18 @@ public class ConnectionLoadBalanceServer {
 
     private class CommunicateAction implements Runnable {
         private final LoadBalanceProtocol loadBalanceProtocol;
+        private final Socket socket;
+        private final InputStream in;
+        private final OutputStream out;
+
         private volatile boolean stopped = false;
 
-        public CommunicateAction(final LoadBalanceProtocol loadBalanceProtocol) {
+        public CommunicateAction(final LoadBalanceProtocol loadBalanceProtocol, final Socket
socket) throws IOException {
             this.loadBalanceProtocol = loadBalanceProtocol;
+            this.socket = socket;
+
+            this.in = new BufferedInputStream(socket.getInputStream());
+            this.out = new BufferedOutputStream(socket.getOutputStream());
         }
 
         public void stop() {
@@ -158,28 +145,15 @@ public class ConnectionLoadBalanceServer {
             String peerDescription = "<Unknown Client>";
 
             while (!stopped) {
-                Socket socket = null;
                 try {
-                    socket = connectionQueue.poll(1, TimeUnit.SECONDS);
-                    if (socket == null) {
-                        continue;
-                    }
-
                     peerDescription = socket.getRemoteSocketAddress().toString();
 
-                    if (socket.isClosed()) {
-                        logger.debug("Connection to Peer {} is closed. Will not attempt to
communicate over this Socket.", peerDescription);
-                        continue;
-                    }
-
                     logger.debug("Receiving FlowFiles from Peer {}", peerDescription);
-                    loadBalanceProtocol.receiveFlowFiles(socket);
+                    loadBalanceProtocol.receiveFlowFiles(socket, in, out);
 
-                    if (socket.isConnected()) {
-                        logger.debug("Finished receiving FlowFiles from Peer {}. Will recycle
connection.", peerDescription);
-                        connectionQueue.offer(socket);
-                    } else {
-                        logger.debug("Finished receiving FlowFiles from Peer {}. Socket is
no longer connected so will not recycle connection.", peerDescription);
+                    if (socket.isClosed()) {
+                        logger.debug("Finished Receiving FlowFiles from Peer {}", peerDescription);
+                        break;
                     }
                 } catch (final Exception e) {
                     if (socket != null) {
@@ -194,8 +168,6 @@ public class ConnectionLoadBalanceServer {
                     eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection",
"Failed to receive FlowFiles for Load Balancing due to " + e);
                 }
             }
-
-            logger.info("Connection Load Balance Server shutdown. Will no longer handle incoming
requests.");
         }
     }
 
@@ -230,7 +202,13 @@ public class ConnectionLoadBalanceServer {
                     }
 
                     socket.setSoTimeout(connectionTimeoutMillis);
-                    connectionQueue.offer(socket);
+
+                    final CommunicateAction communicateAction = new CommunicateAction(loadBalanceProtocol,
socket);
+                    final Thread commsThread = new Thread(communicateAction);
+                    commsThread.setName("Load-Balance Server Thread-" + threadCounter.getAndIncrement());
+                    commsThread.start();
+
+                    communicationActions.add(communicateAction);
                 } catch (final Exception e) {
                     logger.error("{} Failed to accept connection from other node in cluster",
ConnectionLoadBalanceServer.this, e);
                 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
index 5a74ebc..d5d0778 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.controller.queue.clustered.server;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.Socket;
 
 public interface LoadBalanceProtocol {
@@ -26,10 +28,14 @@ public interface LoadBalanceProtocol {
      * Receives FlowFiles from the peer attached to the socket
      *
      * @param socket the socket to read from and write to
+     * @param in the InputStream to read from. The Socket's InputStream is wrapped with a BufferedInputStream, which is provided
+     * here. If this method were to wrap the InputStream itself, a second call to the method may discard some data that was consumed
+     * by the previous call's BufferedInputStream
+     * @param out the OutputStream to write to
      *
      * @throws TransactionAbortedException if the transaction was aborted
      * @throws IOException if unable to communicate with the peer
      */
-    void receiveFlowFiles(Socket socket) throws IOException;
+    void receiveFlowFiles(Socket socket, InputStream in, OutputStream out) throws IOException;
 
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index 2168f3e..d9fdd2a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -46,8 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLSocket;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -112,10 +110,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol
{
 
 
     @Override
-    public void receiveFlowFiles(final Socket socket) throws IOException {
-        final InputStream in = new BufferedInputStream(socket.getInputStream());
-        final OutputStream out = new BufferedOutputStream(socket.getOutputStream());
-
+    public void receiveFlowFiles(final Socket socket, final InputStream in, final OutputStream
out) throws IOException {
         String peerDescription = socket.getInetAddress().getHostName();
         if (socket instanceof SSLSocket) {
             logger.debug("Connection received from peer {}", peerDescription);


Mime
View raw message