nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject nifi git commit: NIFI-4546: Make ReportingTask aware of node type in a cluster
Date Mon, 30 Oct 2017 10:22:51 GMT
Repository: nifi
Updated Branches:
  refs/heads/master d914ad292 -> a73b5bda4


NIFI-4546: Make ReportingTask aware of node type in a cluster

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2235.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a73b5bda
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a73b5bda
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a73b5bda

Branch: refs/heads/master
Commit: a73b5bda4233fd28f3ddeca789ec6150a4e9878f
Parents: d914ad2
Author: Koji Kawamura <ijokarumawak@apache.org>
Authored: Mon Oct 30 10:19:07 2017 +0900
Committer: Pierre Villard <pierre.villard.fr@gmail.com>
Committed: Mon Oct 30 11:22:03 2017 +0100

----------------------------------------------------------------------
 .../apache/nifi/reporting/AbstractReportingTask.java  | 14 ++++++++++++--
 .../reporting/ReportingInitializationContext.java     |  9 +++++++++
 .../nifi/util/MockReportingInitializationContext.java |  6 ++++++
 .../org/apache/nifi/controller/FlowController.java    |  2 +-
 .../nifi/controller/StandardFlowSynchronizer.java     |  2 +-
 .../StandardReportingInitializationContext.java       | 11 ++++++++++-
 .../scheduling/TestStandardProcessScheduler.java      |  2 +-
 .../nifi/mock/MockReportingInitializationContext.java |  6 ++++++
 8 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a73b5bda/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java b/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java
index b5afe17..93d23b2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java
@@ -20,8 +20,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.components.AbstractConfigurableComponent;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 
 public abstract class AbstractReportingTask extends AbstractConfigurableComponent implements ReportingTask {
 
@@ -30,6 +30,7 @@ public abstract class AbstractReportingTask extends AbstractConfigurableComponen
     private long schedulingNanos;
     private ControllerServiceLookup serviceLookup;
     private ComponentLog logger;
+    private NodeTypeProvider nodeTypeProvider;
 
     @Override
     public final void initialize(final ReportingInitializationContext config) throws InitializationException {
@@ -38,19 +39,28 @@ public abstract class AbstractReportingTask extends AbstractConfigurableComponen
         name = config.getName();
         schedulingNanos = config.getSchedulingPeriod(TimeUnit.NANOSECONDS);
         serviceLookup = config.getControllerServiceLookup();
+        nodeTypeProvider = config.getNodeTypeProvider();
 
         init(config);
     }
 
     /**
      * @return the {@link ControllerServiceLookup} that was passed to the
-     * {@link #init(ProcessorInitializationContext)} method
+     * {@link #initialize(ReportingInitializationContext)} method
      */
     protected final ControllerServiceLookup getControllerServiceLookup() {
         return serviceLookup;
     }
 
     /**
+     * @return the {@link NodeTypeProvider} that was passed to the
+     * {@link #initialize(ReportingInitializationContext)} method
+     */
+    protected final NodeTypeProvider getNodeTypeProvider() {
+        return nodeTypeProvider;
+    }
+
+    /**
      * @return the identifier of this Reporting Task
      */
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/a73b5bda/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java
index df64e03..0bf49d3 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.reporting;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.kerberos.KerberosContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -75,4 +76,12 @@ public interface ReportingInitializationContext extends KerberosContext {
      * way and generate bulletins when appropriate
      */
     ComponentLog getLogger();
+
+    /**
+     * @return the {@link NodeTypeProvider} which can be used to detect the node
+     * type of this NiFi instance.
+     * @since Apache NiFi 1.5.0
+     */
+    NodeTypeProvider getNodeTypeProvider();
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a73b5bda/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
index 454b742..d1b8e5c 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -74,6 +75,11 @@ public class MockReportingInitializationContext extends MockControllerServiceLoo
     }
 
     @Override
+    public NodeTypeProvider getNodeTypeProvider() {
+        return null;
+    }
+
+    @Override
     public String getSchedulingPeriod() {
         return "0 sec";
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a73b5bda/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index acc3102..99d8ed0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3150,7 +3150,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         if (firstTimeAdded) {
             final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
-                    SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties);
+                    SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties, this);
 
             try {
                 taskNode.getReportingTask().initialize(config);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a73b5bda/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 3af270c..3d07456 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -620,7 +620,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
 
             final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
             final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
-                    SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties);
+                    SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties, controller);
 
             try {
                 reportingTask.getReportingTask().initialize(config);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a73b5bda/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
index d96d2b7..ebe774b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.ReportingInitializationContext;
@@ -38,11 +39,13 @@ public class StandardReportingInitializationContext implements ReportingInitiali
     private final ControllerServiceProvider serviceProvider;
     private final ComponentLog logger;
     private final NiFiProperties nifiProperties;
+    private final NodeTypeProvider nodeTypeProvider;
 
     public StandardReportingInitializationContext(
             final String id, final String name, final SchedulingStrategy schedulingStrategy,
             final String schedulingPeriod, final ComponentLog logger,
-            final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties) {
+            final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties,
+            final NodeTypeProvider nodeTypeProvider) {
         this.id = id;
         this.name = name;
         this.schedulingPeriod = schedulingPeriod;
@@ -50,6 +53,7 @@ public class StandardReportingInitializationContext implements ReportingInitiali
         this.schedulingStrategy = schedulingStrategy;
         this.logger = logger;
         this.nifiProperties = nifiProperties;
+        this.nodeTypeProvider = nodeTypeProvider;
     }
 
     @Override
@@ -134,4 +138,9 @@ public class StandardReportingInitializationContext implements ReportingInitiali
     public File getKerberosConfigurationFile() {
         return nifiProperties.getKerberosConfigurationFile();
     }
+
+    @Override
+    public NodeTypeProvider getNodeTypeProvider() {
+        return nodeTypeProvider;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a73b5bda/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 2c59964..0c4acd8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -116,7 +116,7 @@ public class TestStandardProcessScheduler {
 
         reportingTask = new TestReportingTask();
         final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs",
-                Mockito.mock(ComponentLog.class), null, nifiProperties);
+                Mockito.mock(ComponentLog.class), null, nifiProperties, null);
         reportingTask.initialize(config);
 
         final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a73b5bda/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java
index 630c657..379a56e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.mock;
 
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -80,4 +81,9 @@ public class MockReportingInitializationContext implements ReportingInitializati
     public File getKerberosConfigurationFile() {
         return null;
     }
+
+    @Override
+    public NodeTypeProvider getNodeTypeProvider() {
+        return null;
+    }
 }


Mime
View raw message