nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [01/11] incubator-nifi git commit: NIFI-271 checkpoint
Date Wed, 22 Apr 2015 15:46:44 GMT
Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-271 b612b6bcd -> 888254b2a


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index ca68725..3486875 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -257,7 +257,7 @@ public class TestStandardProcessSession {
             Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
         }
     }
-    
+
     private void assertDisabled(final InputStream inputStream) {
         try {
             inputStream.read();
@@ -289,8 +289,8 @@ public class TestStandardProcessSession {
         } catch (final Exception ex) {
             Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
         }
-    }    
-    
+    }
+
     @Test
     public void testAppendAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
@@ -355,7 +355,7 @@ public class TestStandardProcessSession {
         });
         assertDisabled(inputStreamHolder.get());
         assertDisabled(outputStreamHolder.get());
-   }
+    }
 
     @Test
     public void testWriteAfterSessionClosesStream() throws IOException {
@@ -426,7 +426,6 @@ public class TestStandardProcessSession {
         assertEquals(0, provenanceRepo.getEvents(0L, 100000).size());
     }
 
-    
     @Test
     public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
@@ -466,59 +465,59 @@ public class TestStandardProcessSession {
     @Test
     public void testUpdateAttributesThenJoin() throws IOException {
         final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
-            .id(1L)
-            .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
-            .entryDate(System.currentTimeMillis())
-            .build();
-        
+                .id(1L)
+                .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+                .entryDate(System.currentTimeMillis())
+                .build();
+
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
-            .id(2L)
-            .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
-            .entryDate(System.currentTimeMillis())
-            .build();
-        
+                .id(2L)
+                .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
+                .entryDate(System.currentTimeMillis())
+                .build();
+
         flowFileQueue.put(flowFileRecord1);
         flowFileQueue.put(flowFileRecord2);
-        
+
         FlowFile ff1 = session.get();
         FlowFile ff2 = session.get();
 
         ff1 = session.putAttribute(ff1, "index", "1");
         ff2 = session.putAttribute(ff2, "index", "2");
-        
+
         final List<FlowFile> parents = new ArrayList<>(2);
         parents.add(ff1);
         parents.add(ff2);
-        
+
         final FlowFile child = session.create(parents);
-        
+
         final Relationship rel = new Relationship.Builder().name("A").build();
-        
+
         session.transfer(ff1, rel);
         session.transfer(ff2, rel);
         session.transfer(child, rel);
-        
+
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
 
         // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's
         assertEquals(3, events.size());
-        
+
         int joinCount = 0;
         int ff1UpdateCount = 0;
         int ff2UpdateCount = 0;
-        
-        for ( final ProvenanceEventRecord event : events ) {
+
+        for (final ProvenanceEventRecord event : events) {
             switch (event.getEventType()) {
                 case JOIN:
                     assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid());
                     joinCount++;
                     break;
                 case ATTRIBUTES_MODIFIED:
-                    if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) {
+                    if (event.getFlowFileUuid().equals(ff1.getAttribute("uuid"))) {
                         ff1UpdateCount++;
-                    } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) {
+                    } else if (event.getFlowFileUuid().equals(ff2.getAttribute("uuid"))) {
                         ff2UpdateCount++;
                     } else {
                         Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid());
@@ -528,14 +527,14 @@ public class TestStandardProcessSession {
                     Assert.fail("Unexpected event type: " + event);
             }
         }
-        
+
         assertEquals(1, joinCount);
         assertEquals(1, ff1UpdateCount);
         assertEquals(1, ff2UpdateCount);
-        
+
         assertEquals(1, joinCount);
     }
-    
+
     @Test
     public void testForkOneToOneReported() throws IOException {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
@@ -845,34 +844,34 @@ public class TestStandardProcessSession {
     @Test
     public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .contentClaim(new ContentClaim() {
-                @Override
-                public int compareTo(ContentClaim arg0) {
-                    return 0;
-                }
-    
-                @Override
-                public String getId() {
-                    return "0";
-                }
-    
-                @Override
-                public String getContainer() {
-                    return "container";
-                }
-    
-                @Override
-                public String getSection() {
-                    return "section";
-                }
-    
-                @Override
-                public boolean isLossTolerant() {
-                    return true;
-                }
-            }).build();
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .contentClaim(new ContentClaim() {
+                    @Override
+                    public int compareTo(ContentClaim arg0) {
+                        return 0;
+                    }
+
+                    @Override
+                    public String getId() {
+                        return "0";
+                    }
+
+                    @Override
+                    public String getContainer() {
+                        return "container";
+                    }
+
+                    @Override
+                    public String getSection() {
+                        return "section";
+                    }
+
+                    @Override
+                    public boolean isLossTolerant() {
+                        return true;
+                    }
+                }).build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -885,35 +884,35 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
-            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .contentClaim(new ContentClaim() {
-                @Override
-                public int compareTo(ContentClaim arg0) {
-                    return 0;
-                }
-    
-                @Override
-                public String getId() {
-                    return "0";
-                }
-    
-                @Override
-                public String getContainer() {
-                    return "container";
-                }
-    
-                @Override
-                public String getSection() {
-                    return "section";
-                }
-    
-                @Override
-                public boolean isLossTolerant() {
-                    return true;
-                }
-            })
-            .contentClaimOffset(1000L).size(1L).build();
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .contentClaim(new ContentClaim() {
+                    @Override
+                    public int compareTo(ContentClaim arg0) {
+                        return 0;
+                    }
+
+                    @Override
+                    public String getId() {
+                        return "0";
+                    }
+
+                    @Override
+                    public String getContainer() {
+                        return "container";
+                    }
+
+                    @Override
+                    public String getSection() {
+                        return "section";
+                    }
+
+                    @Override
+                    public boolean isLossTolerant() {
+                        return true;
+                    }
+                })
+                .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
@@ -974,21 +973,20 @@ public class TestStandardProcessSession {
         }
     }
 
-    
     @Test
     public void testCreateEmitted() throws IOException {
         FlowFile newFlowFile = session.create();
         session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
-        
+
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.CREATE, event.getEventType());
     }
-    
+
     @Test
     public void testContentModifiedNotEmittedForCreate() throws IOException {
         FlowFile newFlowFile = session.create();
@@ -999,23 +997,23 @@ public class TestStandardProcessSession {
         });
         session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
-        
+
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.CREATE, event.getEventType());
     }
