nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [72/79] [abbrv] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure
Date Thu, 22 Jan 2015 17:05:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 0000000,7fc65f9..0653b03
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@@ -1,0 -1,569 +1,640 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.scheduling;
+ 
+ import static java.util.Objects.requireNonNull;
+ 
+ import java.lang.reflect.InvocationTargetException;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ 
++import org.apache.nifi.annotation.lifecycle.OnDisabled;
++import org.apache.nifi.annotation.lifecycle.OnEnabled;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
++import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.Funnel;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.controller.AbstractPort;
+ import org.apache.nifi.controller.ConfigurationContext;
+ import org.apache.nifi.controller.Heartbeater;
+ import org.apache.nifi.controller.ProcessScheduler;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.ReportingTaskNode;
+ import org.apache.nifi.controller.ScheduledState;
+ import org.apache.nifi.controller.annotation.OnConfigured;
++import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.engine.FlowEngine;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.SchedulingContext;
+ import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessContext;
+ import org.apache.nifi.processor.StandardSchedulingContext;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.OnUnscheduled;
+ import org.apache.nifi.reporting.ReportingTask;
+ import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.ReflectionUtils;
 -
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Responsible for scheduling Processors, Ports, and Funnels to run at regular
+  * intervals
+  */
+ public final class StandardProcessScheduler implements ProcessScheduler {
+ 
+     private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class);
+ 
+     private final ControllerServiceProvider controllerServiceProvider;
+     private final Heartbeater heartbeater;
+     private final long administrativeYieldMillis;
+     private final String administrativeYieldDuration;
+ 
+     private final ConcurrentMap<Object, ScheduleState> scheduleStates = new ConcurrentHashMap<>();
+     private final ScheduledExecutorService frameworkTaskExecutor;
+     private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>();
+     // thread pool for starting/stopping components
+     private final ExecutorService componentLifeCycleThreadPool = new ThreadPoolExecutor(25, 50, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5000));
+     private final StringEncryptor encryptor;
+ 
+     public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
+         this.heartbeater = heartbeater;
+         this.controllerServiceProvider = controllerServiceProvider;
+         this.encryptor = encryptor;
+ 
+         administrativeYieldDuration = NiFiProperties.getInstance().getAdministrativeYieldDuration();
+         administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS);
+ 
+         frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
+     }
+ 
+     public void scheduleFrameworkTask(final Runnable command, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
+         frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() {
+             @Override
+             public void run() {
+                 try {
+                     command.run();
+                 } catch (final Throwable t) {
+                     LOG.error("Failed to run Framework Task {} due to {}", command, t.toString());
+                     if (LOG.isDebugEnabled()) {
+                         LOG.error("", t);
+                     }
+                 }
+             }
+         }, initialDelay, delay, timeUnit);
+     }
+ 
+     @Override
+     public void setMaxThreadCount(final SchedulingStrategy schedulingStrategy, final int maxThreadCount) {
+         final SchedulingAgent agent = getSchedulingAgent(schedulingStrategy);
+         if (agent == null) {
+             return;
+         }
+ 
+         agent.setMaxThreadCount(maxThreadCount);
+     }
+ 
+     public void setSchedulingAgent(final SchedulingStrategy strategy, final SchedulingAgent agent) {
+         strategyAgentMap.put(strategy, agent);
+     }
+ 
+     public SchedulingAgent getSchedulingAgent(final SchedulingStrategy strategy) {
+         return strategyAgentMap.get(strategy);
+     }
+ 
+     private SchedulingAgent getSchedulingAgent(final Connectable connectable) {
+         return getSchedulingAgent(connectable.getSchedulingStrategy());
+     }
+ 
+     @Override
+     public void shutdown() {
+         for (final SchedulingAgent schedulingAgent : strategyAgentMap.values()) {
+             try {
+                 schedulingAgent.shutdown();
+             } catch (final Throwable t) {
+                 LOG.error("Failed to shutdown Scheduling Agent {} due to {}", schedulingAgent, t.toString());
+                 LOG.error("", t);
+             }
+         }
+         
+         frameworkTaskExecutor.shutdown();
+         componentLifeCycleThreadPool.shutdown();
+     }
+ 
+     public void schedule(final ReportingTaskNode taskNode) {
+         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
+         if (scheduleState.isScheduled()) {
+             return;
+         }
+ 
+         final int activeThreadCount = scheduleState.getActiveThreadCount();
+         if (activeThreadCount > 0) {
+             throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running");
+         }
+ 
+         if (!taskNode.isValid()) {
+             throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors());
+         }
+ 
+         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
+         scheduleState.setScheduled(true);
+ 
+         final Runnable startReportingTaskRunnable = new Runnable() {
++            @SuppressWarnings("deprecation")
+             @Override
+             public void run() {
++                // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
+                 while (true) {
+                     final ReportingTask reportingTask = taskNode.getReportingTask();
+ 
+                     try {
+                         try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
++                            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
+                         }
++                        
+                         break;
+                     } catch (final InvocationTargetException ite) {
 -                        LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
++                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                                 new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
+                         LOG.error("", ite.getTargetException());
+ 
+                         try {
+                             Thread.sleep(administrativeYieldMillis);
+                         } catch (final InterruptedException ie) {
+                         }
+                     } catch (final Exception e) {
 -                        LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
++                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                                 new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
+                         try {
+                             Thread.sleep(administrativeYieldMillis);
+                         } catch (final InterruptedException ie) {
+                         }
+                     }
+                 }
+ 
+                 agent.schedule(taskNode, scheduleState);
+             }
+         };
+ 
+         componentLifeCycleThreadPool.execute(startReportingTaskRunnable);
+     }
+ 
+     public void unschedule(final ReportingTaskNode taskNode) {
+         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
+         if (!scheduleState.isScheduled()) {
+             return;
+         }
+ 
+         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
+         final ReportingTask reportingTask = taskNode.getReportingTask();
+         scheduleState.setScheduled(false);
+ 
+         final Runnable unscheduleReportingTaskRunnable = new Runnable() {
++            @SuppressWarnings("deprecation")
+             @Override
+             public void run() {
+                 final ConfigurationContext configurationContext = taskNode.getConfigurationContext();
+ 
 -                while (true) {
 -                    try {
 -                        try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                            ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
 -                        }
 -                        break;
 -                    } catch (final InvocationTargetException ite) {
 -                        LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
 -                                new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
 -                        LOG.error("", ite.getTargetException());
++                try {
++                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
++                        ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
++                    }
++                } catch (final InvocationTargetException ite) {
++                    LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
++                            new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
++                    LOG.error("", ite.getTargetException());
+ 
 -                        try {
 -                            Thread.sleep(administrativeYieldMillis);
 -                        } catch (final InterruptedException ie) {
 -                        }
 -                    } catch (final Exception e) {
 -                        LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
 -                                new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
 -                        try {
 -                            Thread.sleep(administrativeYieldMillis);
 -                        } catch (final InterruptedException ie) {
 -                        }
++                    try {
++                        Thread.sleep(administrativeYieldMillis);
++                    } catch (final InterruptedException ie) {
++                    }
++                } catch (final Exception e) {
++                    LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
++                            new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
++                    try {
++                        Thread.sleep(administrativeYieldMillis);
++                    } catch (final InterruptedException ie) {
+                     }
+                 }
+ 
+                 agent.unschedule(taskNode, scheduleState);
+ 
 -                if (scheduleState.getActiveThreadCount() == 0) {
 -                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
++                if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
++                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
+                 }
+             }
+         };
+ 
+         componentLifeCycleThreadPool.execute(unscheduleReportingTaskRunnable);
+     }
+ 
+     /**
+      * Starts scheduling the given processor to run after invoking all methods
+      * on the underlying {@link nifi.processor.Processor
+      * FlowFileProcessor} that are annotated with the {@link OnScheduled}
+      * annotation.
+      */
+     @Override
+     public synchronized void startProcessor(final ProcessorNode procNode) {
+         if (procNode.getScheduledState() == ScheduledState.DISABLED) {
+             throw new IllegalStateException(procNode + " is disabled, so it cannot be started");
+         }
+         final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode));
+ 
+         if (scheduleState.isScheduled()) {
+             return;
+         }
+ 
+         final int activeThreadCount = scheduleState.getActiveThreadCount();
+         if (activeThreadCount > 0) {
+             throw new IllegalStateException("Processor " + procNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running");
+         }
+ 
+         if (!procNode.isValid()) {
+             throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state");
+         }
+ 
+         final Runnable startProcRunnable = new Runnable() {
++            @SuppressWarnings("deprecation")
+             @Override
+             public void run() {
+                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                     long lastStopTime = scheduleState.getLastStopTime();
+                     final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
+ 
+                     while (true) {
+                         try {
+                             synchronized (scheduleState) {
+                                 // if no longer scheduled to run, then we're finished. This can happen, for example,
+                                 // if the @OnScheduled method throws an Exception and the user stops the processor 
+                                 // while we're administratively yielded.
+                                 // 
+                                 // we also check if the schedule state's last start time is equal to what it was before.
+                                 // if not, then means that the processor has been stopped and started again, so we should just
+                                 // bail; another thread will be responsible for invoking the @OnScheduled methods.
+                                 if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
+                                     return;
+                                 }
+ 
+                                 final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, controllerServiceProvider, procNode);
 -                                ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, procNode.getProcessor(), schedulingContext);
++                                ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, procNode.getProcessor(), schedulingContext);
+ 
+                                 getSchedulingAgent(procNode).schedule(procNode, scheduleState);
+ 
+                                 heartbeater.heartbeat();
+                                 return;
+                             }
+                         } catch (final Exception e) {
+                             final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+ 
+                             procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
+                                     new Object[]{procNode.getProcessor(), e.getCause(), administrativeYieldDuration}, e.getCause());
+                             LOG.error("Failed to invoke @OnScheduled method due to {}", e.getCause().toString(), e.getCause());
+ 
+                             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext);
+                             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
+ 
+                             Thread.sleep(administrativeYieldMillis);
+                             continue;
+                         }
+                     }
+                 } catch (final Throwable t) {
+                     final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+                     procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run", new Object[]{procNode.getProcessor(), t});
+                     LOG.error("Failed to invoke @OnScheduled method due to {}", t.toString(), t);
+                 }
+             }
+         };
+ 
+         scheduleState.setScheduled(true);
+         procNode.setScheduledState(ScheduledState.RUNNING);
+ 
+         componentLifeCycleThreadPool.execute(startProcRunnable);
+     }
+ 
+     /**
+      * Used to delay scheduling the given Processor to run until its yield
+      * duration expires.
+      *
+      * @param procNode
+      */
+     @Override
+     public void yield(final ProcessorNode procNode) {
+         // This exists in the ProcessScheduler so that the scheduler can take advantage of the fact that
+         // the Processor was yielded and, as a result, avoid scheduling the Processor to potentially run
+         // (thereby skipping the overhead of the Context Switches) if nothing can be done.
+         //
+         // We used to implement this feature by canceling all futures for the given Processor and
+         // re-submitting them with a delay. However, this became problematic, because we have situations where
+         // a Processor will wait several seconds (often 30 seconds in the case of a network timeout), and then yield
+         // the context. If this Processor has X number of threads, we end up submitting X new tasks while the previous
+         // X-1 tasks are still running. At this point, another thread could finish and do the same thing, resulting in
+         // an additional X-1 extra tasks being submitted.
+         // 
+         // As a result, we simply removed this buggy implementation, as it was a very minor performance optimization
+         // that gave very bad results.
+     }
+ 
+     /**
+      * Stops scheduling the given processor to run and invokes all methods on
+      * the underlying {@link nifi.processor.Processor FlowFileProcessor} that
+      * are annotated with the {@link OnUnscheduled} annotation.
+      */
+     @Override
+     public synchronized void stopProcessor(final ProcessorNode procNode) {
+         final ScheduleState state = getScheduleState(requireNonNull(procNode));
+ 
+         synchronized (state) {
+             if (!state.isScheduled()) {
+                 procNode.setScheduledState(ScheduledState.STOPPED);
+                 return;
+             }
+ 
+             getSchedulingAgent(procNode).unschedule(procNode, state);
+             procNode.setScheduledState(ScheduledState.STOPPED);
+             state.setScheduled(false);
+         }
+ 
+         final Runnable stopProcRunnable = new Runnable() {
+             @Override
+             public void run() {
+                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                     final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
+ 
+                     ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext);
+ 
+                     // If no threads currently running, call the OnStopped methods
+                     if (state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
+                         ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
+                         heartbeater.heartbeat();
+                     }
+                 }
+             }
+         };
+ 
+         componentLifeCycleThreadPool.execute(stopProcRunnable);
+     }
+ 
+     @Override
+     public void registerEvent(final Connectable worker) {
+         getSchedulingAgent(worker).onEvent(worker);
+     }
+ 
+     /**
+      * Returns the number of threads that are currently active for the given
+      * <code>Connectable</code>.
+      *
+      * @return
+      */
+     @Override
+     public int getActiveThreadCount(final Object scheduled) {
+         return getScheduleState(scheduled).getActiveThreadCount();
+     }
+ 
+     /**
+      * Begins scheduling the given port to run.
+      *
+      * @throws NullPointerException if the Port is null
+      * @throws IllegalStateException if the Port is already scheduled to run or
+      * has threads running
+      */
+     @Override
+     public void startPort(final Port port) {
+         if (!port.isValid()) {
+             throw new IllegalStateException("Port " + port.getName() + " is not in a valid state");
+         }
+ 
+         port.onSchedulingStart();
+         startConnectable(port);
+     }
+ 
+     @Override
+     public void startFunnel(final Funnel funnel) {
+         startConnectable(funnel);
+         funnel.setScheduledState(ScheduledState.RUNNING);
+     }
+ 
+     @Override
+     public void stopPort(final Port port) {
+         stopConnectable(port);
+         port.shutdown();
+     }
+ 
+     @Override
+     public void stopFunnel(final Funnel funnel) {
+         stopConnectable(funnel);
+         funnel.setScheduledState(ScheduledState.STOPPED);
+     }
+ 
+     private synchronized void startConnectable(final Connectable connectable) {
+         if (connectable.getScheduledState() == ScheduledState.DISABLED) {
+             throw new IllegalStateException(connectable + " is disabled, so it cannot be started");
+         }
+ 
+         final ScheduleState scheduleState = getScheduleState(requireNonNull(connectable));
+         if (scheduleState.isScheduled()) {
+             return;
+         }
+ 
+         final int activeThreads = scheduleState.getActiveThreadCount();
+         if (activeThreads > 0) {
+             throw new IllegalStateException("Port cannot be scheduled to run until its last " + activeThreads + " threads finish");
+         }
+ 
+         getSchedulingAgent(connectable).schedule(connectable, scheduleState);
+         scheduleState.setScheduled(true);
+     }
+ 
+     private synchronized void stopConnectable(final Connectable connectable) {
+         final ScheduleState state = getScheduleState(requireNonNull(connectable));
+         if (!state.isScheduled()) {
+             return;
+         }
+         state.setScheduled(false);
+ 
+         getSchedulingAgent(connectable).unschedule(connectable, state);
+ 
+         if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
+             final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor);
+             try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
+                 heartbeater.heartbeat();
+             }
+         }
+     }
+ 
+     @Override
+     public synchronized void enableFunnel(final Funnel funnel) {
+         if (funnel.getScheduledState() != ScheduledState.DISABLED) {
+             throw new IllegalStateException("Funnel cannot be enabled because it is not disabled");
+         }
+         funnel.setScheduledState(ScheduledState.STOPPED);
+     }
+ 
+     @Override
+     public synchronized void disableFunnel(final Funnel funnel) {
+         if (funnel.getScheduledState() != ScheduledState.STOPPED) {
+             throw new IllegalStateException("Funnel cannot be disabled because its state its state is set to " + funnel.getScheduledState());
+         }
+         funnel.setScheduledState(ScheduledState.DISABLED);
+     }
+ 
+     @Override
+     public synchronized void disablePort(final Port port) {
+         if (port.getScheduledState() != ScheduledState.STOPPED) {
+             throw new IllegalStateException("Port cannot be disabled because its state is set to " + port.getScheduledState());
+         }
+ 
+         if (!(port instanceof AbstractPort)) {
+             throw new IllegalArgumentException();
+         }
+ 
+         ((AbstractPort) port).disable();
+     }
+ 
+     @Override
 -    public synchronized void disableProcessor(final ProcessorNode procNode) {
 -        if (procNode.getScheduledState() != ScheduledState.STOPPED) {
 -            throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
 -        }
 -        procNode.setScheduledState(ScheduledState.DISABLED);
 -    }
 -
 -    @Override
