nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [16/50] [abbrv] incubator-nifi git commit: NIFI-362: Avoid continually scheduling components to run if there is no work for them to do or if they are yielded
Date Mon, 02 Mar 2015 04:03:55 GMT
NIFI-362: Avoid continually scheduling components to run if there is no work for them to do
or if they are yielded


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4cc106a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4cc106a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4cc106a5

Branch: refs/heads/NIFI-360
Commit: 4cc106a54d9b6528e38cb99ecb15524a07a1f0c9
Parents: dde5fd5
Author: Mark Payne <markap14@hotmail.com>
Authored: Sun Feb 22 10:53:24 2015 -0500
Committer: Mark Payne <markap14@hotmail.com>
Committed: Sun Feb 22 10:53:24 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/controller/StandardFunnel.java  |   2 +-
 .../scheduling/QuartzSchedulingAgent.java       |  21 +++-
 .../controller/scheduling/ScheduleState.java    |  18 ++--
 .../scheduling/TimerDrivenSchedulingAgent.java  | 105 ++++++++++++++++---
 .../tasks/ContinuallyRunConnectableTask.java    |  32 ++++--
 .../tasks/ContinuallyRunProcessorTask.java      |  32 +++---
 6 files changed, 163 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index e34e043..3bdfd20 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -94,7 +94,7 @@ public class StandardFunnel implements Funnel {
         position = new AtomicReference<>(new Position(0D, 0D));
         scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
         penalizationPeriod = new AtomicReference<>("30 sec");
-        yieldPeriod = new AtomicReference<>("1 sec");
+        yieldPeriod = new AtomicReference<>("250 millis");
         yieldExpiration = new AtomicLong(0L);
         schedulingPeriod = new AtomicReference<>("0 millis");
         schedulingNanos = new AtomicLong(30000);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index ea67492..3355e73 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -21,6 +21,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -34,8 +35,9 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
 import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.processor.StandardProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.FormatUtils;
-
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,13 +132,16 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
 
         final List<AtomicBoolean> triggers = new ArrayList<>();
         for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
-            final Runnable continuallyRunTask;
+            final Callable<Boolean> continuallyRunTask;
             if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
                 final ProcessorNode procNode = (ProcessorNode) connectable;
-                ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this,
procNode, flowController, contextFactory, scheduleState, encryptor);
+                
+                final StandardProcessContext standardProcContext = new StandardProcessContext(procNode,
flowController, encryptor);
+                ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this,
procNode, flowController, contextFactory, scheduleState, standardProcContext);
                 continuallyRunTask = runnableTask;
             } else {
-                continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable,
scheduleState, encryptor);
+                final ConnectableProcessContext connProcContext = new ConnectableProcessContext(connectable,
encryptor);
+                continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable,
scheduleState, connProcContext);
             }
 
             final AtomicBoolean canceled = new AtomicBoolean(false);
@@ -147,7 +152,13 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
                         return;
                     }
 