-    
+
     @Test
     public void testContentModifiedEmittedAndNotAttributesModified() throws IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
-            .id(1L)
-            .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
-            .build();
+                .id(1L)
+                .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+                .build();
         this.flowFileQueue.put(flowFile);
-        
+
         FlowFile existingFlowFile = session.get();
         existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() {
             @Override
@@ -1025,38 +1023,36 @@ public class TestStandardProcessSession {
         existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
         session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
-        
+
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType());
     }
-    
+
     @Test
     public void testAttributesModifiedEmitted() throws IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
-            .id(1L)
-            .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
-            .build();
+                .id(1L)
+                .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+                .build();
         this.flowFileQueue.put(flowFile);
-        
+
         FlowFile existingFlowFile = session.get();
         existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
         session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
-        
+
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType());
     }
-    
-    
-    
+
     private static class MockFlowFileRepository implements FlowFileRepository {
 
         private final AtomicLong idGenerator = new AtomicLong(0L);
@@ -1123,7 +1119,7 @@ public class TestStandardProcessSession {
         @Override
         public void shutdown() {
         }
-        
+
         public Set<ContentClaim> getExistingClaims() {
             final Set<ContentClaim> claims = new HashSet<>();
 
@@ -1146,7 +1142,7 @@ public class TestStandardProcessSession {
             if (Files.exists(parent) == false) {
                 Files.createDirectories(parent);
             }
-            Files.createFile(getPath(claim));            
+            Files.createFile(getPath(claim));
             return claim;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
index 7fef706..acd9993 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
@@ -27,45 +27,45 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class StandardControllerServiceProviderTest {
-	
+
     private ControllerService proxied;
     private ControllerService implementation;
-    
+
     @BeforeClass
     public static void setupSuite() throws Exception {
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardFlowServiceTest.class.getResource("/conf/nifi.properties").getFile());
         NiFiProperties properties = NiFiProperties.getInstance();
         NarClassLoaders.load(properties);
-    	ExtensionManager.discoverExtensions();
+        ExtensionManager.discoverExtensions();
     }
 
     @Before
     public void setup() throws Exception {
-    	String id = "id";
-    	String clazz = "org.apache.nifi.controller.service.util.TestControllerService";  
-    	ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null);
-    	ControllerServiceNode node = provider.createControllerService(clazz,id,true);
-    	proxied = node.getProxiedControllerService();
-    	implementation = node.getControllerServiceImplementation();
+        String id = "id";
+        String clazz = "org.apache.nifi.controller.service.util.TestControllerService";
+        ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null);
+        ControllerServiceNode node = provider.createControllerService(clazz, id, true);
+        proxied = node.getProxiedControllerService();
+        implementation = node.getControllerServiceImplementation();
     }
-	
-    @Test (expected=UnsupportedOperationException.class)
-    public void testCallProxiedOnPropertyModified() {	
-		proxied.onPropertyModified(null, "oldValue", "newValue");
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCallProxiedOnPropertyModified() {
+        proxied.onPropertyModified(null, "oldValue", "newValue");
     }
-    
+
     @Test
-    public void testCallImplementationOnPropertyModified() {	
-    	implementation.onPropertyModified(null, "oldValue", "newValue");
+    public void testCallImplementationOnPropertyModified() {
+        implementation.onPropertyModified(null, "oldValue", "newValue");
     }
-    
-    @Test (expected=UnsupportedOperationException.class)
-    public void testCallProxiedInitialized() throws InitializationException {	
-		proxied.initialize(null);
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCallProxiedInitialized() throws InitializationException {
+        proxied.initialize(null);
     }
-    
+
     @Test
-    public void testCallImplementationInitialized() throws InitializationException {	
-    	implementation.initialize(null);
+    public void testCallImplementationInitialized() throws InitializationException {
+        implementation.initialize(null);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 3dc1752..03aca7e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -54,7 +54,7 @@ public class TestStandardControllerServiceProvider {
                 return null;
             }
         }).when(scheduler).enableControllerService(Mockito.any(ControllerServiceNode.class));
-        
+
         Mockito.doAnswer(new Answer<Object>() {
             @Override
             public Object answer(final InvocationOnMock invocation) throws Throwable {
@@ -64,55 +64,54 @@ public class TestStandardControllerServiceProvider {
                 return null;
             }
         }).when(scheduler).disableControllerService(Mockito.any(ControllerServiceNode.class));
-        
+
         return scheduler;
     }
-    
+
     @Test
     public void testDisableControllerService() {
         final ProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
-        
+
         final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false);
         provider.enableControllerService(serviceNode);
         provider.disableControllerService(serviceNode);
     }
-    
+
     @Test
     public void testEnableDisableWithReference() {
         final ProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
-        
+
         final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false);
         final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false);
-        
+
         serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
-        
+
         try {
             provider.enableControllerService(serviceNodeA);
             Assert.fail("Was able to enable Service A but Service B is disabled.");
         } catch (final IllegalStateException expected) {
         }
-        
+
         provider.enableControllerService(serviceNodeB);
         provider.enableControllerService(serviceNodeA);
-        
+
         try {
             provider.disableControllerService(serviceNodeB);
             Assert.fail("Was able to disable Service B but Service A is enabled and references B");
         } catch (final IllegalStateException expected) {
         }
-        
+
         provider.disableControllerService(serviceNodeA);
         provider.disableControllerService(serviceNodeB);
     }
-    
-    
+
     @Test
     public void testEnableReferencingServicesGraph() {
         final ProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
-        
+
         // build a graph of controller services with dependencies as such:
         //
         // A -> B -> D
@@ -125,31 +124,29 @@ public class TestStandardControllerServiceProvider {
         // So we have to verify that if D is enabled, when we enable its referencing services,
         // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
         // until B is first enabled so ensure that we enable B first.
-        
         final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
         final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
         final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
         final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
-        
+
         serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
         serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
         serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
         serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
-        
+
         provider.enableControllerService(serviceNode4);
         provider.enableReferencingServices(serviceNode4);
-        
+
         assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
         assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
         assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
     }
-    
-    
+
     @Test
     public void testStartStopReferencingComponents() {
         final ProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
-        
+
         // build a graph of reporting tasks and controller services with dependencies as such:
         //
         // Processor P1 -> A -> B -> D
@@ -162,12 +159,11 @@ public class TestStandardControllerServiceProvider {
         // So we have to verify that if D is enabled, when we enable its referencing services,
         // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
         // until B is first enabled so ensure that we enable B first.
-        
         final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
         final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
         final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
         final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
-        
+
         final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class);
         Mockito.doAnswer(new Answer<Object>() {
             @Override
@@ -178,7 +174,7 @@ public class TestStandardControllerServiceProvider {
                 return null;
             }
         }).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class));
-        
+
         Mockito.doAnswer(new Answer<Object>() {
             @Override
             public Object answer(final InvocationOnMock invocation) throws Throwable {
@@ -188,36 +184,36 @@ public class TestStandardControllerServiceProvider {
                 return null;
             }
         }).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class));
-        
+
         final String id1 = UUID.randomUUID().toString();
         final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1,
                 new StandardValidationContextFactory(provider), scheduler, provider);
         procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider));
         procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
         procNodeA.setProcessGroup(mockProcessGroup);