+     public synchronized void enablePort(final Port port) {
+         if (port.getScheduledState() != ScheduledState.DISABLED) {
+             throw new IllegalStateException("Funnel cannot be enabled because it is not disabled");
+         }
+ 
+         if (!(port instanceof AbstractPort)) {
+             throw new IllegalArgumentException();
+         }
+ 
+         ((AbstractPort) port).enable();
+     }
+ 
+     @Override
+     public synchronized void enableProcessor(final ProcessorNode procNode) {
+         if (procNode.getScheduledState() != ScheduledState.DISABLED) {
+             throw new IllegalStateException("Processor cannot be enabled because it is not disabled");
+         }
++        
+         procNode.setScheduledState(ScheduledState.STOPPED);
++        
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, procNode.getProcessor(), processorLog);
++        }
+     }
+ 
+     @Override
++    public synchronized void disableProcessor(final ProcessorNode procNode) {
++        if (procNode.getScheduledState() != ScheduledState.STOPPED) {
++            throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
++        }
++        
++        procNode.setScheduledState(ScheduledState.DISABLED);
++        
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, procNode.getProcessor(), processorLog);
++        }
++    }
++
++    public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
++        if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
++            throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled");
++        }
++
++        taskNode.setScheduledState(ScheduledState.STOPPED);
++        
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, taskNode.getReportingTask());
++        }
++    }
++    
++    public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
++        if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) {
++            throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state");
++        }
++
++        taskNode.setScheduledState(ScheduledState.DISABLED);
++        
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, taskNode.getReportingTask());
++        }
++    }
++
++    public synchronized void enableControllerService(final ControllerServiceNode serviceNode) {
++        if ( !serviceNode.isDisabled() ) {
++            throw new IllegalStateException("Controller Service cannot be enabled because it is not disabled");
++        }
++
++        // we set the service to enabled before invoking the @OnEnabled methods. We do this because it must be
++        // done in this order for disabling (serviceNode.setDisabled(true) will throw Exceptions if the service
++        // is currently known to be in use) and we want to be consistent with the ordering of calling setDisabled
++        // before annotated methods.
++        serviceNode.setDisabled(false);
++        
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation());
++        }
++    }
++    
++    public synchronized void disableControllerService(final ControllerServiceNode serviceNode) {
++        if ( serviceNode.isDisabled() ) {
++            throw new IllegalStateException("Controller Service cannot be disabled because it is already disabled");
++        }
++
++        // We must set the service to disabled before we invoke the OnDisabled methods because the service node
++        // can throw Exceptions if we attempt to disable the service while it's known to be in use.
++        serviceNode.setDisabled(true);
++        
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
++        }
++    }
++    
++    
++    @Override
+     public boolean isScheduled(final Object scheduled) {
+         final ScheduleState scheduleState = scheduleStates.get(scheduled);
+         return (scheduleState == null) ? false : scheduleState.isScheduled();
+     }
+ 
+     /**
 -     * Returns the ScheduleState that is registered for the given ProcessorNode;
++     * Returns the ScheduleState that is registered for the given component;
+      * if no ScheduleState current is registered, one is created and registered
+      * atomically, and then that value is returned.
+      *
+      * @param schedulable
+      * @return
+      */
+     private ScheduleState getScheduleState(final Object schedulable) {
+         ScheduleState scheduleState = scheduleStates.get(schedulable);
+         if (scheduleState == null) {
+             scheduleState = new ScheduleState();
+             ScheduleState previous = scheduleStates.putIfAbsent(schedulable, scheduleState);
+             if (previous != null) {
+                 scheduleState = previous;
+             }
+         }
+         return scheduleState;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 0000000,42bd55f..9fec307
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@@ -1,0 -1,156 +1,154 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.service;
+ 
+ import java.io.BufferedInputStream;
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URL;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.StandardOpenOption;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ 
+ import javax.xml.XMLConstants;
+ import javax.xml.parsers.DocumentBuilder;
+ import javax.xml.parsers.DocumentBuilderFactory;
+ import javax.xml.parsers.ParserConfigurationException;
+ import javax.xml.validation.Schema;
+ import javax.xml.validation.SchemaFactory;
+ 
+ import org.apache.nifi.util.file.FileUtils;
+ import org.apache.nifi.util.DomUtils;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.w3c.dom.Document;
+ import org.w3c.dom.Element;
+ import org.w3c.dom.NodeList;
+ import org.xml.sax.SAXException;
+ import org.xml.sax.SAXParseException;
+ 
+ /**
+  *
+  */
+ public class ControllerServiceLoader {
+ 
+     private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class);
+ 
+     private final Path serviceConfigXmlPath;
+ 
+     public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException {
+         final File serviceConfigXmlFile = serviceConfigXmlPath.toFile();
+         if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) {
+             throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. Cannot load configuration.");
+         }
+ 
+         this.serviceConfigXmlPath = serviceConfigXmlPath;
+     }
+ 
+     public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException {
+         final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+         final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
+         InputStream fis = null;
+         BufferedInputStream bis = null;
+         documentBuilderFactory.setNamespaceAware(true);
+ 
+         final List<ControllerServiceNode> services = new ArrayList<>();
+ 
+         try {
+             final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd");
+             if (configurationResource == null) {
+                 throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration");
+             }
+             final Schema schema = schemaFactory.newSchema(configurationResource);
+             documentBuilderFactory.setSchema(schema);
+             final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
+ 
+             builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
+ 
+                 @Override
+                 public void fatalError(final SAXParseException err) throws SAXException {
+                     logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
+                     if (logger.isDebugEnabled()) {
+                         logger.error("Error Stack Dump", err);
+                     }
+                     throw err;
+                 }
+ 
+                 @Override
+                 public void error(final SAXParseException err) throws SAXParseException {
+                     logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
+                     if (logger.isDebugEnabled()) {
+                         logger.error("Error Stack Dump", err);
+                     }
+                     throw err;
+                 }
+ 
+                 @Override
+                 public void warning(final SAXParseException err) throws SAXParseException {
+                     logger.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage());
+                     if (logger.isDebugEnabled()) {
+                         logger.warn("Warning stack dump", err);
+                     }
+                     throw err;
+                 }
+             });
+ 
+             //if controllerService.xml does not exist, create an empty file...
+             fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ);
+             bis = new BufferedInputStream(fis);
+             if (Files.size(this.serviceConfigXmlPath) > 0) {
+                 final Document document = builder.parse(bis);
+                 final NodeList servicesNodes = document.getElementsByTagName("services");
+                 final Element servicesElement = (Element) servicesNodes.item(0);
+ 
+                 final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
+                 for (final Element serviceElement : serviceNodes) {
 -                    //add global properties common to all tasks
 -                    Map<String, String> properties = new HashMap<>();
 -
+                     //get properties for the specific controller task - id, name, class,
+                     //and schedulingPeriod must be set
+                     final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
+                     final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
+ 
++                    //set the class to be used for the configured controller task
++                    final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false);
++
+                     //optional task-specific properties
+                     for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
+                         final String name = optionalProperty.getAttribute("name").trim();
+                         final String value = optionalProperty.getTextContent().trim();
 -                        properties.put(name, value);
++                        serviceNode.setProperty(name, value);
+                     }
+ 
 -                    //set the class to be used for the configured controller task
 -                    final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, properties);
