nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [04/11] incubator-nifi git commit: NIFI-271 checkpoint
Date Wed, 22 Apr 2015 15:46:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 8deda3c..65756f4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -103,7 +103,7 @@ public class VolatileContentRepository implements ContentRepository {
     private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256);
     private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null);
 
-    private ContentClaimManager claimManager;	// effectively final
+    private ContentClaimManager claimManager; // effectively final
 
     public VolatileContentRepository() {
         this(NiFiProperties.getInstance());
@@ -137,7 +137,7 @@ public class VolatileContentRepository implements ContentRepository {
     public void initialize(final ContentClaimManager claimManager) {
         this.claimManager = claimManager;
     }
-    
+
     @Override
     public void shutdown() {
         executor.shutdown();
@@ -147,7 +147,7 @@ public class VolatileContentRepository implements ContentRepository {
      * Specifies a Backup Repository where data should be written if this
      * Repository fills up
      *
-     * @param backup
+     * @param backup repo backup
      */
     public void setBackupRepository(final ContentRepository backup) {
         final boolean updated = backupRepositoryRef.compareAndSet(null, backup);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index a7020a6..9e429d6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -34,7 +34,7 @@ import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 public class VolatileFlowFileRepository implements FlowFileRepository {
 
     private final AtomicLong idGenerator = new AtomicLong(0L);
-    private ContentClaimManager claimManager;	// effectively final
+    private ContentClaimManager claimManager; // effectively final
 
     @Override
     public void initialize(final ContentClaimManager claimManager) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 292c258..0779c4d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -102,7 +102,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     // synced with disk.
     //
     // This is required due to the following scenario, which could exist if we did not do this:
-    // 
+    //
     // A Processor modifies a FlowFile (whose content is in ContentClaim A), writing the new content to ContentClaim B.
     // The processor removes ContentClaim A, which deletes the backing file.
     // The FlowFile Repository writes out this change but has not yet synced the update to disk.
@@ -112,12 +112,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     // ContentClaim A does not exist anymore because the Session Commit destroyed the data.
     // This results in Data Loss!
     // However, the comment in the class's JavaDocs regarding sync'ing should also be considered.
-    // 
+    //
     // In order to avoid this, instead of destroying ContentClaim A, the ProcessSession puts the claim on the Claim Destruction Queue.
     // We periodically force a sync of the FlowFile Repository to the backing storage mechanism.
     // We can then destroy the data. If we end up syncing the FlowFile Repository to the backing storage mechanism and then restart
     // before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed
-    // on restart. 
+    // on restart.
     private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>();
 
     public WriteAheadFlowFileRepository() {
@@ -129,7 +129,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         flowFileRepositoryPath = properties.getFlowFileRepositoryPath();
         numPartitions = properties.getFlowFileRepositoryPartitions();
         checkpointDelayMillis = FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
-        
+
         checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
     }
 
@@ -267,9 +267,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
      * the specified Swap File and returns the number of FlowFiles that were
      * persisted.
      *
-     * @param queue
-     * @param swapLocation
-     * @throws IOException
+     * @param queue queue to swap out
+     * @param swapLocation location to swap to
+     * @throws IOException ioe
      */
     @Override
     public void swapFlowFilesOut(final List<FlowFileRecord> swappedOut, final FlowFileQueue queue, final String swapLocation) throws IOException {
@@ -289,14 +289,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation});
     }
 
-    /**
-     * Swaps FlowFiles into memory space from the given Swap File
-     *
-     * @param swapLocation
-     * @param swapRecords
-     * @param queue
-     * @throws IOException
-     */
     @Override
     public void swapFlowFilesIn(final String swapLocation, final List<FlowFileRecord> swapRecords, final FlowFileQueue queue) throws IOException {
         final List<RepositoryRecord> repoRecords = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index c43f3fe..54a1b2c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  * <p>
  * Must be thread safe</p>
  *
- * @author none
  */
 public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {
 
@@ -38,14 +37,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
     private final AtomicInteger claimantCount = new AtomicInteger(0);
     private final int hashCode;
 
-    /**
-     * Constructs a content claim
-     *
-     * @param container
-     * @param section
-     * @param id
-     * @param lossTolerant
-     */
     StandardContentClaim(final String container, final String section, final String id, final boolean lossTolerant) {
         this.container = container.intern();
         this.section = section.intern();
@@ -100,7 +91,7 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
      * Provides the natural ordering for ContentClaim objects. By default they
      * are sorted by their id, then container, then section
      *
-     * @param other
+     * @param other other claim
      * @return x such that x <=1 if this is less than other;
      * x=0 if this.equals(other);
      * x >= 1 if this is greater than other

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
index 43bbb5a..b68f95e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
@@ -34,16 +34,6 @@ public class StandardContentClaimManager implements ContentClaimManager {
 
     private static final BlockingQueue<ContentClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
 
-    /**
-     * Creates a new Content Claim with the given id, container, section, and
-     * loss tolerance.
-     *
-     * @param id
-     * @param container
-     * @param section
-     * @param lossTolerant
-     * @return
-     */
     @Override
     public ContentClaim newContentClaim(final String container, final String section, final String id, final boolean lossTolerant) {
         return new StandardContentClaim(container, section, id, lossTolerant);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
index f7da136..4e727e9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
@@ -40,9 +40,7 @@ public class ByteCountingOutputStream extends OutputStream {
         write(b, 0, b.length);
     }
 
-    ;
-	
-	@Override
+    @Override
     public void write(byte[] b, int off, int len) throws IOException {
         out.write(b, off, len);
         bytesWrittenHolder.increment(len);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java
index f349887..a710070 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java
@@ -56,10 +56,8 @@ public class FlowFileAccessInputStream extends FilterInputStream {
     }
 
     /**
-     * Returns the ContentNotFoundException that was thrown by this stream, or
-     * <code>null</code> if no such Exception was thrown.
-     *
-     * @return
+     * @return the ContentNotFoundException that was thrown by this stream, or
+     * <code>null</code> if no such Exception was thrown
      */
     public ContentNotFoundException getContentNotFoundException() {
         return thrown;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index acb3a01..01285b0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -179,14 +179,14 @@ public class ConnectableProcessContext implements ProcessContext {
 
     @Override
     public Set<Relationship> getAvailableRelationships() {
-        for ( final Connection connection : connectable.getConnections() ) {
-            if ( connection.getFlowFileQueue().isFull() ) {
+        for (final Connection connection : connectable.getConnections()) {
+            if (connection.getFlowFileQueue().isFull()) {
                 return Collections.emptySet();
             }
         }
-        
+
         final Collection<Relationship> relationships = connectable.getRelationships();
-        if ( relationships instanceof Set ) {
+        if (relationships instanceof Set) {
             return (Set<Relationship>) relationships;
         }
         return new HashSet<>(connectable.getRelationships());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 7455bf8..77ae686 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -65,7 +65,8 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
     private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>();
 
-    public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController, final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
+    public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController,
+            final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
         this.flowEngine = flowEngine;
         this.controllerServiceProvider = flowController;
         this.workerQueue = workerQueue;
@@ -265,15 +266,15 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
         private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
             final int newThreadCount = scheduleState.incrementActiveThreadCount();
             if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
-                 // its possible that the worker queue could give us a worker node that is eligible to run based
-                 // on the number of threads but another thread has already incremented the thread count, result in
-                 // reaching the maximum number of threads. we won't know this until we atomically increment the thread count 
-                 // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
-                 // result in using more than the maximum number of defined threads
-                 scheduleState.decrementActiveThreadCount();
-                 return;
+                // its possible that the worker queue could give us a worker node that is eligible to run based
+                // on the number of threads but another thread has already incremented the thread count, result in
+                // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
+                // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
+                // result in using more than the maximum number of defined threads
+                scheduleState.decrementActiveThreadCount();
+                return;
             }
-            
+
             try {
                 try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
                     worker.onTrigger(processContext, sessionFactory);
@@ -302,18 +303,19 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
             }
         }
 
-        private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) {
+        private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState,
+                final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) {
             final int newThreadCount = scheduleState.incrementActiveThreadCount();
             if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
-                 // its possible that the worker queue could give us a worker node that is eligible to run based
-                 // on the number of threads but another thread has already incremented the thread count, result in
-                 // reaching the maximum number of threads. we won't know this until we atomically increment the thread count 
-                 // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
-                 // result in using more than the maximum number of defined threads
-                 scheduleState.decrementActiveThreadCount();
-                 return;
+                // its possible that the worker queue could give us a worker node that is eligible to run based
+                // on the number of threads but another thread has already incremented the thread count, result in
+                // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
+                // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
+                // result in using more than the maximum number of defined threads
+                scheduleState.decrementActiveThreadCount();
+                return;
             }
-            
+
             try {
                 try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
                     worker.onTrigger(processContext, sessionFactory);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 3355e73..4278cee 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -135,7 +135,7 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
             final Callable<Boolean> continuallyRunTask;
             if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
                 final ProcessorNode procNode = (ProcessorNode) connectable;
-                
+
                 final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
                 ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext);
                 continuallyRunTask = runnableTask;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
index ff17912..cb7f55f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
@@ -28,7 +28,7 @@ public class ScheduleState {
 
     private final AtomicInteger activeThreadCount = new AtomicInteger(0);
     private final AtomicBoolean scheduled = new AtomicBoolean(false);
-    private final Set<ScheduledFuture<?>> futures = new HashSet<ScheduledFuture<?>>();
+    private final Set<ScheduledFuture<?>> futures = new HashSet<>();
     private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
     private volatile long lastStopTime = -1;
 
@@ -78,7 +78,7 @@ public class ScheduleState {
      * Establishes the list of relevant futures for this processor. Replaces any
      * previously held futures.
      *
-     * @param newFutures
+     * @param newFutures futures
      */
     public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) {
         futures.clear();
@@ -89,7 +89,7 @@ public class ScheduleState {
         futures.remove(oldFuture);
         futures.add(newFuture);
     }
-    
+
     public synchronized Set<ScheduledFuture<?>> getFutures() {
         return Collections.unmodifiableSet(futures);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 7725823..bb565cb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -147,12 +147,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                 LOG.error("", t);
             }
         }
-        
+
         frameworkTaskExecutor.shutdown();
         componentLifeCycleThreadPool.shutdown();
     }
 
-    
     @Override
     public void schedule(final ReportingTaskNode taskNode) {
         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
@@ -184,13 +183,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                         try (final NarCloseable x = NarCloseable.withNarLoader()) {
                             ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
                         }
-                        
+
                         break;
                     } catch (final Exception e) {
                         final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
                         final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
                         componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-                        
+
                         LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
                                 new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
                         try {
@@ -208,20 +207,19 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         taskNode.setScheduledState(ScheduledState.RUNNING);
     }
 
-    
     @Override
     public void unschedule(final ReportingTaskNode taskNode) {
         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
         if (!scheduleState.isScheduled()) {
             return;
         }
-        
+
         taskNode.verifyCanStop();
         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
         final ReportingTask reportingTask = taskNode.getReportingTask();
         scheduleState.setScheduled(false);
         taskNode.setScheduledState(ScheduledState.STOPPED);
-        
+
         final Runnable unscheduleReportingTaskRunnable = new Runnable() {
             @SuppressWarnings("deprecation")
             @Override
@@ -240,7 +238,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                     LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
                             reportingTask, cause.toString(), administrativeYieldDuration);
                     LOG.error("", cause);
-                    
+
                     try {
                         Thread.sleep(administrativeYieldMillis);
                     } catch (final InterruptedException ie) {
@@ -293,32 +291,32 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                     final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
 
                     final Set<String> serviceIds = new HashSet<>();
-                    for ( final PropertyDescriptor descriptor : processContext.getProperties().keySet() ) {
+                    for (final PropertyDescriptor descriptor : processContext.getProperties().keySet()) {
                         final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition();
-                        if ( serviceDefinition != null ) {
+                        if (serviceDefinition != null) {
                             final String serviceId = processContext.getProperty(descriptor).getValue();
-                            if ( serviceId != null ) {
-                            	serviceIds.add(serviceId);
+                            if (serviceId != null) {
+                                serviceIds.add(serviceId);
                             }
                         }
                     }
-                    
-                    attemptOnScheduled: while (true) {
+
+                    attemptOnScheduled:
+                    while (true) {
                         try {
                             synchronized (scheduleState) {
-                                for ( final String serviceId : serviceIds ) {
+                                for (final String serviceId : serviceIds) {
                                     final boolean enabled = processContext.isControllerServiceEnabled(serviceId);
-                                    if ( !enabled ) {
+                                    if (!enabled) {
                                         LOG.debug("Controller Service with ID {} is not yet enabled, so will not start {} yet", serviceId, procNode);
                                         Thread.sleep(administrativeYieldMillis);
                                         continue attemptOnScheduled;
                                     }
                                 }
-                                
+
                                 // if no longer scheduled to run, then we're finished. This can happen, for example,
-                                // if the @OnScheduled method throws an Exception and the user stops the processor 
+                                // if the @OnScheduled method throws an Exception and the user stops the processor
                                 // while we're administratively yielded.
-                                // 
                                 // we also check if the schedule state's last start time is equal to what it was before.
                                 // if not, then means that the processor has been stopped and started again, so we should just
                                 // bail; another thread will be responsible for invoking the @OnScheduled methods.
@@ -363,12 +361,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         componentLifeCycleThreadPool.execute(startProcRunnable);
     }
 
-    /**
-     * Used to delay scheduling the given Processor to run until its yield
-     * duration expires.
-     *
-     * @param procNode
-     */
     @Override
     public void yield(final ProcessorNode procNode) {
         // This exists in the ProcessScheduler so that the scheduler can take advantage of the fact that
@@ -381,7 +373,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         // the context. If this Processor has X number of threads, we end up submitting X new tasks while the previous
         // X-1 tasks are still running. At this point, another thread could finish and do the same thing, resulting in
         // an additional X-1 extra tasks being submitted.
-        // 
+        //
         // As a result, we simply removed this buggy implementation, as it was a very minor performance optimization
         // that gave very bad results.
     }
@@ -431,24 +423,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         getSchedulingAgent(worker).onEvent(worker);
     }
 
-    /**
-     * Returns the number of threads that are currently active for the given
-     * <code>Connectable</code>.
-     *
-     * @return
-     */
     @Override
     public int getActiveThreadCount(final Object scheduled) {
         return getScheduleState(scheduled).getActiveThreadCount();
     }
 
-    /**
-     * Begins scheduling the given port to run.
-     *
-     * @throws NullPointerException if the Port is null
-     * @throws IllegalStateException if the Port is already scheduled to run or
-     * has threads running
-     */
     @Override
     public void startPort(final Port port) {
         if (!port.isValid()) {
@@ -501,7 +480,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         if (!state.isScheduled()) {
             return;
         }
-        
+
         state.setScheduled(false);
         getSchedulingAgent(connectable).unschedule(connectable, state);
 
@@ -561,7 +540,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         if (procNode.getScheduledState() != ScheduledState.DISABLED) {
             throw new IllegalStateException("Processor cannot be enabled because it is not disabled");
         }
-        
+
         procNode.setScheduledState(ScheduledState.STOPPED);
     }
 
@@ -570,21 +549,22 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         if (procNode.getScheduledState() != ScheduledState.STOPPED) {
             throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
         }
-        
+
         procNode.setScheduledState(ScheduledState.DISABLED);
     }
 
     public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
-        if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+        if (taskNode.getScheduledState() != ScheduledState.DISABLED) {
             throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled");
         }
 
         taskNode.setScheduledState(ScheduledState.STOPPED);
     }
-    
+
     public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
-        if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) {
-            throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state");
+        if (taskNode.getScheduledState() != ScheduledState.STOPPED) {
+            throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState()
+                    + " but transition to DISABLED state is allowed only from the STOPPED state");
         }
 
         taskNode.setScheduledState(ScheduledState.DISABLED);
@@ -597,12 +577,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     }
 
     /**
-     * Returns the ScheduleState that is registered for the given component;
-     * if no ScheduleState current is registered, one is created and registered
+     * Returns the ScheduleState that is registered for the given component; if
+     * no ScheduleState current is registered, one is created and registered
      * atomically, and then that value is returned.
      *
-     * @param schedulable
-     * @return
+     * @param schedulable schedulable
+     * @return scheduled state
      */
     private ScheduleState getScheduleState(final Object schedulable) {
         ScheduleState scheduleState = scheduleStates.get(schedulable);
@@ -620,21 +600,21 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     public void enableControllerService(final ControllerServiceNode service) {
         service.setState(ControllerServiceState.ENABLING);
         final ScheduleState scheduleState = getScheduleState(service);
-        
+
         final Runnable enableRunnable = new Runnable() {
             @Override
             public void run() {
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
                     long lastStopTime = scheduleState.getLastStopTime();
                     final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
-                    
+
                     while (true) {
                         try {
                             synchronized (scheduleState) {
                                 // if no longer enabled, then we're finished. This can happen, for example,
                                 // if the @OnEnabled method throws an Exception and the user disables the service
                                 // while we're administratively yielded.
-                                // 
+                                //
                                 // we also check if the schedule state's last stop time is equal to what it was before.
                                 // if not, then means that the service has been disabled and enabled again, so we should just
                                 // bail; another thread will be responsible for invoking the @OnEnabled methods.
@@ -649,11 +629,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                             }
                         } catch (final Exception e) {
                             final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
-                            
+
                             final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
                             componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
                             LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
-                            if ( LOG.isDebugEnabled() ) {
+                            if (LOG.isDebugEnabled()) {
                                 LOG.error("", cause);
                             }
 
@@ -666,15 +646,15 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                     final Throwable cause = (t instanceof InvocationTargetException) ? t.getCause() : t;
                     final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
                     componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-                    
+
                     LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
-                    if ( LOG.isDebugEnabled() ) {
+                    if (LOG.isDebugEnabled()) {
                         LOG.error("", cause);
                     }
                 }
             }
         };
-        
+
         scheduleState.setScheduled(true);
         componentLifeCycleThreadPool.execute(enableRunnable);
     }
@@ -682,7 +662,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     @Override
     public void disableControllerService(final ControllerServiceNode service) {
         service.verifyCanDisable();
-        
+
         final ScheduleState state = getScheduleState(requireNonNull(service));
         final Runnable disableRunnable = new Runnable() {
             @Override
@@ -693,8 +673,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
                     final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
-                    
-                    while(true) {
+
+                    while (true) {
                         try {
                             ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
                             heartbeater.heartbeat();
@@ -704,17 +684,18 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                             final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
                             final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
                             componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
-                            
+
                             LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
-                            if ( LOG.isDebugEnabled() ) {
+                            if (LOG.isDebugEnabled()) {
                                 LOG.error("", cause);
                             }
-        
+
                             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
                             try {
                                 Thread.sleep(administrativeYieldMillis);
-                            } catch (final InterruptedException ie) {}
-                            
+                            } catch (final InterruptedException ie) {
+                            }
+
                             continue;
                         }
                     }
@@ -723,6 +704,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         };
 
         service.setState(ControllerServiceState.DISABLING);
-        componentLifeCycleThreadPool.execute(disableRunnable);        
+        componentLifeCycleThreadPool.execute(disableRunnable);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index f3eecbd..c4e6609 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -42,9 +42,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TimerDrivenSchedulingAgent implements SchedulingAgent {
+
     private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);
     private final long noWorkYieldNanos;
-    
+
     private final FlowController flowController;
     private final FlowEngine flowEngine;
     private final ProcessContextFactory contextFactory;
@@ -57,7 +58,7 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
         this.flowEngine = flowEngine;
         this.contextFactory = contextFactory;
         this.encryptor = encryptor;
-        
+
         final String boredYieldDuration = NiFiProperties.getInstance().getBoredYieldDuration();
         try {
             noWorkYieldNanos = FormatUtils.getTimeDuration(boredYieldDuration, TimeUnit.NANOSECONDS);
@@ -84,31 +85,30 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
         logger.info("{} started.", taskNode.getReportingTask());
     }
 
-    
     @Override
     public void schedule(final Connectable connectable, final ScheduleState scheduleState) {
-        
+
         final List<ScheduledFuture<?>> futures = new ArrayList<>();
         for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
             final Callable<Boolean> continuallyRunTask;
             final ProcessContext processContext;
-            
+
             // Determine the task to run and create it.
             if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
                 final ProcessorNode procNode = (ProcessorNode) connectable;
                 final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
-                final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, 
+                final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController,
                         contextFactory, scheduleState, standardProcContext);
-                
+
                 continuallyRunTask = runnableTask;
                 processContext = standardProcContext;
             } else {
                 processContext = new ConnectableProcessContext(connectable, encryptor);
                 continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, processContext);
             }
-            
+
             final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
-            
+
             final Runnable yieldDetectionRunnable = new Runnable() {
                 @Override
                 public void run() {
@@ -122,50 +122,50 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
                     } catch (final Exception e) {
                         throw new ProcessException(e);
                     }
-                    
+
                     // If the component is yielded, cancel its future and re-submit it to run again
                     // after the yield has expired.
                     final long newYieldExpiration = connectable.getYieldExpiration();
-                    if ( newYieldExpiration > System.currentTimeMillis() ) {
+                    if (newYieldExpiration > System.currentTimeMillis()) {
                         final long yieldMillis = System.currentTimeMillis() - newYieldExpiration;
                         final ScheduledFuture<?> scheduledFuture = futureRef.get();
-                        if ( scheduledFuture == null ) {
+                        if (scheduledFuture == null) {
                             return;
                         }
-                        
+
                         // If we are able to cancel the future, create a new one and update the ScheduleState so that it has
                         // an accurate accounting of which futures are outstanding; we must then also update the futureRef
                         // so that we can do this again the next time that the component is yielded.
                         if (scheduledFuture.cancel(false)) {
                             final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
-                            
+
                             synchronized (scheduleState) {
-                                if ( scheduleState.isScheduled() ) {
-                                    final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos, 
+                                if (scheduleState.isScheduled()) {
+                                    final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
                                             connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-                                    
+
                                     scheduleState.replaceFuture(scheduledFuture, newFuture);
                                     futureRef.set(newFuture);
                                 }
                             }
                         }
-                    } else if ( noWorkYieldNanos > 0L && shouldYield ) {
+                    } else if (noWorkYieldNanos > 0L && shouldYield) {
                         // Component itself didn't yield but there was no work to do, so the framework will choose
                         // to yield the component automatically for a short period of time.
                         final ScheduledFuture<?> scheduledFuture = futureRef.get();
-                        if ( scheduledFuture == null ) {
+                        if (scheduledFuture == null) {
                             return;
                         }
-                        
+
                         // If we are able to cancel the future, create a new one and update the ScheduleState so that it has
                         // an accurate accounting of which futures are outstanding; we must then also update the futureRef
                         // so that we can do this again the next time that the component is yielded.
                         if (scheduledFuture.cancel(false)) {
                             synchronized (scheduleState) {
-                                if ( scheduleState.isScheduled() ) {
-                                    final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, noWorkYieldNanos, 
+                                if (scheduleState.isScheduled()) {
+                                    final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, noWorkYieldNanos,
                                             connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-                                    
+
                                     scheduleState.replaceFuture(scheduledFuture, newFuture);
                                     futureRef.set(newFuture);
                                 }
@@ -176,13 +176,13 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
             };
 
             // Schedule the task to run
-            final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L, 
+            final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L,
                     connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-            
+
             // now that we have the future, set the atomic reference so that if the component is yielded we
             // are able to then cancel this future.
             futureRef.set(future);
-            
+
             // Keep track of the futures so that we can update the ScheduleState.
             futures.add(future);
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 1fde670..92fa3b2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -43,15 +43,16 @@ import org.w3c.dom.Element;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
-/**
- *
- */
 public class ControllerServiceLoader {
 
     private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
 
-
-    public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException {
+    public static List<ControllerServiceNode> loadControllerServices(
+            final ControllerServiceProvider provider,
+            final InputStream serializedStream,
+            final StringEncryptor encryptor,
+            final BulletinRepository bulletinRepo,
+            final boolean autoResumeState) throws IOException {
         final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
         documentBuilderFactory.setNamespaceAware(true);
 
@@ -87,66 +88,70 @@ public class ControllerServiceLoader {
                     throw err;
                 }
             });
-            
+
             final Document document = builder.parse(in);
             final Element controllerServices = document.getDocumentElement();
             final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
-            return new ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState));
+            return new ArrayList<>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState));
         } catch (SAXException | ParserConfigurationException sxe) {
             throw new IOException(sxe);
         }
     }
-    
-    public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) {
+
+    public static Collection<ControllerServiceNode> loadControllerServices(
+            final List<Element> serviceElements,
+            final ControllerServiceProvider provider,
+            final StringEncryptor encryptor,
+            final BulletinRepository bulletinRepo,
+            final boolean autoResumeState) {
         final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
-        for ( final Element serviceElement : serviceElements ) {
+        for (final Element serviceElement : serviceElements) {
             final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor);
-            // We need to clone the node because it will be used in a separate thread below, and 
+            // We need to clone the node because it will be used in a separate thread below, and
             // Element is not thread-safe.
             nodeMap.put(serviceNode, (Element) serviceElement.cloneNode(true));
         }
-        for ( final Map.Entry<ControllerServiceNode, Element> entry : nodeMap.entrySet() ) {
+        for (final Map.Entry<ControllerServiceNode, Element> entry : nodeMap.entrySet()) {
             configureControllerService(entry.getKey(), entry.getValue(), encryptor);
         }
-        
+
         // Start services
-        if ( autoResumeState ) {
+        if (autoResumeState) {
             final Set<ControllerServiceNode> nodesToEnable = new HashSet<>();
-            
-            for ( final ControllerServiceNode node : nodeMap.keySet() ) {
+
+            for (final ControllerServiceNode node : nodeMap.keySet()) {
                 final Element controllerServiceElement = nodeMap.get(node);
 
                 final ControllerServiceDTO dto;
                 synchronized (controllerServiceElement.getOwnerDocument()) {
                     dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
                 }
-                
+
                 final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState());
                 if (state == ControllerServiceState.ENABLED) {
                     nodesToEnable.add(node);
                 }
             }
-            
+
             provider.enableControllerServices(nodesToEnable);
         }
-        
+
         return nodeMap.keySet();
     }
-    
-    
+
     private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) {
         final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
-        
+
         final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), false);
         node.setName(dto.getName());
         node.setComments(dto.getComments());
         return node;
     }
-    
+
     private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) {
         final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
         node.setAnnotationData(dto.getAnnotationData());
-        
+
         for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
             if (entry.getValue() == null) {
                 node.removeProperty(entry.getKey());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
index 8d46b05..02d6263 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
@@ -64,15 +64,15 @@ public class StandardControllerServiceInitializationContext implements Controlle
     public boolean isControllerServiceEnabled(final ControllerService service) {
         return serviceProvider.isControllerServiceEnabled(service);
     }
-    
+
     @Override
     public boolean isControllerServiceEnabling(String serviceIdentifier) {
         return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
     }
-    
+
     @Override
     public String getControllerServiceName(final String serviceIdentifier) {
-    	return serviceProvider.getControllerServiceName(serviceIdentifier);
+        return serviceProvider.getControllerServiceName(serviceIdentifier);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index e768b9a..e577ffe 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -59,12 +59,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
         this.serviceProvider = serviceProvider;
     }
 
-    
     @Override
     public ControllerService getProxiedControllerService() {
         return proxedControllerService;
     }
-    
+
     @Override
     public ControllerService getControllerServiceImplementation() {
         return implementation;
@@ -106,23 +105,23 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
             throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
         }
     }
-    
+
     @Override
     public void setProperty(final String name, final String value) {
         super.setProperty(name, value);
         onConfigured();
     }
-    
+
     @Override
     public boolean removeProperty(String name) {
         final boolean removed = super.removeProperty(name);
-        if ( removed ) {
+        if (removed) {
             onConfigured();
         }
-        
+
         return removed;
     }
-    
+
     @SuppressWarnings("deprecation")
     private void onConfigured() {
         try (final NarCloseable x = NarCloseable.withNarLoader()) {
@@ -132,97 +131,97 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
             throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
         }
     }
-    
+
     @Override
     public void verifyCanDelete() {
-        if ( getState() != ControllerServiceState.DISABLED ) {
+        if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
         }
     }
-    
+
     @Override
     public void verifyCanDisable() {
         verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
     }
-    
+
     @Override
     public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
         final ControllerServiceState state = getState();
-        if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+        if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
             throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
         }
-        
+
         final ControllerServiceReference references = getReferences();
-        
-        for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) {
-            if ( !ignoreReferences.contains(activeReference) ) {
+
+        for (final ConfiguredComponent activeReference : references.getActiveReferences()) {
+            if (!ignoreReferences.contains(activeReference)) {
                 throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running");
             }
         }
     }
-    
+
     @Override
     public void verifyCanEnable() {
-        if ( getState() != ControllerServiceState.DISABLED ) {
+        if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
         }
-        
-        if ( !isValid() ) {
+
+        if (!isValid()) {
             throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + getValidationErrors());
         }
     }
-    
+
     @Override
     public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
         if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
         }
-        
+
         final Set<String> ids = new HashSet<>();
-        for ( final ControllerServiceNode node : ignoredReferences ) {
+        for (final ControllerServiceNode node : ignoredReferences) {
             ids.add(node.getIdentifier());
         }
-        
+
         final Collection<ValidationResult> validationResults = getValidationErrors(ids);
-        for ( final ValidationResult result : validationResults ) {
-            if ( !result.isValid() ) {
+        for (final ValidationResult result : validationResults) {
+            if (!result.isValid()) {
                 throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + result);
             }
         }
     }
-    
+
     @Override
     public void verifyCanUpdate() {
-        if ( getState() != ControllerServiceState.DISABLED ) {
+        if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
         }
     }
-    
+
     @Override
     public String getComments() {
-    	readLock.lock();
-    	try {
-    		return comment;
-    	} finally {
-    		readLock.unlock();
-    	}
+        readLock.lock();
+        try {
+            return comment;
+        } finally {
+            readLock.unlock();
+        }
     }
-    
+
     @Override
     public void setComments(final String comment) {
-    	writeLock.lock();
-    	try {
-    		this.comment = comment;
-    	} finally {
-    		writeLock.unlock();
-    	}
+        writeLock.lock();
+        try {
+            this.comment = comment;
+        } finally {
+            writeLock.unlock();
+        }
     }
-    
+
     @Override
     public ControllerServiceState getState() {
         return stateRef.get();
     }
-    
+
     @Override
     public void setState(final ControllerServiceState state) {
         this.stateRef.set(state);


Mime
View raw message