nifi-issues mailing list archives

From "Mark Payne (Jira)" <>
Subject [jira] [Commented] (NIFI-6787) Load balanced connection can hold up for whole cluster when one node slows down
Date Thu, 17 Oct 2019 17:19:00 GMT


Mark Payne commented on NIFI-6787:

Hey [~tpalfy] this is certainly something that is important to understand and ensure that
it is working optimally.

The statement "when checking if it's full or not, it checks the _total_ size (which is across
the whole cluster) and compares it to the max (which is scoped to the current node)" is not
exactly accurate, though. The difference between looking at the total queue size vs. the local
queue size is important, and perhaps the naming convention used in the code is a bit confusing,
so let me try to explain it better.

The queue is divided into N "partitions" where N = the number of nodes in the cluster (+1
for the Rebalancing Partition, but that doesn't really come into play here). These consist
of one "Local Partition" and N-1 "Remote Partitions". A "Remote Partition" is NOT a partition
that lives on another node, but rather a partition that holds FlowFiles waiting to be pushed
to another node.
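To make the layout concrete, here is a minimal sketch (my own, not NiFi's actual classes; all names are hypothetical) of how a node's view of the queue decomposes into one local partition plus one remote partition per peer:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not NiFi code: each node holds 1 Local Partition
// (FlowFiles it can process itself) and N-1 Remote Partitions (FlowFiles
// staged on THIS node, waiting to be pushed to each of the other nodes).
public class PartitionSketch {
    public static List<String> partitionsFor(int clusterSize) {
        List<String> partitions = new ArrayList<>();
        partitions.add("local");                    // data this node can process
        for (int i = 1; i < clusterSize; i++) {
            partitions.add("remote->node" + i);     // staged for transfer to peer i
        }
        return partitions;
    }

    public static void main(String[] args) {
        // A 4-node cluster yields 1 local partition and 3 remote partitions.
        System.out.println(partitionsFor(4));
    }
}
```

The key point the sketch encodes: every partition in the list lives on the current node; the "remote" ones merely describe a destination, not a location.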

A Processor (or port or funnel) can only pull data from the Local Partition. It does not pull
a FlowFile that is waiting to be pushed to another node in the cluster. So this is why we
check whether the active queue is empty or not on the local partition only.

But when we consider whether or not a queue is full, in order to provide backpressure, what
we are looking at is not the FlowFiles across the entire cluster - we are looking at the FlowFiles
in the Local Partition + the FlowFiles on this node that are waiting to go to other nodes
in the cluster. It's important to take this into account, because if we look only at whether
or not the Local Partition is full, we can get into a situation where backpressure is never
appropriately applied. Consider, for example, if the Partitioner is set to Load Balance to
a single Node in the cluster. Then, on most nodes, all data ends up in the RemoteQueuePartition,
not in the Local Partition. We still want to have backpressure applied in this instance.
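The fullness reasoning above can be sketched as follows (my own illustration, not NiFi's actual code; all names are hypothetical). With "single node" load balancing, a non-target node keeps nearly everything in its remote partition, so a local-only check would never trip backpressure:

```java
// Hypothetical sketch, not NiFi code: the fullness check must count the
// Local Partition PLUS FlowFiles staged on this node for other nodes.
public class BackpressureSketch {
    // Count everything held on THIS node, regardless of destination.
    public static boolean isFull(int localCount, int stagedForOtherNodes, int maxCount) {
        return localCount + stagedForOtherNodes >= maxCount;
    }

    public static void main(String[] args) {
        // Non-target node: 0 FlowFiles locally, 10,000 staged for the target node.
        System.out.println(isFull(0, 10_000, 10_000)); // true: backpressure applies
        // A local-only check would see 0 of 10,000 and never apply backpressure.
        System.out.println(isFull(0, 0, 10_000));      // false
    }
}
```

Note that the staged count here is only what sits on the current node; FlowFiles already transferred to (or queued on) other nodes are not part of this node's fullness decision.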

If there is one slow node in the cluster, all other nodes should still behave nicely with
one another, though they will of course be limited in how well they can distribute data between
themselves and the slow node. I wonder if perhaps what you are running into can be described
by NIFI-6353, NIFI-6517, or NIFI-6736? If not, can you attach a template to help illustrate
the issue?

> Load balanced connection can hold up for whole cluster when one node slows down
> -------------------------------------------------------------------------------
>                 Key: NIFI-6787
>                 URL:
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Tamas Palfy
>            Assignee: Tamas Palfy
>            Priority: Major
> A slow processor on one node in a cluster can slow down the same processor on the other
> nodes when a load-balanced incoming connection is used:
> When a processor decides whether it can pass along a FlowFile, it checks whether the
> outgoing connection is full or not (if full, it yields).
> Similarly, when the receiving processor is to be scheduled, the framework checks if the
> incoming connection is empty (if empty, there is no reason to call {{onTrigger}}).
> The problem is that when checking if it's full or not, it checks the _total_ size (which
> is across the whole cluster) and compares it to the max (which is scoped to the current node).
> The empty check is (correctly) done on the local partition only.
> This can lead to the case where the slow node fills up its queue while the faster ones
> empty theirs.
> Once the slow node has a full queue, the fast ones stop receiving new input and thus
> stop working after their queues get emptied.
> The issue is probably the fact that {{SocketLoadBalancedFlowFileQueue.isFull}} actually
> calls to {{AbstractFlowFileQueue.isFull}}, which checks {{size()}}, but that returns the _total_
> size. (The empty check looks fine, but for reference it is done via {{SocketLoadBalancedFlowFileQueue.isActiveQueueEmpty}}.)
> {code:java|borderStyle=solid}
> ...
>     private MaxQueueSize getMaxQueueSize() {
>         return maxQueueSize.get();
>     }
>     @Override
>     public boolean isFull() {
>         return isFull(size());
>     }
>     protected boolean isFull(final QueueSize queueSize) {
>         final MaxQueueSize maxSize = getMaxQueueSize();
>         // Check if max size is set
>         if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
>             return false;
>         }
>         if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
>             return true;
>         }
>         if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
>             return true;
>         }
>         return false;
>     }
> ...
> {code}
> {code:java|borderStyle=solid}
> ...
>     @Override
>     public QueueSize size() {
>         return totalSize.get();
>     }
> ...
>     @Override
>     public boolean isActiveQueueEmpty() {
>         return localPartition.isActiveQueueEmpty();
>     }
> ...
> {code}