+                     services.add(serviceNode);
+                     serviceNode.setDisabled(false);
+                 }
+             }
+         } catch (SAXException | ParserConfigurationException sxe) {
+             throw new IOException(sxe);
+         } finally {
+             FileUtils.closeQuietly(fis);
+             FileUtils.closeQuietly(bis);
+         }
+ 
+         return services;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 0000000,455eac1..4681293
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@@ -1,0 -1,125 +1,195 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.service;
+ 
+ import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
+ import org.apache.nifi.controller.AbstractConfiguredComponent;
+ import org.apache.nifi.controller.Availability;
++import org.apache.nifi.controller.ConfigurationContext;
+ import org.apache.nifi.controller.ConfiguredComponent;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ValidationContextFactory;
++import org.apache.nifi.controller.annotation.OnConfigured;
++import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
++import org.apache.nifi.nar.NarCloseable;
++import org.apache.nifi.util.ReflectionUtils;
+ 
+ public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
+ 
 -    private final ControllerService controllerService;
++    private final ControllerService proxedControllerService;
++    private final ControllerService implementation;
++    private final ControllerServiceProvider serviceProvider;
+ 
+     private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
+     private final AtomicBoolean disabled = new AtomicBoolean(true);
+ 
+     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+     private final Lock readLock = rwLock.readLock();
+     private final Lock writeLock = rwLock.writeLock();
+ 
+     private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
+ 
 -    public StandardControllerServiceNode(final ControllerService controllerService, final String id,
++    public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
+             final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
 -        super(controllerService, id, validationContextFactory, serviceProvider);
 -        this.controllerService = controllerService;
++        super(proxiedControllerService, id, validationContextFactory, serviceProvider);
++        this.proxedControllerService = proxiedControllerService;
++        this.implementation = implementation;
++        this.serviceProvider = serviceProvider;
+     }
+ 
+     @Override
+     public boolean isDisabled() {
+         return disabled.get();
+     }
+ 
+     @Override
+     public void setDisabled(final boolean disabled) {
+         if (!disabled && !isValid()) {
 -            throw new IllegalStateException("Cannot enable Controller Service " + controllerService + " because it is not valid");
++            throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
+         }
+ 
+         if (disabled) {
+             // do not allow a Controller Service to be disabled if it's currently being used.
+             final Set<ConfiguredComponent> runningRefs = getReferences().getRunningReferences();
+             if (!runningRefs.isEmpty()) {
+                 throw new IllegalStateException("Cannot disable Controller Service because it is referenced (either directly or indirectly) by " + runningRefs.size() + " different components that are currently running");
+             }
+         }
+ 
+         this.disabled.set(disabled);
+     }
+ 
+     @Override
+     public Availability getAvailability() {
+         return availability.get();
+     }
+ 
+     @Override
+     public void setAvailability(final Availability availability) {
+         this.availability.set(availability);
+     }
+ 
+     @Override
 -    public ControllerService getControllerService() {
 -        return controllerService;
++    public ControllerService getProxiedControllerService() {
++        return proxedControllerService;
++    }
++    
++    @Override
++    public ControllerService getControllerServiceImplementation() {
++        return implementation;
+     }
+ 
+     @Override
+     public ControllerServiceReference getReferences() {
+         readLock.lock();
+         try {
+             return new StandardControllerServiceReference(this, referencingComponents);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void addReference(final ConfiguredComponent referencingComponent) {
+         writeLock.lock();
+         try {
+             referencingComponents.add(referencingComponent);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void removeReference(final ConfiguredComponent referencingComponent) {
+         writeLock.lock();
+         try {
+             referencingComponents.remove(referencingComponent);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void verifyModifiable() throws IllegalStateException {
+         if (!isDisabled()) {
+             throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
+         }
+     }
++    
++    @Override
++    public void setProperty(final String name, final String value) {
++        super.setProperty(name, value);
++        
++        onConfigured();
++    }
++    
++    @Override
++    public boolean removeProperty(String name) {
++        final boolean removed = super.removeProperty(name);
++        if ( removed ) {
++            onConfigured();
++        }
++        
++        return removed;
++    }
++    
++    private void onConfigured() {
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider);
++            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext);
++        } catch (final Exception e) {
++            throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
++        }
++    }
++    
++    @Override
++    public void verifyCanDelete() {
++        if ( !isDisabled() ) {
++            throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
++        }
++    }
++    
++    @Override
++    public void verifyCanDisable() {
++        final ControllerServiceReference references = getReferences();
++        final int numRunning = references.getRunningReferences().size();
++        if ( numRunning > 0 ) {
++            throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + numRunning + " components that are currently running");
++        }
++    }
++    
++    @Override
++    public void verifyCanEnable() {
++        if ( !isDisabled() ) {
++            throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
++        }
++    }
++    
++    @Override
++    public void verifyCanUpdate() {
++        if ( !isDisabled() ) {
++            throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
++        }
++    }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 0000000,fc07ce1..cc7a18a
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@@ -1,0 -1,196 +1,219 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.service;
+ 
+ import static java.util.Objects.requireNonNull;
+ 
+ import java.lang.reflect.InvocationHandler;
+ import java.lang.reflect.InvocationTargetException;
+ import java.lang.reflect.Method;
+ import java.lang.reflect.Proxy;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ 
++import org.apache.nifi.annotation.lifecycle.OnAdded;
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
++import org.apache.nifi.controller.ConfigurationContext;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ValidationContextFactory;
 -import org.apache.nifi.controller.annotation.OnConfigured;
+ import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
+ import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
++import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+ import org.apache.nifi.nar.ExtensionManager;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.StandardValidationContextFactory;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.apache.nifi.util.ReflectionUtils;
 -
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  *
+  */
+ public class StandardControllerServiceProvider implements ControllerServiceProvider {
+ 
+     private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
+ 
+     private final Map<String, ControllerServiceNode> controllerServices;
+     private static final Set<Method> validDisabledMethods;
+ 
+     static {
+         // methods that are okay to be called when the service is disabled.
+         final Set<Method> validMethods = new HashSet<>();
+         for (final Method method : ControllerService.class.getMethods()) {
+             validMethods.add(method);
+         }
+         for (final Method method : Object.class.getMethods()) {
+             validMethods.add(method);
+         }
+         validDisabledMethods = Collections.unmodifiableSet(validMethods);
+     }
+ 
+     public StandardControllerServiceProvider() {
+         // the following 2 maps must be updated atomically, but we do not lock around them because they are modified
+         // only in the createControllerService method, and both are modified before the method returns
+         this.controllerServices = new ConcurrentHashMap<>();
+     }
+ 
+     private Class<?>[] getInterfaces(final Class<?> cls) {
+         final List<Class<?>> allIfcs = new ArrayList<>();
+         populateInterfaces(cls, allIfcs);
+         return allIfcs.toArray(new Class<?>[allIfcs.size()]);
+     }
+ 
+     private void populateInterfaces(final Class<?> cls, final List<Class<?>> interfacesDefinedThusFar) {
+         final Class<?>[] ifc = cls.getInterfaces();
+         if (ifc != null && ifc.length > 0) {
+             for (final Class<?> i : ifc) {
+                 interfacesDefinedThusFar.add(i);
+             }
+         }
+ 
+         final Class<?> superClass = cls.getSuperclass();
+         if (superClass != null) {
+             populateInterfaces(superClass, interfacesDefinedThusFar);
+         }
+     }
+ 
+     @Override
 -    public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) {
++    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+         if (type == null || id == null) {
+             throw new NullPointerException();
+         }
+         if (controllerServices.containsKey(id)) {
+             throw new ControllerServiceAlreadyExistsException(id);
+         }
+ 
+         final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
+         try {
+             final ClassLoader cl = ExtensionManager.getClassLoader(type);
+             Thread.currentThread().setContextClassLoader(cl);
+             final Class<?> rawClass = Class.forName(type, false, cl);
+             final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
+ 
+             final ControllerService originalService = controllerServiceClass.newInstance();
+             final ObjectHolder<ControllerServiceNode> serviceNodeHolder = new ObjectHolder<>(null);
+             final InvocationHandler invocationHandler = new InvocationHandler() {
+                 @Override
+                 public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+                     final ControllerServiceNode node = serviceNodeHolder.get();
+                     if (node.isDisabled() && !validDisabledMethods.contains(method)) {
++                        // Use nar class loader here because we are implicitly calling toString() on the original implementation.
+                         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                             throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled");
+                         } catch (final Throwable e) {
+                             throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier " + id + " because the Controller Service is disabled");
+                         }
+                     }
+ 
+                     try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                         return method.invoke(originalService, args);
+                     } catch (final InvocationTargetException e) {
+                         // If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. We want
+                         // to instead re-throw what the ControllerService threw, so we pull it out of the InvocationTargetException.
+                         throw e.getCause();
+                     }
+                 }
+             };
+ 
+             final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
+             logger.info("Loaded service {} as configured.", type);
+ 
+             originalService.initialize(new StandardControllerServiceInitializationContext(id, this));
+ 
+             final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
+ 
 -            final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, id, validationContextFactory, this);
++            final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
+             serviceNodeHolder.set(serviceNode);
+             serviceNode.setAnnotationData(null);
+             serviceNode.setName(id);
 -            for (final Map.Entry<String, String> entry : properties.entrySet()) {
 -                serviceNode.setProperty(entry.getKey(), entry.getValue());
++            
++            if ( firstTimeAdded ) {
++                try (final NarCloseable x = NarCloseable.withNarLoader()) {
++                    ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
++                } catch (final Exception e) {
++                    throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
++                }
+             }
 -            final StandardConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
 -            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigured.class, originalService, configurationContext);
+ 
+             this.controllerServices.put(id, serviceNode);
+             return serviceNode;
+         } catch (final Throwable t) {
+             throw new ControllerServiceNotFoundException(t);
+         } finally {
+             if (currentContextClassLoader != null) {
+                 Thread.currentThread().setContextClassLoader(currentContextClassLoader);
+             }
+         }
+     }
+ 
+     @Override
+     public ControllerService getControllerService(final String serviceIdentifier) {
+         final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
 -        return (node == null) ? null : node.getControllerService();
++        return (node == null) ? null : node.getProxiedControllerService();
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final ControllerService service) {
+         return isControllerServiceEnabled(service.getIdentifier());
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+         final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
+         return (node == null) ? false : !node.isDisabled();
+     }
+ 
+     @Override
+     public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
+         return controllerServices.get(serviceIdentifier);
+     }
+ 
+     @Override
+     public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
+         final Set<String> identifiers = new HashSet<>();
+         for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) {
 -            if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getControllerService().getClass())) {
++            if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) {
+                 identifiers.add(entry.getKey());
+             }
+         }
+ 
+         return identifiers;
+     }
++    
++    @Override
++    public void removeControllerService(final ControllerServiceNode serviceNode) {
++        final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
++        if ( existing == null || existing != serviceNode ) {
++            throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow");
++        }
++        
++        serviceNode.verifyCanDelete();
++        
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
++        }
++        
++        controllerServices.remove(serviceNode.getIdentifier());
++    }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index 0000000,c04a04f..aca870b
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@@ -1,0 -1,97 +1,97 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.tasks;
+ 
+ import java.util.concurrent.atomic.AtomicLong;
+ 
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+ import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
+ import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+ import org.apache.nifi.controller.scheduling.ScheduleState;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.ProcessSessionFactory;
 -import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.util.Connectables;