-                    continuallyRunTask.run();
+                    try {
+                        continuallyRunTask.call();
+                    } catch (final RuntimeException re) {
+                        throw re;
+                    } catch (final Exception e) {
+                        throw new ProcessException(e);
+                    }
 
                     if (canceled.get()) {
                         return;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
index eb5a437..ff17912 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
@@ -16,9 +16,10 @@
  */
 package org.apache.nifi.controller.scheduling;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +28,7 @@ public class ScheduleState {
 
     private final AtomicInteger activeThreadCount = new AtomicInteger(0);
     private final AtomicBoolean scheduled = new AtomicBoolean(false);
-    private final List<ScheduledFuture<?>> futures = new ArrayList<>();
+    private final Set<ScheduledFuture<?>> futures = new HashSet<ScheduledFuture<?>>();
     private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
     private volatile long lastStopTime = -1;
 
@@ -79,12 +80,17 @@ public class ScheduleState {
      *
      * @param newFutures
      */
-    public void setFutures(final List<ScheduledFuture<?>> newFutures) {
+    public synchronized void setFutures(final Collection<ScheduledFuture<?>>
newFutures) {
         futures.clear();
         futures.addAll(newFutures);
     }
 
-    public List<ScheduledFuture<?>> getFutures() {
-        return Collections.unmodifiableList(futures);
+    public synchronized void replaceFuture(final ScheduledFuture<?> oldFuture, final
ScheduledFuture<?> newFuture) {
+        futures.remove(oldFuture);
+        futures.add(newFuture);
+    }
+    
+    public synchronized Set<ScheduledFuture<?>> getFutures() {
+        return Collections.unmodifiableSet(futures);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index db06151..efa8acd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -18,8 +18,10 @@ package org.apache.nifi.controller.scheduling;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
@@ -31,15 +33,17 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
 import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.StandardProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.FormatUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TimerDrivenSchedulingAgent implements SchedulingAgent {
-
     private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);
-
+    private static final long NO_WORK_YIELD_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
+    
     private final FlowController flowController;
     private final FlowEngine flowEngine;
     private final ProcessContextFactory contextFactory;
@@ -72,20 +76,95 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
         logger.info("{} started.", taskNode.getReportingTask());
     }
 
+    
     @Override
     public void schedule(final Connectable connectable, final ScheduleState scheduleState)
{
-        final Runnable runnable;
-        if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
-            final ProcessorNode procNode = (ProcessorNode) connectable;
-            ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this,
procNode, flowController, contextFactory, scheduleState, encryptor);
-            runnable = runnableTask;
-        } else {
-            runnable = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState,
encryptor);
-        }
-
+        
         final List<ScheduledFuture<?>> futures = new ArrayList<>();
         for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
-            final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(runnable,
0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+            final Callable<Boolean> continuallyRunTask;
+            final ProcessContext processContext;
+            
+            // Determine the task to run and create it.
+            if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
+                final ProcessorNode procNode = (ProcessorNode) connectable;
+                final StandardProcessContext standardProcContext = new StandardProcessContext(procNode,
flowController, encryptor);
+                final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this,
procNode, flowController, 
+                        contextFactory, scheduleState, standardProcContext);
+                
+                continuallyRunTask = runnableTask;
+                processContext = standardProcContext;
+            } else {
+                processContext = new ConnectableProcessContext(connectable, encryptor);
+                continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable,
scheduleState, processContext);
+            }
+            
+            final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
+            
+            final Runnable yieldDetectionRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    // Call the continually run task. It will return a boolean indicating
whether or not we should yield
+                    // based on a lack of work to do for the component.
+                    final boolean shouldYield;
+                    try {
+                        shouldYield = continuallyRunTask.call();
+                    } catch (final RuntimeException re) {
+                        throw re;
+                    } catch (final Exception e) {
+                        throw new ProcessException(e);
+                    }
+                    
+                    // If the component is yielded, cancel its future and re-submit it to
run again
+                    // after the yield has expired.
+                    final long newYieldExpiration = connectable.getYieldExpiration();
+                    if ( newYieldExpiration > System.currentTimeMillis() ) {
+                        final long yieldMillis = System.currentTimeMillis() - newYieldExpiration;
+                        final ScheduledFuture<?> scheduledFuture = futureRef.get();
+                        if ( scheduledFuture == null ) {
+                            return;
+                        }
+                        
+                        // If we are able to cancel the future, create a new one and update
the ScheduleState so that it has
+                        // an accurate accounting of which futures are outstanding; we must
then also update the futureRef
+                        // so that we can do this again the next time that the component
is yielded.
+                        if (scheduledFuture.cancel(false)) {
+                            final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
+                            final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this,
yieldNanos, 
+                                    connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS),
TimeUnit.NANOSECONDS);
+                            scheduleState.replaceFuture(scheduledFuture, newFuture);
+                            futureRef.set(newFuture);
+                        }
+                    } else if ( shouldYield ) {
+                        // Component itself didn't yield but there was no work to do, so
the framework will choose
+                        // to yield the component automatically for a short period of time.
+                        final ScheduledFuture<?> scheduledFuture = futureRef.get();
+                        if ( scheduledFuture == null ) {
+                            return;
+                        }
+                        
+                        // If we are able to cancel the future, create a new one and update
the ScheduleState so that it has
+                        // an accurate accounting of which futures are outstanding; we must
then also update the futureRef
+                        // so that we can do this again the next time that the component
is yielded.
+                        if (scheduledFuture.cancel(false)) {
+                            final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this,
NO_WORK_YIELD_NANOS, 
+                                    connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS),
TimeUnit.NANOSECONDS);
+                            scheduleState.replaceFuture(scheduledFuture, newFuture);
+                            futureRef.set(newFuture);
+                        }
+                    }
+                }
+            };
+
+            // Schedule the task to run
+            final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable,
0L, 
+                    connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+            
+            // now that we have the future, set the atomic reference so that if the component
is yielded we
+            // are able to then cancel this future.
+            futureRef.set(future);
+            
+            // Keep track of the futures so that we can update the ScheduleState.
             futures.add(future);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index aca870b..408032c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@ -16,16 +16,16 @@
  */
 package org.apache.nifi.controller.tasks;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
-import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
 import org.apache.nifi.controller.scheduling.ProcessContextFactory;
 import org.apache.nifi.controller.scheduling.ScheduleState;
