NIFI-1464 addressed PR comments from @apiri and @markap14
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f53f45de
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f53f45de
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f53f45de
Branch: refs/heads/master
Commit: f53f45def3ed353232ec65d656bba4eee922b252
Parents: 0c5b1c2
Author: Oleg Zhurakousky <oleg@suitcase.io>
Authored: Wed Feb 17 09:10:15 2016 -0500
Committer: Mark Payne <markap14@hotmail.com>
Committed: Fri Mar 11 12:54:50 2016 -0500
----------------------------------------------------------------------
.../apache/nifi/controller/ProcessorNode.java | 4 +-
.../nifi/controller/StandardProcessorNode.java | 6 +-
.../scheduling/AbstractSchedulingAgent.java | 2 +-
.../StandardControllerServiceProvider.java | 1 -
.../TestStandardControllerServiceProvider.java | 122 -------------------
5 files changed, 7 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index d7dbe24..ff7977d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -102,7 +102,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent
implemen
/**
* Will start the {@link Processor} represented by this
- * {@link ProcessorNode}. Starting processor typically means invoking it's
+ * {@link ProcessorNode}. Starting processor typically means invoking its
* operation that is annotated with @OnScheduled and then executing a
* callback provided by the {@link ProcessScheduler} to which typically
* initiates
@@ -126,7 +126,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent
implemen
/**
* Will stop the {@link Processor} represented by this {@link ProcessorNode}.
- * Stopping processor typically means invoking it's operation that is
+ * Stopping processor typically means invoking its operation that is
* annotated with @OnUnschedule and then @OnStopped.
*
* @param scheduler
http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 3e07995..dac56b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1374,8 +1374,10 @@ public class StandardProcessorNode extends ProcessorNode implements
Connectable
}
});
- long onScheduleTimeout = Long.parseLong(NiFiProperties.getInstance()
- .getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, String.valueOf(Long.MAX_VALUE)));
+ String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT);
+ long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE
+ : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
+
try {
executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
index b931c64..3544dac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
@@ -23,7 +23,7 @@ import org.apache.nifi.controller.ReportingTaskNode;
* Base implementation of the {@link SchedulingAgent} which encapsulates the
* updates to the {@link ScheduleState} based on invoked operation and then
* delegates to the corresponding 'do' methods. For example; By invoking
- * {@link #schedule(Connectable, ScheduleState)} the the
+ * {@link #schedule(Connectable, ScheduleState)} the
* {@link ScheduleState#setScheduled(boolean)} with value 'true' will be
* invoked.
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 3b9b073..77dc87e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -457,7 +457,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService>
serviceType) {
final Set<String> identifiers = new HashSet<>();
for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet())
{
- Class<? extends ControllerService> c = entry.getValue().getProxiedControllerService().getClass();
if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass()))
{
identifiers.add(entry.getKey());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 9b35238..11b73a8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -17,7 +17,6 @@
package org.apache.nifi.controller.service;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.beans.PropertyDescriptor;
@@ -41,15 +40,11 @@ import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.StandardProcessGroup;
-import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class TestStandardControllerServiceProvider {
private static StateManagerProvider stateManagerProvider = new StateManagerProvider()
{
@@ -191,119 +186,6 @@ public class TestStandardControllerServiceProvider {
}
}
- @Test(timeout = 10000)
- @Ignore // this may be obsolete since TestProcessorLifecycle covers this
- // scenario without mocks
- public void testStartStopReferencingComponents() {
- final ProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler,
null, stateManagerProvider);
-
- // build a graph of reporting tasks and controller services with dependencies as
such:
- //
- // Processor P1 -> A -> B -> D
- // Processor P2 -> C ---^----^
- //
- // In other words, Processor P1 references Controller Service A, which references
B, which references D.
- // AND
- // Processor P2 references Controller Service C, which references B and D.
- //
- // So we have to verify that if D is enabled, when we enable its referencing services,
- // we enable C and B, even if we attempt to enable C before B... i.e., if we try
to enable C, we cannot do so
- // until B is first enabled so ensure that we enable B first.
- final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(),
"1", false);
- final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(),
"2", false);
- final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(),
"3", false);
- final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(),
"4", false);
-
- final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class);
- Mockito.doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
- procNode.verifyCanStart();
- // procNode.setScheduledState(ScheduledState.RUNNING);
- return null;
- }
- }).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class));
-
- Mockito.doAnswer(new Answer<Object>() {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable {
- final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
- procNode.verifyCanStop();
- // procNode.setScheduledState(ScheduledState.STOPPED);
- return null;
- }
- }).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class));
-
- final String id1 = UUID.randomUUID().toString();
- final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1,
- new StandardValidationContextFactory(provider), scheduler, provider);
- procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1,
null, provider));
- procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
- procNodeA.setProcessGroup(mockProcessGroup);
-
- final String id2 = UUID.randomUUID().toString();
- final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2,
- new StandardValidationContextFactory(provider), scheduler, provider);
- procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2,
null, provider));
- procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
- procNodeB.setProcessGroup(mockProcessGroup);
-
- serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
- serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
- serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
- serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
-
- provider.enableControllerService(serviceNode4);
- provider.enableReferencingServices(serviceNode4);
- provider.scheduleReferencingComponents(serviceNode4);
-
- final Set<ControllerServiceState> enableStates = new HashSet<>();
- enableStates.add(ControllerServiceState.ENABLED);
- enableStates.add(ControllerServiceState.ENABLING);
-
- while (serviceNode3.getState() != ControllerServiceState.ENABLED
- || serviceNode2.getState() != ControllerServiceState.ENABLED
- || serviceNode1.getState() != ControllerServiceState.ENABLED) {
- assertTrue(enableStates.contains(serviceNode3.getState()));
- assertTrue(enableStates.contains(serviceNode2.getState()));
- assertTrue(enableStates.contains(serviceNode1.getState()));
- }
- assertTrue(procNodeA.isRunning());
- assertTrue(procNodeB.isRunning());
-
- // stop processors and verify results.
- provider.unscheduleReferencingComponents(serviceNode4);
- assertFalse(procNodeA.isRunning());
- assertFalse(procNodeB.isRunning());
- while (serviceNode3.getState() != ControllerServiceState.ENABLED
- || serviceNode2.getState() != ControllerServiceState.ENABLED
- || serviceNode1.getState() != ControllerServiceState.ENABLED) {
- assertTrue(enableStates.contains(serviceNode3.getState()));
- assertTrue(enableStates.contains(serviceNode2.getState()));
- assertTrue(enableStates.contains(serviceNode1.getState()));
- }
-
- provider.disableReferencingServices(serviceNode4);
- final Set<ControllerServiceState> disableStates = new HashSet<>();
- disableStates.add(ControllerServiceState.DISABLED);
- disableStates.add(ControllerServiceState.DISABLING);
-
- // Wait for the services to be disabled.
- while (serviceNode3.getState() != ControllerServiceState.DISABLED
- || serviceNode2.getState() != ControllerServiceState.DISABLED
- || serviceNode1.getState() != ControllerServiceState.DISABLED) {
- assertTrue(disableStates.contains(serviceNode3.getState()));
- assertTrue(disableStates.contains(serviceNode2.getState()));
- assertTrue(disableStates.contains(serviceNode1.getState()));
- }
-
- assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState());
-
- provider.disableControllerService(serviceNode4);
- assertTrue(disableStates.contains(serviceNode4.getState()));
- }
@Test
public void testOrderingOfServices() {
@@ -476,9 +358,5 @@ public class TestStandardControllerServiceProvider {
// procNode.setScheduledState(ScheduledState.RUNNING);
provider.unscheduleReferencingComponents(serviceNode);
assertEquals(ScheduledState.STOPPED, procNode.getScheduledState());
-
- // procNode.setScheduledState(ScheduledState.DISABLED);
- // provider.unscheduleReferencingComponents(serviceNode);
- // assertEquals(ScheduledState.DISABLED, procNode.getScheduledState());
}
}
|