+ import org.apache.nifi.util.ReflectionUtils;
 -
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class ContinuallyRunConnectableTask implements Runnable {
+ 
+     private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class);
+ 
+     private final Connectable connectable;
+     private final ScheduleState scheduleState;
+     private final ProcessSessionFactory sessionFactory;
+     private final ConnectableProcessContext processContext;
+ 
+     public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final StringEncryptor encryptor) {
+         this.connectable = connectable;
+         this.scheduleState = scheduleState;
+         this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable, new AtomicLong(0L)));
+         this.processContext = new ConnectableProcessContext(connectable, encryptor);
+     }
+ 
++    @SuppressWarnings("deprecation")
+     @Override
+     public void run() {
+         if (!scheduleState.isScheduled()) {
+             return;
+         }
+         // Connectable should run if the following conditions are met:
+         // 1. It's an Input Port or or is a Remote Input Port or has incoming FlowFiles queued
+         // 2. Any relationship is available (since there's only 1
+         // relationship for a Connectable, we can say "any" or "all" and
+         // it means the same thing)
+         // 3. It is not yielded.
+         final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
+         final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
+                 && (triggerWhenEmpty || Connectables.flowFilesQueued(connectable)) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
+ 
+         if (shouldRun) {
+             scheduleState.incrementActiveThreadCount();
+             try {
+                 try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
+                     connectable.onTrigger(processContext, sessionFactory);
+                 } catch (final ProcessException pe) {
+                     logger.error("{} failed to process session due to {}", connectable, pe.toString());
+                 } catch (final Throwable t) {
+                     logger.error("{} failed to process session due to {}", connectable, t.toString());
+                     logger.error("", t);
+ 
+                     logger.warn("{} Administratively Pausing for 10 seconds due to processing failure: {}", connectable, t.toString(), t);
+                     try {
+                         Thread.sleep(10000L);
+                     } catch (final InterruptedException e) {
+                     }
+ 
+                 }
+             } finally {
+                 if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
+                     try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
++                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, connectable, processContext);
+                     }
+                 }
+ 
+                 scheduleState.decrementActiveThreadCount();
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index 0000000,65c375f..33bd327
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@@ -1,0 -1,185 +1,185 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.tasks;
+ 
+ import java.io.IOException;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.controller.FlowController;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.repository.BatchingSessionFactory;
+ import org.apache.nifi.controller.repository.ProcessContext;
+ import org.apache.nifi.controller.repository.StandardFlowFileEvent;
+ import org.apache.nifi.controller.repository.StandardProcessSession;
+ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+ import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+ import org.apache.nifi.controller.scheduling.ScheduleState;
+ import org.apache.nifi.controller.scheduling.SchedulingAgent;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessContext;
 -import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.util.Connectables;