-import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.Connectables;
@@ -33,28 +33,33 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ContinuallyRunConnectableTask implements Runnable {
+/**
+ * Continually runs a Connectable as long as the processor has work to do. {@link #call()}
will return
+ * <code>true</code> if the Connectable should be yielded, <code>false</code>
otherwise.
+ */
+public class ContinuallyRunConnectableTask implements Callable<Boolean> {
 
     private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class);
 
     private final Connectable connectable;
     private final ScheduleState scheduleState;
     private final ProcessSessionFactory sessionFactory;
-    private final ConnectableProcessContext processContext;
+    private final ProcessContext processContext;
 
-    public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final
Connectable connectable, final ScheduleState scheduleState, final StringEncryptor encryptor)
{
+    public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final
Connectable connectable, final ScheduleState scheduleState, final ProcessContext processContext)
{
         this.connectable = connectable;
         this.scheduleState = scheduleState;
         this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable,
new AtomicLong(0L)));
-        this.processContext = new ConnectableProcessContext(connectable, encryptor);
+        this.processContext = processContext;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
-    public void run() {
+    @SuppressWarnings("deprecation")
+    public Boolean call() {
         if (!scheduleState.isScheduled()) {
-            return;
+            return false;
         }
+        
         // Connectable should run if the following conditions are met:
         // 1. It's an Input Port or or is a Remote Input Port or has incoming FlowFiles queued
         // 2. Any relationship is available (since there's only 1
@@ -62,8 +67,9 @@ public class ContinuallyRunConnectableTask implements Runnable {
         // it means the same thing)
         // 3. It is not yielded.
         final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
+        boolean flowFilesQueued = true;
         final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
-                && (triggerWhenEmpty || Connectables.flowFilesQueued(connectable))
&& (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
+                && (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable)))
&& (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
 
         if (shouldRun) {
             scheduleState.incrementActiveThreadCount();
@@ -92,6 +98,12 @@ public class ContinuallyRunConnectableTask implements Runnable {
 
                 scheduleState.decrementActiveThreadCount();
             }
+        } else if (!flowFilesQueued) {
+            // FlowFiles must be queued in order to run but there are none queued;
+            // yield for just a bit.
+            return true;
         }
+        
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index 33bd327..f4be855 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.tasks;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -31,7 +32,6 @@ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
 import org.apache.nifi.controller.scheduling.ProcessContextFactory;
 import org.apache.nifi.controller.scheduling.ScheduleState;
 import org.apache.nifi.controller.scheduling.SchedulingAgent;
-import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.ProcessSessionFactory;
@@ -43,7 +43,12 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ContinuallyRunProcessorTask implements Runnable {
+
+/**
+ * Continually runs a processor as long as the processor has work to do. {@link #call()}
will return
+ * <code>true</code> if the processor should be yielded, <code>false</code>
otherwise.
+ */
+public class ContinuallyRunProcessorTask implements Callable<Boolean> {
 
     private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class);
 
@@ -56,7 +61,8 @@ public class ContinuallyRunProcessorTask implements Runnable {
     private final int numRelationships;
 
     public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode
procNode,
-            final FlowController flowController, final ProcessContextFactory contextFactory,
final ScheduleState scheduleState, final StringEncryptor encryptor) {
+            final FlowController flowController, final ProcessContextFactory contextFactory,
final ScheduleState scheduleState, 
+            final StandardProcessContext processContext) {
 
         this.schedulingAgent = schedulingAgent;
         this.procNode = procNode;
@@ -65,28 +71,28 @@ public class ContinuallyRunProcessorTask implements Runnable {
         this.flowController = flowController;
 
         context = contextFactory.newProcessContext(procNode, new AtomicLong(0L));
-        this.processContext = new StandardProcessContext(procNode, flowController, encryptor);
+        this.processContext = processContext;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
-    public void run() {
+    @SuppressWarnings("deprecation")
+    public Boolean call() {
         // make sure processor is not yielded
         boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
         if (!shouldRun) {
-            return;
+            return false;
         }
 
         // make sure that either we're not clustered or this processor runs on all nodes
or that this is the primary node
         shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
         if (!shouldRun) {
-            return;
+            return false;
         }
 
         // make sure that either proc has incoming FlowFiles or has no incoming connections
or is annotated with @TriggerWhenEmpty
         shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() ||
Connectables.flowFilesQueued(procNode);
         if (!shouldRun) {
-            return;
+            return true;
         }
 
         if (numRelationships > 0) {
@@ -109,7 +115,7 @@ public class ContinuallyRunProcessorTask implements Runnable {
         }
 
         if (!shouldRun) {
-            return;
+            return false;
         }
 
         scheduleState.incrementActiveThreadCount();
@@ -124,11 +130,11 @@ public class ContinuallyRunProcessorTask implements Runnable {
                     invocationCount++;
 
                     if (!batch) {
-                        return;
+                        return false;
                     }
 
                     if (System.nanoTime() > finishNanos) {
-                        return;
+                        return false;
                     }
 
                     shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection()
|| Connectables.flowFilesQueued(procNode);
@@ -180,6 +186,8 @@ public class ContinuallyRunProcessorTask implements Runnable {
                 logger.error("", e);
             }
         }
+        
+        return false;
     }
 
 }


Mime
View raw message