nifi-issues mailing list archives

From "Tamas Palfy (Jira)" <j...@apache.org>
Subject [jira] [Created] (NIFI-6787) Load balanced connection can hold up for whole cluster when one node slows down
Date Thu, 17 Oct 2019 16:25:00 GMT
Tamas Palfy created NIFI-6787:
---------------------------------

             Summary: Load balanced connection can hold up for whole cluster when one node slows down
                 Key: NIFI-6787
                 URL: https://issues.apache.org/jira/browse/NIFI-6787
             Project: Apache NiFi
          Issue Type: Improvement
          Components: Core Framework
            Reporter: Tamas Palfy


A slow processor on one node in a cluster can slow down the same processor on the other nodes
when a load-balanced incoming connection is used:

When a processor decides whether it can pass along a flowfile, it checks whether the outgoing
connection is full (if it is full, the processor yields).
Similarly, when the receiving processor is about to be scheduled, the framework checks whether the
incoming connection is empty (if it is empty, there is no reason to call {{onTrigger}}).

The problem is that the full check compares the _total_ size (which spans the whole cluster)
to the max (which is scoped to the current node).
The empty check is (correctly) done on the local partition only.
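
To make the mismatch concrete, here is a minimal sketch. It is not NiFi code: the class and method names are hypothetical, only object counts are modeled, and the "local" variant is just an assumed point of comparison, not a proposed patch.

```java
// Hypothetical illustration only; none of these names exist in NiFi.
final class FullCheckSketch {

    // Mirrors the reported behavior: the cluster-wide total is compared
    // against a limit that is meant to apply per node.
    static boolean isFullUsingTotal(final int[] partitionCounts, final int maxCountPerNode) {
        int total = 0;
        for (final int count : partitionCounts) {
            total += count;
        }
        return maxCountPerNode > 0 && total >= maxCountPerNode;
    }

    // Assumed alternative for comparison: look at the local partition only.
    static boolean isFullUsingLocal(final int[] partitionCounts, final int localIndex,
                                    final int maxCountPerNode) {
        return maxCountPerNode > 0 && partitionCounts[localIndex] >= maxCountPerNode;
    }

    public static void main(final String[] args) {
        // Node 2 is slow and holds the per-node maximum; nodes 0 and 1 are empty.
        final int[] counts = {0, 0, 10_000};
        System.out.println("total-based, seen from node 0: " + isFullUsingTotal(counts, 10_000));
        System.out.println("local-based, seen from node 0: " + isFullUsingLocal(counts, 0, 10_000));
    }
}
```

Seen from node 0, the total-based check reports the connection as full even though node 0's own partition is empty, while the local check does not.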

This can lead to the case where the slow node fills up its queue while the faster ones empty
theirs.
Once the slow node has a full queue, the fast ones stop receiving new input and thus stop
working after their queues get emptied.
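
The stall can be reproduced in a toy model. The sketch below is an assumption-laden simplification, not NiFi code: the names are hypothetical, distribution is plain round-robin, the full check is evaluated per flowfile, and the slow node is taken to the extreme of not consuming at all.

```java
import java.util.Arrays;

// Toy model, not NiFi code: all names and numbers here are assumptions.
final class LoadBalanceStallSketch {

    // Runs the model for the given number of ticks and returns the final
    // per-node queue counts. consumeRates[i] is how many flowfiles node i
    // can process per tick.
    static int[] simulate(final int[] consumeRates, final int maxCount, final int ticks) {
        final int nodes = consumeRates.length;
        final int[] counts = new int[nodes];
        int next = 0; // round-robin pointer

        for (int tick = 0; tick < ticks; tick++) {
            // Upstream offers up to one flowfile per node per tick, but only
            // while the total-based full check passes.
            for (int offered = 0; offered < nodes; offered++) {
                if (Arrays.stream(counts).sum() >= maxCount) {
                    break; // treated as full, so upstream yields
                }
                counts[next]++;
                next = (next + 1) % nodes;
            }
            // Each node drains its own partition at its own rate.
            for (int i = 0; i < nodes; i++) {
                counts[i] -= Math.min(consumeRates[i], counts[i]);
            }
        }
        return counts;
    }

    public static void main(final String[] args) {
        // Two fast nodes and one node that has stopped consuming; max count of 6.
        final int[] finalCounts = simulate(new int[] {2, 2, 0}, 6, 20);
        System.out.println(Arrays.toString(finalCounts));
    }
}
```

With these inputs the model ends at `[0, 0, 6]`: the stalled node holds the entire limit, the total-based check stays "full", and the two fast nodes sit idle with empty queues.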

The cause is probably that {{SocketLoadBalancedFlowFileQueue.isFull}} actually calls
{{AbstractFlowFileQueue.isFull}}, which checks {{size()}}, and {{size()}} returns the _total_
size. (The empty check looks fine; for reference, it is done via {{SocketLoadBalancedFlowFileQueue.isActiveQueueEmpty}}.)

{code:title=AbstractFlowFileQueue.java|borderStyle=solid}
...
    private MaxQueueSize getMaxQueueSize() {
        return maxQueueSize.get();
    }

    @Override
    public boolean isFull() {
        return isFull(size());
    }

    protected boolean isFull(final QueueSize queueSize) {
        final MaxQueueSize maxSize = getMaxQueueSize();

        // Check if max size is set
        if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
            return false;
        }

        if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
            return true;
        }

        if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
            return true;
        }

        return false;
    }
...
{code}

{code:title=SocketLoadBalancedFlowFileQueue.java|borderStyle=solid}
...
    @Override
    public QueueSize size() {
        return totalSize.get();
    }
...
    @Override
    public boolean isActiveQueueEmpty() {
        return localPartition.isActiveQueueEmpty();
    }
...
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