+ import org.apache.nifi.util.ReflectionUtils;
 -
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class ContinuallyRunProcessorTask implements Runnable {
+ 
+     private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class);
+ 
+     private final SchedulingAgent schedulingAgent;
+     private final ProcessorNode procNode;
+     private final ProcessContext context;
+     private final ScheduleState scheduleState;
+     private final StandardProcessContext processContext;
+     private final FlowController flowController;
+     private final int numRelationships;
+ 
+     public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode,
+             final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, final StringEncryptor encryptor) {
+ 
+         this.schedulingAgent = schedulingAgent;
+         this.procNode = procNode;
+         this.scheduleState = scheduleState;
+         this.numRelationships = procNode.getRelationships().size();
+         this.flowController = flowController;
+ 
+         context = contextFactory.newProcessContext(procNode, new AtomicLong(0L));
+         this.processContext = new StandardProcessContext(procNode, flowController, encryptor);
+     }
+ 
++    @SuppressWarnings("deprecation")
+     @Override
+     public void run() {
+         // make sure processor is not yielded
+         boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
+         if (!shouldRun) {
+             return;
+         }
+ 
+         // make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
+         shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
+         if (!shouldRun) {
+             return;
+         }
+ 
+         // make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
+         shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
+         if (!shouldRun) {
+             return;
+         }
+ 
+         if (numRelationships > 0) {
+             final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
+             shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
+         }
+ 
+         final long batchNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
+         final ProcessSessionFactory sessionFactory;
+         final StandardProcessSession rawSession;
+         final boolean batch;
+         if (procNode.isHighThroughputSupported() && batchNanos > 0L) {
+             rawSession = new StandardProcessSession(context);
+             sessionFactory = new BatchingSessionFactory(rawSession);
+             batch = true;
+         } else {
+             rawSession = null;
+             sessionFactory = new StandardProcessSessionFactory(context);
+             batch = false;
+         }
+ 
+         if (!shouldRun) {
+             return;
+         }
+ 
+         scheduleState.incrementActiveThreadCount();
+ 
+         final long startNanos = System.nanoTime();
+         final long finishNanos = startNanos + batchNanos;
+         int invocationCount = 0;
+         try {
+             try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
+                 while (shouldRun) {
+                     procNode.onTrigger(processContext, sessionFactory);
+                     invocationCount++;
+ 
+                     if (!batch) {
+                         return;
+                     }
+ 
+                     if (System.nanoTime() > finishNanos) {
+                         return;
+                     }
+ 
+                     shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
+                     shouldRun = shouldRun && (procNode.getYieldExpiration() < System.currentTimeMillis());
+ 
+                     if (shouldRun && numRelationships > 0) {
+                         final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
+                         shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
+                     }
+                 }
+             } catch (final ProcessException pe) {
+                 final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+                 procLog.error("Failed to process session due to {}", new Object[]{pe});
+             } catch (final Throwable t) {
+                 // Use ProcessorLog to log the event so that a bulletin will be created for this processor
+                 final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+                 procLog.error("{} failed to process session due to {}", new Object[]{procNode.getProcessor(), t});
+                 procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{schedulingAgent.getAdministrativeYieldDuration()});
+                 logger.warn("Administratively Yielding {} due to uncaught Exception: {}", procNode.getProcessor(), t.toString());
+                 logger.warn("", t);
+ 
+                 procNode.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+             }
+         } finally {
+             if (batch) {
+                 rawSession.commit();
+             }
+ 
+             final long processingNanos = System.nanoTime() - startNanos;
+ 
+             // if the processor is no longer scheduled to run and this is the last thread,
+             // invoke the OnStopped methods
+             if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
+                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
++                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext);
+                     flowController.heartbeat();
+                 }
+             }
+ 
+             scheduleState.decrementActiveThreadCount();
+ 
+             try {
+                 final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
+                 procEvent.setProcessingNanos(processingNanos);
+                 procEvent.setInvocations(invocationCount);
+                 context.getFlowFileEventRepository().updateRepository(procEvent);
+             } catch (final IOException e) {
+                 logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
+                 logger.error("", e);
+             }
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
index 0000000,36aa9dd..9b70581
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
@@@ -1,0 -1,63 +1,63 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.tasks;
+ 
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.controller.ReportingTaskNode;
+ import org.apache.nifi.controller.scheduling.ScheduleState;
+ import org.apache.nifi.nar.NarCloseable;
 -import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.util.ReflectionUtils;
 -
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class ReportingTaskWrapper implements Runnable {
+ 
+     private static final Logger logger = LoggerFactory.getLogger(ReportingTaskWrapper.class);
+ 
+     private final ReportingTaskNode taskNode;
+     private final ScheduleState scheduleState;
+ 
+     public ReportingTaskWrapper(final ReportingTaskNode taskNode, final ScheduleState scheduleState) {
+         this.taskNode = taskNode;
+         this.scheduleState = scheduleState;
+     }
+ 
++    @SuppressWarnings("deprecation")
+     @Override
+     public synchronized void run() {
+         scheduleState.incrementActiveThreadCount();
+         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+             taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
+         } catch (final Throwable t) {
+             logger.error("Error running task {} due to {}", taskNode.getReportingTask(), t.toString());
+             if (logger.isDebugEnabled()) {
+                 logger.error("", t);
+             }
+         } finally {
+             // if the processor is no longer scheduled to run and this is the last thread,
+             // invoke the OnStopped methods
+             if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
+                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, taskNode.getReportingTask(), taskNode.getReportingContext());
++                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getReportingContext());
+                 }
+             }
+ 
+             scheduleState.decrementActiveThreadCount();
+         }
+     }
+ 
+ }


Mime
View raw message