From: "Mark Payne (JIRA)"
To: commits@nifi.incubator.apache.org
Reply-To: dev@nifi.incubator.apache.org
Date: Wed, 10 Dec 2014 16:54:12 +0000 (UTC)
Subject: [jira] [Updated] (NIFI-54) Event Driven Processors can be scheduled to use more threads than Max Concurrent Threads allows

[
https://issues.apache.org/jira/browse/NIFI-54?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mark Payne updated NIFI-54:
---------------------------
    Description:
EventDrivenSchedulingAgent, when incrementing schedule state's active thread count should check the result and if it's greater than what is allowed should just decrement active thread count and return:
This must be done for trigger of Connectable and ProcessorNode, and ScheduleState must return the new value, rather than being void.

    private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
        final int newThreadCount = scheduleState.incrementActiveThreadCount();
        if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
            // it's possible that the worker queue could give us a worker node that is eligible to run based
            // on the number of threads but another thread has already incremented the thread count, resulting in
            // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
            // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
            // result in using more than the maximum number of defined threads
            scheduleState.decrementActiveThreadCount();
            return;
        }
        ...

  was:
EventDrivenSchedulingAgent, when incrementing schedule state's active thread count should check the result and if it's greater then what is allowed should just decrement active thread count and return:
This must be done for trigger of Connectable and ProcessorNode, and ScheduleState must return the new value, rather than being void.

    private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
        final int newThreadCount = scheduleState.incrementActiveThreadCount();
        if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
            // it's possible that the worker queue could give us a worker node that is eligible to run based
            // on the number of threads but another thread has already incremented the thread count, resulting in
            // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
            // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
            // result in using more than the maximum number of defined threads
            scheduleState.decrementActiveThreadCount();
            return;
        }
        ...

> Event Driven Processors can be scheduled to use more threads than Max Concurrent Threads allows
> -----------------------------------------------------------------------------------------------
>
>                 Key: NIFI-54
>                 URL: https://issues.apache.org/jira/browse/NIFI-54
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>            Reporter: Matt Gilman
>             Fix For: 0.0.1
>
>
> EventDrivenSchedulingAgent, when incrementing schedule state's active thread count should check the result and if it's greater than what is allowed should just decrement active thread count and return:
> This must be done for trigger of Connectable and ProcessorNode, and ScheduleState must return the new value, rather than being void.
>
>     private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
>         final int newThreadCount = scheduleState.incrementActiveThreadCount();
>         if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
>             // it's possible that the worker queue could give us a worker node that is eligible to run based
>             // on the number of threads but another thread has already incremented the thread count, resulting in
>             // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
>             // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
>             // result in using more than the maximum number of defined threads
>             scheduleState.decrementActiveThreadCount();
>             return;
>         }
>         ...

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
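
The increment-then-check pattern described in the issue can be sketched in isolation as follows. This is only an illustration, not the NiFi implementation: ScheduleStateSketch, TriggerSketch and tryAcquire are hypothetical names, and backing the counter with an AtomicInteger is an assumption (the issue only says the increment is atomic and must return the new value). A non-positive maximum is treated as "no limit", mirroring the "> 0" test in the snippet from the issue.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ScheduleState; only the thread counter is modelled.
final class ScheduleStateSketch {
    // Assumption: the counter is an AtomicInteger.
    private final AtomicInteger activeThreadCount = new AtomicInteger(0);

    // Returns the new value (rather than void) so the caller can check it.
    int incrementActiveThreadCount() {
        return activeThreadCount.incrementAndGet();
    }

    int decrementActiveThreadCount() {
        return activeThreadCount.decrementAndGet();
    }

    int getActiveThreadCount() {
        return activeThreadCount.get();
    }
}

// Hypothetical helper that isolates the guard at the top of trigger(...).
final class TriggerSketch {
    // Returns true if the caller may run the worker, false if the limit was hit.
    static boolean tryAcquire(final ScheduleStateSketch state, final int maxConcurrentTasks) {
        final int newThreadCount = state.incrementActiveThreadCount();
        if (newThreadCount > maxConcurrentTasks && maxConcurrentTasks > 0) {
            // Another thread claimed the last slot first; undo our increment and back off.
            state.decrementActiveThreadCount();
            return false;
        }
        return true;
    }
}
```

In this sketch a caller that receives true is expected to call decrementActiveThreadCount() once its work completes. Because the limit is checked against the value returned by the atomic increment, two threads racing for the last slot cannot both pass the check.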