Return-Path: X-Original-To: apmail-nifi-commits-archive@minotaur.apache.org Delivered-To: apmail-nifi-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 19DBE17E94 for ; Mon, 2 Mar 2015 04:04:29 +0000 (UTC) Received: (qmail 92379 invoked by uid 500); 2 Mar 2015 04:04:16 -0000 Delivered-To: apmail-nifi-commits-archive@nifi.apache.org Received: (qmail 92320 invoked by uid 500); 2 Mar 2015 04:04:16 -0000 Mailing-List: contact commits-help@nifi.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.incubator.apache.org Delivered-To: mailing list commits@nifi.incubator.apache.org Received: (qmail 92311 invoked by uid 99); 2 Mar 2015 04:04:16 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Mar 2015 04:04:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 02 Mar 2015 04:03:48 +0000 Received: (qmail 88522 invoked by uid 99); 2 Mar 2015 04:03:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Mar 2015 04:03:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6EF7FE10B2; Mon, 2 Mar 2015 04:03:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: joewitt@apache.org To: commits@nifi.incubator.apache.org Date: Mon, 02 Mar 2015 04:04:12 -0000 Message-Id: <2dccb07c3bda4ae2a89767639510e26b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [33/50] [abbrv] incubator-nifi git commit: NIFI-381: Ensure that we always properly account for number of active threads X-Virus-Checked: Checked by ClamAV on apache.org NIFI-381: Ensure that we always properly account for number of active threads Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1af8c1e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1af8c1e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1af8c1e2 Branch: refs/heads/NIFI-360 Commit: 1af8c1e22a32b2e4024a655a31735be1d170b5df Parents: ca23ad8 Author: Mark Payne Authored: Wed Feb 25 14:07:21 2015 -0500 Committer: Mark Payne Committed: Wed Feb 25 14:07:21 2015 -0500 ---------------------------------------------------------------------- .../tasks/ContinuallyRunProcessorTask.java | 50 ++++++++++---------- 1 file changed, 26 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1af8c1e2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index f4be855..cff8744 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -159,31 +159,33 @@ public class ContinuallyRunProcessorTask implements Callable { procNode.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); } } finally { - if (batch) { - rawSession.commit(); - } - - final long processingNanos = System.nanoTime() - startNanos; - - // if the processor is no longer scheduled to run and this is the last thread, - // invoke the OnStopped methods - if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext); - flowController.heartbeat(); - } - } - - scheduleState.decrementActiveThreadCount(); - try { - final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier()); - procEvent.setProcessingNanos(processingNanos); - procEvent.setInvocations(invocationCount); - context.getFlowFileEventRepository().updateRepository(procEvent); - } catch (final IOException e) { - logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString()); - logger.error("", e); + if (batch) { + rawSession.commit(); + } + + final long processingNanos = System.nanoTime() - startNanos; + + // if the processor is no longer scheduled to run and this is the last thread, + // invoke the OnStopped methods + if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext); + flowController.heartbeat(); + } + } + + try { + final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier()); + procEvent.setProcessingNanos(processingNanos); + procEvent.setInvocations(invocationCount); + context.getFlowFileEventRepository().updateRepository(procEvent); + } catch (final IOException e) { + logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString()); + logger.error("", e); + } + } finally { + scheduleState.decrementActiveThreadCount(); } }