-        
+
         final String id2 = UUID.randomUUID().toString();
-        final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(),id2,
+        final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2,
                 new StandardValidationContextFactory(provider), scheduler, provider);
         procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider));
         procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
         procNodeB.setProcessGroup(mockProcessGroup);
-        
+
         serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
         serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
         serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
         serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
-        
+
         provider.enableControllerService(serviceNode4);
         provider.enableReferencingServices(serviceNode4);
         provider.scheduleReferencingComponents(serviceNode4);
-        
+
         assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
         assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
         assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
         assertTrue(procNodeA.isRunning());
         assertTrue(procNodeB.isRunning());
-        
+
         // stop processors and verify results.
         provider.unscheduleReferencingComponents(serviceNode4);
         assertFalse(procNodeA.isRunning());
@@ -225,18 +221,17 @@ public class TestStandardControllerServiceProvider {
         assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
         assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
         assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
-        
+
         provider.disableReferencingServices(serviceNode4);
         assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
         assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
         assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
         assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState());
-        
+
         provider.disableControllerService(serviceNode4);
         assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState());
     }
-    
-    
+
     @Test
     public void testOrderingOfServices() {
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null);
@@ -248,7 +243,7 @@ public class TestStandardControllerServiceProvider {
         final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>();
         nodeMap.put("1", serviceNode1);
         nodeMap.put("2", serviceNode2);
-        
+
         List<List<ControllerServiceNode>> branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
         assertEquals(2, branches.size());
         List<ControllerServiceNode> ordered = branches.get(0);
@@ -257,11 +252,11 @@ public class TestStandardControllerServiceProvider {
         assertTrue(ordered.get(1) == serviceNode1);
         assertEquals(1, branches.get(1).size());
         assertTrue(branches.get(1).get(0) == serviceNode2);
-        
+
         nodeMap.clear();
         nodeMap.put("2", serviceNode2);
         nodeMap.put("1", serviceNode1);
-        
+
         branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
         assertEquals(2, branches.size());
         ordered = branches.get(1);
@@ -270,20 +265,20 @@ public class TestStandardControllerServiceProvider {
         assertTrue(ordered.get(1) == serviceNode1);
         assertEquals(1, branches.get(0).size());
         assertTrue(branches.get(0).get(0) == serviceNode2);
-        
+
         // add circular dependency on self.
         nodeMap.clear();
         serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1");
         nodeMap.put("1", serviceNode1);
         nodeMap.put("2", serviceNode2);
-        
+
         branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
         assertEquals(2, branches.size());
         ordered = branches.get(0);
         assertEquals(2, ordered.size());
         assertTrue(ordered.get(0) == serviceNode2);
         assertTrue(ordered.get(1) == serviceNode1);
-        
+
         nodeMap.clear();
         nodeMap.put("2", serviceNode2);
         nodeMap.put("1", serviceNode1);
@@ -293,7 +288,7 @@ public class TestStandardControllerServiceProvider {
         assertEquals(2, ordered.size());
         assertTrue(ordered.get(0) == serviceNode2);
         assertTrue(ordered.get(1) == serviceNode1);
-        
+
         // add circular dependency once removed. In this case, we won't actually be able to enable these because of the
         // circular dependency because they will never be valid because they will always depend on a disabled service.
         // But we want to ensure that the method returns successfully without throwing a StackOverflowException or anything
@@ -310,7 +305,7 @@ public class TestStandardControllerServiceProvider {
         assertEquals(2, ordered.size());
         assertTrue(ordered.get(0) == serviceNode3);
         assertTrue(ordered.get(1) == serviceNode1);
-        
+
         nodeMap.clear();
         nodeMap.put("3", serviceNode3);
         nodeMap.put("1", serviceNode1);
@@ -320,8 +315,7 @@ public class TestStandardControllerServiceProvider {
         assertEquals(2, ordered.size());
         assertTrue(ordered.get(0) == serviceNode3);
         assertTrue(ordered.get(1) == serviceNode1);
-        
-        
+
         // Add multiple completely disparate branches.
         nodeMap.clear();
         serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
@@ -333,7 +327,7 @@ public class TestStandardControllerServiceProvider {
         nodeMap.put("3", serviceNode3);
         nodeMap.put("4", serviceNode4);
         nodeMap.put("5", serviceNode5);
-        
+
         branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
         assertEquals(5, branches.size());
 
@@ -341,21 +335,21 @@ public class TestStandardControllerServiceProvider {
         assertEquals(2, ordered.size());
         assertTrue(ordered.get(0) == serviceNode2);
         assertTrue(ordered.get(1) == serviceNode1);
-        
+
         assertEquals(1, branches.get(1).size());
         assertTrue(branches.get(1).get(0) == serviceNode2);
-        
+
         ordered = branches.get(2);
         assertEquals(2, ordered.size());
         assertTrue(ordered.get(0) == serviceNode4);
         assertTrue(ordered.get(1) == serviceNode3);
-        
+
         assertEquals(1, branches.get(3).size());
         assertTrue(branches.get(3).get(0) == serviceNode4);
-        
+
         assertEquals(1, branches.get(4).size());
         assertTrue(branches.get(4).get(0) == serviceNode5);
-        
+
         // create 2 branches both dependent on the same service
         nodeMap.clear();
         serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
@@ -363,19 +357,19 @@ public class TestStandardControllerServiceProvider {
         nodeMap.put("1", serviceNode1);
         nodeMap.put("2", serviceNode2);
         nodeMap.put("3", serviceNode3);
-        
+
         branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
         assertEquals(3, branches.size());
-        
+
         ordered = branches.get(0);
         assertEquals(2, ordered.size());
         assertTrue(ordered.get(0) == serviceNode2);
         assertTrue(ordered.get(1) == serviceNode1);
-        
+
         ordered = branches.get(1);
         assertEquals(1, ordered.size());
         assertTrue(ordered.get(0) == serviceNode2);
-        
+
         ordered = branches.get(2);
         assertEquals(2, ordered.size());
         assertTrue(ordered.get(0) == serviceNode2);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
index 615e172..13898a5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
@@ -29,19 +29,18 @@ import org.apache.nifi.processor.exception.ProcessException;
 public class DummyProcessor extends AbstractProcessor {
 
     public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
-        .name("Controller Service")
-        .identifiesControllerService(ControllerService.class)
-        .required(true)
-        .build();
-    
-    
+            .name("Controller Service")
+            .identifiesControllerService(ControllerService.class)
+            .required(true)
+            .build();
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(SERVICE);
         return descriptors;
     }
-    
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
index 4918468..f93184b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
@@ -26,18 +26,17 @@ import org.apache.nifi.controller.ControllerService;
 public class ServiceA extends AbstractControllerService {
 
     public static final PropertyDescriptor OTHER_SERVICE = new PropertyDescriptor.Builder()
-        .name("Other Service")
-        .identifiesControllerService(ControllerService.class)
-        .required(true)
-        .build();
-    
+            .name("Other Service")
+            .identifiesControllerService(ControllerService.class)
+            .required(true)
+            .build();
+
     public static final PropertyDescriptor OTHER_SERVICE_2 = new PropertyDescriptor.Builder()
-        .name("Other Service 2")
-        .identifiesControllerService(ControllerService.class)
-        .required(false)
-        .build();
+            .name("Other Service 2")
+            .identifiesControllerService(ControllerService.class)
+            .required(false)
+            .build();
 
-    
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -45,5 +44,5 @@ public class ServiceA extends AbstractControllerService {
         descriptors.add(OTHER_SERVICE_2);
         return descriptors;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java
index 95200a0..65ef13f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java
@@ -28,34 +28,34 @@ import org.apache.nifi.reporting.InitializationException;
 
 public class TestControllerService implements ControllerService {
 
-	@Override
-	public Collection<ValidationResult> validate(ValidationContext context) {
-		return null;
-	}
-
-	@Override
-	public PropertyDescriptor getPropertyDescriptor(String name) {
-		return null;
-	}
-
-	@Override
-	public void onPropertyModified(PropertyDescriptor descriptor,
-			String oldValue, String newValue) {
-	}
-
-	@Override
-	public List<PropertyDescriptor> getPropertyDescriptors() {
-		return null;
-	}
-
-	@Override
-	public String getIdentifier() {
-		return null;
-	}
-
-	@Override
-	public void initialize(ControllerServiceInitializationContext context)
-			throws InitializationException {
-	}
+    @Override
+    public Collection<ValidationResult> validate(ValidationContext context) {
+        return null;
+    }
+
+    @Override
+    public PropertyDescriptor getPropertyDescriptor(String name) {
+        return null;
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor,
+            String oldValue, String newValue) {
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return null;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return null;
+    }
+
+    @Override
+    public void initialize(ControllerServiceInitializationContext context)
+            throws InitializationException {
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
index a0bf30d..be40e90 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
@@ -165,9 +165,9 @@ public class TestStandardPropertyValue {
 
         @Override
         public String getControllerServiceName(String serviceIdentifier) {
-        	return null;
+            return null;
         }
-        
+
         @Override
         public boolean isControllerServiceEnabling(String serviceIdentifier) {
             return false;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml
index 46a1aca..0406ed6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml
@@ -32,7 +32,7 @@
             <artifactId>nifi-properties</artifactId>
             <scope>compile</scope>
         </dependency>
-		<dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-documentation</artifactId>
             <scope>compile</scope>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 590797c..c1bdf97 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -47,166 +47,165 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BootstrapListener {
-	private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
-	
-	private final NiFi nifi;
-	private final int bootstrapPort;
-	private final String secretKey;
-	
-	private volatile Listener listener;
-	private volatile ServerSocket serverSocket;
-	
-	
-	public BootstrapListener(final NiFi nifi, final int bootstrapPort) {
-		this.nifi = nifi;
-		this.bootstrapPort = bootstrapPort;
-		secretKey = UUID.randomUUID().toString();
-	}
-	
-	public void start() throws IOException {
-		logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
-		
-		serverSocket = new ServerSocket();
-		serverSocket.bind(new InetSocketAddress("localhost", 0));
-		serverSocket.setSoTimeout(2000);
-		
-		final int localPort = serverSocket.getLocalPort();
-		logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
-		
-		listener = new Listener(serverSocket);
-		final Thread listenThread = new Thread(listener);
-		listenThread.setDaemon(true);
-		listenThread.setName("Listen to Bootstrap");
-		listenThread.start();
-		
-		logger.debug("Notifying Bootstrap that local port is {}", localPort);
-		try (final Socket socket = new Socket()) {
-			socket.setSoTimeout(60000);
-			socket.connect(new InetSocketAddress("localhost", bootstrapPort));
-			socket.setSoTimeout(60000);
-			
-			final OutputStream out = socket.getOutputStream();
-			out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
-			out.flush();
-			
-			logger.debug("Awaiting response from Bootstrap...");
-			final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-			final String response = reader.readLine();
-			if ("OK".equals(response)) {
-				logger.info("Successfully initiated communication with Bootstrap");
-			} else {
-				logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi");
-			}
-		}
-	}
-	
-	
-	public void stop() {
-		if (listener != null) {
-			listener.stop();
-		}
-	}
-	
-	private class Listener implements Runnable {
-		private final ServerSocket serverSocket;
-		private final ExecutorService executor;
-		private volatile boolean stopped = false;
-		
-		public Listener(final ServerSocket serverSocket) {
-			this.serverSocket = serverSocket;
-			this.executor = Executors.newFixedThreadPool(2);
-		}
-		
-		public void stop() {
-			stopped = true;
-			
-			executor.shutdownNow();
-			
-			try {
-				serverSocket.close();
-			} catch (final IOException ioe) {
-				// nothing to really do here. we could log this, but it would just become
-				// confusing in the logs, as we're shutting down and there's no real benefit
-			}
-		}
-		
-		@Override
-		public void run() {
-			while (!stopped) {
-				try {
-					final Socket socket;
-					try {
-					    logger.debug("Listening for Bootstrap Requests");
-						socket = serverSocket.accept();
-					} catch (final SocketTimeoutException ste) {
-						if ( stopped ) {
-							return;
-						}
-						
-						continue;
-					} catch (final IOException ioe) {
-						if ( stopped ) {
-							return;
-						}
-						
-						throw ioe;
-					}
-					
-					logger.debug("Received connection from Bootstrap");
-					socket.setSoTimeout(5000);
-					
-					executor.submit(new Runnable() {
-						@Override
-						public void run() {
-							try {
-								final BootstrapRequest request = readRequest(socket.getInputStream());
-								final BootstrapRequest.RequestType requestType = request.getRequestType();
-								
-								switch (requestType) {
-									case PING:
-										logger.debug("Received PING request from Bootstrap; responding");
-										echoPing(socket.getOutputStream());
-										logger.debug("Responded to PING request from Bootstrap");
-										break;
-									case SHUTDOWN:
-										logger.info("Received SHUTDOWN request from Bootstrap");
-										echoShutdown(socket.getOutputStream());
-										nifi.shutdownHook();
-										return;
-									case DUMP:
-									    logger.info("Received DUMP request from Bootstrap");
-									    writeDump(socket.getOutputStream());
-									    break;
-								}
-							} catch (final Throwable t) {
-								logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
-							} finally {
-								try {
-									socket.close();
-								} catch (final IOException ioe) {
-									logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
-								}
-							}
-						}
-					});
-				} catch (final Throwable t) {
-					logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
-				}
-			}
-		}
-	}
-	
-	
-	private static void writeDump(final OutputStream out) throws IOException {
+
+    private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
+
+    private final NiFi nifi;
+    private final int bootstrapPort;
+    private final String secretKey;
+
+    private volatile Listener listener;
+    private volatile ServerSocket serverSocket;
+
+    public BootstrapListener(final NiFi nifi, final int bootstrapPort) {
+        this.nifi = nifi;
+        this.bootstrapPort = bootstrapPort;
+        secretKey = UUID.randomUUID().toString();
+    }
+
+    public void start() throws IOException {
+        logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
+
+        serverSocket = new ServerSocket();
+        serverSocket.bind(new InetSocketAddress("localhost", 0));
+        serverSocket.setSoTimeout(2000);
+
+        final int localPort = serverSocket.getLocalPort();
+        logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
+
+        listener = new Listener(serverSocket);
+        final Thread listenThread = new Thread(listener);
+        listenThread.setDaemon(true);
+        listenThread.setName("Listen to Bootstrap");
+        listenThread.start();
+
+        logger.debug("Notifying Bootstrap that local port is {}", localPort);
+        try (final Socket socket = new Socket()) {
+            socket.setSoTimeout(60000);
+            socket.connect(new InetSocketAddress("localhost", bootstrapPort));
+            socket.setSoTimeout(60000);
+
+            final OutputStream out = socket.getOutputStream();
+            out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+
+            logger.debug("Awaiting response from Bootstrap...");
+            final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+            final String response = reader.readLine();
+            if ("OK".equals(response)) {
+                logger.info("Successfully initiated communication with Bootstrap");
+            } else {
+                logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi");
+            }
+        }
+    }
+
+    public void stop() {
+        if (listener != null) {
+            listener.stop();
+        }
+    }
+
+    private class Listener implements Runnable {
+
+        private final ServerSocket serverSocket;
+        private final ExecutorService executor;
+        private volatile boolean stopped = false;
+
+        public Listener(final ServerSocket serverSocket) {
+            this.serverSocket = serverSocket;
+            this.executor = Executors.newFixedThreadPool(2);
+        }
+
+        public void stop() {
+            stopped = true;
+
+            executor.shutdownNow();
+
+            try {
+                serverSocket.close();
+            } catch (final IOException ioe) {
+                // nothing to really do here. we could log this, but it would just become
+                // confusing in the logs, as we're shutting down and there's no real benefit
+            }
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                try {
+                    final Socket socket;
+                    try {
+                        logger.debug("Listening for Bootstrap Requests");
+                        socket = serverSocket.accept();
+                    } catch (final SocketTimeoutException ste) {
+                        if (stopped) {
+                            return;
+                        }
+
+                        continue;
+                    } catch (final IOException ioe) {
+                        if (stopped) {
+                            return;
+                        }
+
+                        throw ioe;
+                    }
+
+                    logger.debug("Received connection from Bootstrap");
+                    socket.setSoTimeout(5000);
+
+                    executor.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                final BootstrapRequest request = readRequest(socket.getInputStream());
+                                final BootstrapRequest.RequestType requestType = request.getRequestType();
+
+                                switch (requestType) {
+                                    case PING:
+                                        logger.debug("Received PING request from Bootstrap; responding");
+                                        echoPing(socket.getOutputStream());
+                                        logger.debug("Responded to PING request from Bootstrap");
+                                        break;
+                                    case SHUTDOWN:
+                                        logger.info("Received SHUTDOWN request from Bootstrap");
+                                        echoShutdown(socket.getOutputStream());
+                                        nifi.shutdownHook();
+                                        return;
+                                    case DUMP:
+                                        logger.info("Received DUMP request from Bootstrap");
+                                        writeDump(socket.getOutputStream());
+                                        break;
+                                }
+                            } catch (final Throwable t) {
+                                logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
+                            } finally {
+                                try {
+                                    socket.close();
+                                } catch (final IOException ioe) {
+                                    logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
+                                }
+                            }
+                        }
+                    });
+                } catch (final Throwable t) {
+                    logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
+                }
+            }
+        }
+    }
+
+    private static void writeDump(final OutputStream out) throws IOException {
         final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
         final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
-        
+
         final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
         final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
         final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
-        
+
         final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
-        for ( final ThreadInfo info : infos ) {
+        for (final ThreadInfo info : infos) {
             sortedInfos.add(info);
         }
         Collections.sort(sortedInfos, new Comparator<ThreadInfo>() {
@@ -215,14 +214,14 @@ public class BootstrapListener {
                 return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase());
             }
         });
-        
+
         final StringBuilder sb = new StringBuilder();
-        for ( final ThreadInfo info : sortedInfos ) {
+        for (final ThreadInfo info : sortedInfos) {
             sb.append("\n");
             sb.append("\"").append(info.getThreadName()).append("\" Id=");
             sb.append(info.getThreadId()).append(" ");
             sb.append(info.getThreadState().toString()).append(" ");
-            
+
             switch (info.getThreadState()) {
                 case BLOCKED:
                 case TIMED_WAITING:
@@ -233,66 +232,66 @@ public class BootstrapListener {
                 default:
                     break;
             }
-            
+
             if (info.isSuspended()) {
                 sb.append(" (suspended)");
             }
-            if ( info.isInNative() ) {
+            if (info.isInNative()) {
                 sb.append(" (in native code)");
             }
-            
-            if ( deadlockedThreadIds != null && deadlockedThreadIds.length > 0 ) {
-                for ( final long id : deadlockedThreadIds ) {
-                    if ( id == info.getThreadId() ) {
+
+            if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
+                for (final long id : deadlockedThreadIds) {
+                    if (id == info.getThreadId()) {
                         sb.append(" ** DEADLOCKED THREAD **");
                     }
                 }
             }
 
-           if ( monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0 ) {
-                for ( final long id : monitorDeadlockThreadIds ) {
-                    if ( id == info.getThreadId() ) {
+            if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
+                for (final long id : monitorDeadlockThreadIds) {
+                    if (id == info.getThreadId()) {
                         sb.append(" ** MONITOR-DEADLOCKED THREAD **");
                     }
                 }
             }
 
             final StackTraceElement[] stackTraces = info.getStackTrace();
-            for ( final StackTraceElement element : stackTraces ) {
+            for (final StackTraceElement element : stackTraces) {
                 sb.append("\n\tat ").append(element);
-                
+
                 final MonitorInfo[] monitors = info.getLockedMonitors();
-                for ( final MonitorInfo monitor : monitors ) {
-                    if ( monitor.getLockedStackFrame().equals(element) ) {
+                for (final MonitorInfo monitor : monitors) {
+                    if (monitor.getLockedStackFrame().equals(element)) {
                         sb.append("\n\t- waiting on ").append(monitor);
                     }
                 }
             }
-            
+
             final LockInfo[] lockInfos = info.getLockedSynchronizers();
-            if ( lockInfos.length > 0 ) {
+            if (lockInfos.length > 0) {
                 sb.append("\n\t");
                 sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
-                for ( final LockInfo lockInfo : lockInfos ) {
+                for (final LockInfo lockInfo : lockInfos) {
                     sb.append("\n\t- ").append(lockInfo.toString());
                 }
             }
-            
+
             sb.append("\n");
         }
-        
+
         if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
             sb.append("\n\nDEADLOCK DETECTED!");
             sb.append("\nThe following thread IDs are deadlocked:");
-            for ( final long id : deadlockedThreadIds ) {
+            for (final long id : deadlockedThreadIds) {
                 sb.append("\n").append(id);
             }
         }
 
-       if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
+        if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
             sb.append("\n\nMONITOR DEADLOCK DETECTED!");
             sb.append("\nThe following thread IDs are deadlocked:");
-            for ( final long id : monitorDeadlockThreadIds ) {
+            for (final long id : monitorDeadlockThreadIds) {
                 sb.append("\n").append(id);
             }
         }
@@ -300,79 +299,79 @@ public class BootstrapListener {
         writer.write(sb.toString());
         writer.flush();
     }
-	
-	private void echoPing(final OutputStream out) throws IOException {
-		out.write("PING\n".getBytes(StandardCharsets.UTF_8));
-		out.flush();
-	}
-	
-	private void echoShutdown(final OutputStream out) throws IOException {
-		out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
-		out.flush();
-	}
-	
-	
-	@SuppressWarnings("resource")  // we don't want to close the stream, as the caller will do that
+
+    private void echoPing(final OutputStream out) throws IOException {
+        out.write("PING\n".getBytes(StandardCharsets.UTF_8));
+        out.flush();
+    }
+
+    private void echoShutdown(final OutputStream out) throws IOException {
+        out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
+        out.flush();
+    }
+
+    @SuppressWarnings("resource")  // we don't want to close the stream, as the caller will do that
     private BootstrapRequest readRequest(final InputStream in) throws IOException {
-	    // We want to ensure that we don't try to read data from an InputStream directly
-	    // by a BufferedReader because any user on the system could open a socket and send
-	    // a multi-gigabyte file without any new lines in order to crash the NiFi instance
-	    // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance).
-	    // So we will limit the Input Stream to only 4 KB, which should be plenty for any request.
-	    final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096);
-		final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn));
-		
-		final String line = reader.readLine();
-		final String[] splits = line.split(" ");
-		if ( splits.length < 1 ) {
-			throw new IOException("Received invalid request from Bootstrap: " + line);
-		}
-		
-		final String requestType = splits[0];
-		final String[] args;
-		if ( splits.length == 1 ) {
-			throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType);
-		} else if ( splits.length == 2 ) {
-		    args = new String[0];
-		} else {
-			args = Arrays.copyOfRange(splits, 2, splits.length);
-		}
-		
-		final String requestKey = splits[1];
-		if ( !secretKey.equals(requestKey) ) {
-		    throw new IOException("Received invalid Secret Key for request type " + requestType);
-		}
-		
-		try {
-			return new BootstrapRequest(requestType, args);
-		} catch (final Exception e) {
-			throw new IOException("Received invalid request from Bootstrap; request type = " + requestType);
-		}
-	}
-	
-	
-	private static class BootstrapRequest {
-		public static enum RequestType {
-			SHUTDOWN,
-			DUMP,
-			PING;
-		}
-		
-		private final RequestType requestType;
-		private final String[] args;
-		
-		public BootstrapRequest(final String request, final String[] args) {
-			this.requestType = RequestType.valueOf(request);
-			this.args = args;
-		}
-		
-		public RequestType getRequestType() {
-			return requestType;
-		}
-		
-		@SuppressWarnings("unused")
+        // We want to ensure that we don't try to read data from an InputStream directly
+        // by a BufferedReader because any user on the system could open a socket and send
+        // a multi-gigabyte file without any new lines in order to crash the NiFi instance
+        // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance).
+        // So we will limit the Input Stream to only 4 KB, which should be plenty for any request.
+        final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096);
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn));
+
+        final String line = reader.readLine();
+        final String[] splits = line.split(" ");
+        if (splits.length < 1) {
+            throw new IOException("Received invalid request from Bootstrap: " + line);
+        }
+
+        final String requestType = splits[0];
+        final String[] args;
+        if (splits.length == 1) {
+            throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType);
+        } else if (splits.length == 2) {
+            args = new String[0];
+        } else {
+            args = Arrays.copyOfRange(splits, 2, splits.length);
+        }
+
+        final String requestKey = splits[1];
+        if (!secretKey.equals(requestKey)) {
+            throw new IOException("Received invalid Secret Key for request type " + requestType);
+        }
+
+        try {
+            return new BootstrapRequest(requestType, args);
+        } catch (final Exception e) {
+            throw new IOException("Received invalid request from Bootstrap; request type = " + requestType);
+        }
+    }
+
+    private static class BootstrapRequest {
+
+        public static enum RequestType {
+
+            SHUTDOWN,
+            DUMP,
+            PING;
+        }
+
+        private final RequestType requestType;
+        private final String[] args;
+
+        public BootstrapRequest(final String request, final String[] args) {
+            this.requestType = RequestType.valueOf(request);
+            this.args = args;
+        }
+
+        public RequestType getRequestType() {
+            return requestType;
+        }
+
+        @SuppressWarnings("unused")
         public String[] getArgs() {
-			return args;
-		}
-	}
+            return args;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
index e166f8e..ef2377f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
@@ -47,11 +47,12 @@ public class NiFi {
     private static final Logger logger = LoggerFactory.getLogger(NiFi.class);
     private final NiFiServer nifiServer;
     private final BootstrapListener bootstrapListener;
-    
+
     public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
     private volatile boolean shutdown = false;
 
-    public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+    public NiFi(final NiFiProperties properties)
+            throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
         Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
@@ -70,24 +71,24 @@ public class NiFi {
         }));
 
         final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
-        if ( bootstrapPort != null ) {
-        	try {
-        		final int port = Integer.parseInt(bootstrapPort);
-        		
-        		if (port < 1 || port > 65535) {
-        			throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
-        		}
-        		
-        		bootstrapListener = new BootstrapListener(this, port);
-        		bootstrapListener.start();
-        	} catch (final NumberFormatException nfe) {
-        		throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
-        	}
+        if (bootstrapPort != null) {
+            try {
+                final int port = Integer.parseInt(bootstrapPort);
+
+                if (port < 1 || port > 65535) {
+                    throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
+                }
+
+                bootstrapListener = new BootstrapListener(this, port);
+                bootstrapListener.start();
+            } catch (final NumberFormatException nfe) {
+                throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
+            }
         } else {
-        	logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
-        	bootstrapListener = null;
+            logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
+            bootstrapListener = null;
         }
-        
+
         // delete the web working dir - if the application does not start successfully
         // the web app directories might be in an invalid state. when this happens
         // jetty will not attempt to re-extract the war into the directory. by removing
@@ -118,7 +119,7 @@ public class NiFi {
         // discover the extensions
         ExtensionManager.discoverExtensions();
         ExtensionManager.logClassLoaderMapping();
-        
+
         DocGenerator.generate(properties);
 
         // load the server from the framework classloader
@@ -129,27 +130,27 @@ public class NiFi {
         final long startTime = System.nanoTime();
         nifiServer = (NiFiServer) jettyConstructor.newInstance(properties);
         nifiServer.setExtensionMapping(extensionMapping);
-        
-        if ( shutdown ) {
-        	logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
+
+        if (shutdown) {
+            logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
         } else {
-	        nifiServer.start();
-	        
-	        final long endTime = System.nanoTime();
-	        logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds.");
+            nifiServer.start();
+
+            final long endTime = System.nanoTime();
+            logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds.");
         }
     }
 
     protected void shutdownHook() {
         try {
-        	this.shutdown = true;
-        	
+            this.shutdown = true;
+
             logger.info("Initiating shutdown of Jetty web server...");
             if (nifiServer != null) {
                 nifiServer.stop();
             }
             if (bootstrapListener != null) {
-            	bootstrapListener.stop();
+                bootstrapListener.stop();
             }
             logger.info("Jetty web server shutdown completed (nicely or otherwise).");
         } catch (final Throwable t) {
@@ -164,10 +165,10 @@ public class NiFi {
         final int minRequiredOccurrences = 25;
         final int maxOccurrencesOutOfRange = 15;
         final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());
-        
+
         final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
             private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
-            
+
             @Override
             public Thread newThread(final Runnable r) {
                 final Thread t = defaultFactory.newThread(r);
@@ -176,7 +177,7 @@ public class NiFi {
                 return t;
             }
         });
-        
+
         final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
         final AtomicInteger occurences = new AtomicInteger(0);
         final Runnable command = new Runnable() {
@@ -202,7 +203,8 @@ public class NiFi {
                 service.shutdownNow();
 
                 if (occurences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
-                    logger.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
+                    logger.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
+                            + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
                 }
             }
         };
@@ -213,7 +215,7 @@ public class NiFi {
     /**
      * Main entry point of the application.
      *
-     * @param args
+     * @param args things which are ignored
      */
     public static void main(String[] args) {
         logger.info("Launching NiFi...");


Mime
View raw message