nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject nifi git commit: NIFI-4392 Create a MetricReportingTask with GraphiteMetricService
Date Sun, 08 Oct 2017 14:44:35 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 5930c0c21 -> 6201c06c9


NIFI-4392 Create a MetricReportingTask with GraphiteMetricService

This closes #2171.

Signed-off-by: Bryan Bende <bbende@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6201c06c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6201c06c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6201c06c

Branch: refs/heads/master
Commit: 6201c06c996746024249b1f448cea09d16dc7ba8
Parents: 5930c0c
Author: Omer Hadari <hadari.omer@gmail.com>
Authored: Sun Sep 24 22:24:09 2017 +0300
Committer: Bryan Bende <bbende@apache.org>
Committed: Sun Oct 8 10:44:11 2017 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |  10 +
 .../org/apache/nifi/util/MockEventAccess.java   |   8 +-
 .../pom.xml                                     |  40 +++
 .../src/main/resources/META-INF/NOTICE          |  25 ++
 .../nifi-metrics-reporter-service-api/pom.xml   |  27 ++
 .../reporter/service/MetricReporterService.java |  40 +++
 .../nifi-metrics-reporting-nar/pom.xml          |  42 +++
 .../src/main/resources/META-INF/NOTICE          |  25 ++
 .../nifi-metrics-reporting-task/pom.xml         |  55 ++++
 .../org/apache/nifi/metrics/FlowMetricSet.java  |  95 +++++++
 .../org/apache/nifi/metrics/MetricNames.java    |  35 +++
 .../service/GraphiteMetricReporterService.java  | 180 +++++++++++++
 .../reporting/task/MetricsReportingTask.java    | 151 +++++++++++
 ...org.apache.nifi.controller.ControllerService |  15 ++
 .../org.apache.nifi.reporting.ReportingTask     |  15 ++
 .../GraphiteMetricReporterServiceTest.java      | 211 +++++++++++++++
 .../task/MetricsReportingTaskTest.java          | 254 +++++++++++++++++++
 .../nifi-metrics-reporting-bundle/pom.xml       |  57 +++++
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |  12 +
 20 files changed, 1297 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 108438b..7710b3d 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -516,6 +516,16 @@
             <artifactId>nifi-redis-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-metrics-reporting-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
index 2a2aab2..38d1619 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
@@ -18,7 +18,9 @@ package org.apache.nifi.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.nifi.action.Action;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -31,10 +33,14 @@ public class MockEventAccess implements EventAccess {
     private ProcessGroupStatus processGroupStatus;
     private final List<ProvenanceEventRecord> provenanceRecords = new ArrayList<>();
     private final List<Action> flowChanges = new ArrayList<>();
+    private final Map<String, ProcessGroupStatus> processGroupStatusMap = new HashMap<>();
 
     public void setProcessGroupStatus(final ProcessGroupStatus status) {
         this.processGroupStatus = status;
     }
+    public void setProcessGroupStatus(String groupId, final ProcessGroupStatus status) {
+        processGroupStatusMap.put(groupId, status);
+    }
 
     @Override
     public ProcessGroupStatus getControllerStatus() {
@@ -43,7 +49,7 @@ public class MockEventAccess implements EventAccess {
 
     @Override
     public ProcessGroupStatus getGroupStatus(final String groupId) {
-        return null;
+        return processGroupStatusMap.get(groupId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml
new file mode 100644
index 0000000..de48b81
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-metrics-reporting-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+    <packaging>nar</packaging>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-metrics-reporter-service-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..76b99fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,25 @@
+nifi-metrics-reporter-service-api-nar
+Copyright 2015-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Dropwizard Metrics
+    The following NOTICE information applies:
+      Metrics
+      Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
+
+      This product includes software developed by Coda Hale and Yammer, Inc.
+
+      This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
+      LongAdder), which was released with the following comments:
+
+          Written by Doug Lea with assistance from members of JCP JSR-166
+          Expert Group and released to the public domain, as explained at
+      http://creativecommons.org/publicdomain/zero/1.0/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml
new file mode 100644
index 0000000..173d35c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-metrics-reporting-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-metrics-reporter-service-api</artifactId>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java
new file mode 100644
index 0000000..d87be15
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.metrics.reporting.reporter.service;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * An interface for controller services used by MetricsReportingTask. In order to report to a new
+ * client, implement this interface and make sure to return the desired implementation of {@link ScheduledReporter}.
+ *
+ * @author Omer Hadari
+ */
+public interface MetricReporterService extends ControllerService {
+
+    /**
+     * Create a reporter to a metric client (e.g. Graphite).
+     *
+     * @param metricRegistry registry with the metrics to report.
+     * @return an instance of the reporter.
+     * @throws ProcessException if there was an error creating the reporter.
+     */
+    ScheduledReporter createReporter(MetricRegistry metricRegistry) throws ProcessException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml
new file mode 100644
index 0000000..f92b17b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-metrics-reporting-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+    <packaging>nar</packaging>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-metrics-reporting-nar</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-metrics-reporting-task</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..504e3f1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,25 @@
+nifi-metrics-reporting-nar
+Copyright 2015-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Dropwizard Metrics
+    The following NOTICE information applies:
+      Metrics
+      Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
+
+      This product includes software developed by Coda Hale and Yammer, Inc.
+
+      This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
+      LongAdder), which was released with the following comments:
+
+          Written by Doug Lea with assistance from members of JCP JSR-166
+          Expert Group and released to the public domain, as explained at
+      http://creativecommons.org/publicdomain/zero/1.0/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml
new file mode 100644
index 0000000..7b437bc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-metrics-reporting-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-metrics-reporting-task</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-metrics-reporter-service-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-graphite</artifactId>
+            <version>3.1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+            <version>3.1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java
new file mode 100644
index 0000000..d6e2fba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A metric set of NiFi instance related metrics.
+ *
+ * @author Omer Hadari
+ */
+public class FlowMetricSet implements MetricSet {
+
+
+    /**
+     * Reference to the process status that should be reported. Should be updated when the status changes.
+     */
+    private final AtomicReference<ProcessGroupStatus> currentStatusReference;
+
+    /**
+     * Create a metric set that will look at a given process status reference for deciding metrics.
+     *
+     * @param currentStatusReference a reference to the process status.
+     */
+    public FlowMetricSet(AtomicReference<ProcessGroupStatus> currentStatusReference) {
+        this.currentStatusReference = currentStatusReference;
+    }
+
+    /**
+     * Create a map of {@link Gauge}s for the {@link #currentStatusReference}. This method reports the metrics as
+     * found in the reference.
+     *
+     * @return map between the metric name and a {@link Gauge} to its value.
+     */
+    @Override
+    public Map<String, Metric> getMetrics() {
+
+        Map<String, Metric> metrics = new HashMap<>();
+
+        metrics.put(MetricNames.ACTIVE_THREADS, (Gauge<Integer>) () -> currentStatusReference.get().getActiveThreadCount());
+        metrics.put(MetricNames.BYTES_QUEUED, (Gauge<Long>) () -> currentStatusReference.get().getQueuedContentSize());
+        metrics.put(MetricNames.BYTES_READ, (Gauge<Long>) () -> currentStatusReference.get().getBytesRead());
+        metrics.put(MetricNames.BYTES_RECEIVED, (Gauge<Long>) () -> currentStatusReference.get().getBytesReceived());
+        metrics.put(MetricNames.BYTES_SENT, (Gauge<Long>) () -> currentStatusReference.get().getBytesSent());
+        metrics.put(MetricNames.BYTES_WRITTEN, (Gauge<Long>) () -> currentStatusReference.get().getBytesWritten());
+        metrics.put(MetricNames.FLOW_FILES_RECEIVED, (Gauge<Integer>) () -> currentStatusReference.get().getFlowFilesReceived());
+        metrics.put(MetricNames.FLOW_FILES_QUEUED, (Gauge<Integer>) () -> currentStatusReference.get().getQueuedCount());
+        metrics.put(MetricNames.FLOW_FILES_SENT, (Gauge<Integer>) () -> currentStatusReference.get().getFlowFilesSent());
+        metrics.put(MetricNames.TOTAL_TASK_DURATION_NANOS, (Gauge<Long>) () -> calculateProcessingNanos(currentStatusReference.get()));
+
+        return metrics;
+    }
+
+    /**
+     * Calculate the total processing time of a process group.
+     *
+     * @param status the current process group status.
+     * @return the total number of nanoseconds spent across all processors in the process group and its child groups.
+     */
+    private long calculateProcessingNanos(final ProcessGroupStatus status) {
+        long nanos = 0L;
+
+        for (final ProcessorStatus procStats : status.getProcessorStatus()) {
+            nanos += procStats.getProcessingNanos();
+        }
+
+        for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+            nanos += calculateProcessingNanos(childGroupStatus);
+        }
+
+        return nanos;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java
new file mode 100644
index 0000000..fa06b8b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.metrics;
+
+/**
+ * The names of the NiFi flow metrics that are reported.
+ */
+public interface MetricNames {
+
+    // NiFi Metrics
+    String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
+    String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
+    String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
+    String BYTES_SENT = "BytesSentLast5Minutes";
+    String FLOW_FILES_QUEUED = "FlowFilesQueued";
+    String BYTES_QUEUED = "BytesQueued";
+    String BYTES_READ = "BytesReadLast5Minutes";
+    String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
+    String ACTIVE_THREADS = "ActiveThreads";
+    String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java
new file mode 100644
index 0000000..55623ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.metrics.reporting.reporter.service;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.graphite.GraphiteSender;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.metrics.reporting.task.MetricsReportingTask;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A controller service that provides metric reporters for graphite, can be used by {@link MetricsReportingTask}.
+ * <p>
+ * The graphite sender is created when the service is enabled and closed when it is disabled; reporters
+ * returned by {@link #createReporter(MetricRegistry)} use that sender.
+ *
+ * @author Omer Hadari
+ */
+@Tags({"metrics", "reporting", "graphite"})
+@CapabilityDescription("A controller service that provides metric reporters for graphite. " +
+        "Used by MetricsReportingTask.")
+public class GraphiteMetricReporterService extends AbstractControllerService implements MetricReporterService {
+
+    /**
+     * Points to the hostname of the graphite listener.
+     */
+    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("host")
+            .displayName("Host")
+            .description("The hostname of the carbon listener")
+            .required(true)
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    /**
+     * Points to the port on which the graphite server listens.
+     */
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("port")
+            .displayName("Port")
+            .description("The port on which carbon listens")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    /**
+     * Points to the charset name that the graphite server expects.
+     */
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("charset")
+            .displayName("Charset")
+            .description("The charset used by the graphite server")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    /**
+     * Prefix for all metric names sent by reporters - for separation of NiFi stats in graphite.
+     */
+    protected static final PropertyDescriptor METRIC_NAME_PREFIX = new PropertyDescriptor.Builder()
+            .name("metric name prefix")
+            .displayName("Metric Name Prefix")
+            .description("A prefix that will be used for all metric names sent by reporters provided by this service.")
+            .required(true)
+            .defaultValue("nifi")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    /**
+     * List of property descriptors used by the service.
+     */
+    private static final List<PropertyDescriptor> properties;
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(HOST);
+        props.add(PORT);
+        props.add(CHARSET);
+        props.add(METRIC_NAME_PREFIX);
+        properties = Collections.unmodifiableList(props);
+    }
+
+    /**
+     * Graphite sender, a connection to the server. {@code null} while the service is not enabled.
+     */
+    private GraphiteSender graphiteSender;
+
+    /**
+     * The configured {@link #METRIC_NAME_PREFIX} value.
+     */
+    private String metricNamePrefix;
+
+    /**
+     * Create the {@link #graphiteSender} according to configuration.
+     *
+     * @param context used to access properties.
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+        Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        graphiteSender = createSender(host, port, charset);
+        metricNamePrefix = context.getProperty(METRIC_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+    }
+
+    /**
+     * Close the graphite sender, if one was created.
+     * <p>
+     * Does nothing when no sender exists (for example when {@link #onEnabled(ConfigurationContext)} failed
+     * before creating it), so that disabling the service cannot fail with a {@link NullPointerException}.
+     *
+     * @throws IOException if failed to close the connection.
+     */
+    @OnDisabled
+    public void shutdown() throws IOException {
+        // Detach the sender first so the field is cleared even if close() throws.
+        final GraphiteSender sender = graphiteSender;
+        graphiteSender = null;
+        if (sender != null) {
+            sender.close();
+        }
+    }
+
+    /**
+     * Use the {@link #graphiteSender} in order to create a reporter whose metric names are prefixed
+     * with the configured {@link #METRIC_NAME_PREFIX} value.
+     *
+     * @param metricRegistry registry with the metrics to report.
+     * @return a reporter instance.
+     */
+    @Override
+    public ScheduledReporter createReporter(MetricRegistry metricRegistry) {
+        return GraphiteReporter.forRegistry(metricRegistry).prefixedWith(metricNamePrefix).build(graphiteSender);
+    }
+
+    /**
+     * Create a sender. Protected so that subclasses can supply a different sender.
+     *
+     * @param host the hostname of the server to connect to.
+     * @param port the port on which the server listens.
+     * @param charset the charset in which the server expects logs.
+     * @return The created sender.
+     */
+    protected GraphiteSender createSender(String host, int port, Charset charset) {
+        return new Graphite(host, port, SocketFactory.getDefault(), charset);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java
new file mode 100644
index 0000000..37eb194
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.metrics.reporting.task;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.metrics.FlowMetricSet;
+import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A reporting task for NiFi instance and JVM related metrics.
+ * <p>
+ * This task reports metrics to services according to a provided {@link ScheduledReporter}, reached by using a
+ * {@link MetricReporterService}. In order to report to different clients, simply use different implementations of
+ * the controller service.
+ *
+ * @author Omer Hadari
+ * @see MetricReporterService
+ */
+@Tags({"metrics", "reporting"})
+// Each fragment ends with a space so the concatenated description reads as normal sentences.
+@CapabilityDescription("This reporting task reports a set of metrics regarding the JVM and the NiFi instance " +
+        "to a reporter. The reporter is provided by a MetricReporterService. It can be optionally used for a specific " +
+        "process group if a property with the group id is provided.")
+public class MetricsReportingTask extends AbstractReportingTask {
+
+    /**
+     * Points to the service which provides {@link ScheduledReporter} instances.
+     */
+    protected static final PropertyDescriptor REPORTER_SERVICE = new PropertyDescriptor.Builder()
+            .name("metric reporter service")
+            .displayName("Metric Reporter Service")
+            .description("The service that provides a reporter for the gathered metrics")
+            .identifiesControllerService(MetricReporterService.class)
+            .required(true)
+            .build();
+
+    /**
+     * Metrics of the process group with this ID should be reported. If not specified, use the root process group.
+     */
+    protected static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder()
+            .name("process group id")
+            .displayName("Process Group ID")
+            .description("The id of the process group to report. If not specified, metrics of the root process group " +
+                    "are reported.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Contains the metrics that should be reported.
+     */
+    private MetricRegistry metricRegistry;
+
+    /**
+     * Used for actually reporting metrics.
+     */
+    private ScheduledReporter reporter;
+
+    // Protected for testing sake. DO NOT ACCESS FOR OTHER PURPOSES.
+    /**
+     * Points to the most recent process group status seen by this task.
+     */
+    protected AtomicReference<ProcessGroupStatus> currentStatusReference;
+
+    /**
+     * Register all wanted metrics to {@link #metricRegistry}.
+     * <p>
+     * {@inheritDoc}
+     */
+    @Override
+    protected void init(ReportingInitializationContext config) {
+        metricRegistry = new MetricRegistry();
+        currentStatusReference = new AtomicReference<>();
+        metricRegistry.registerAll(new MemoryUsageGaugeSet());
+        metricRegistry.registerAll(new FlowMetricSet(currentStatusReference));
+    }
+
+    /**
+     * Populate {@link #reporter} using the {@link MetricReporterService}. If the reporter is active already,
+     * do nothing.
+     *
+     * @param context used for accessing the controller service.
+     */
+    @OnScheduled
+    public void connect(ConfigurationContext context) {
+        if (reporter == null) {
+            reporter = ((MetricReporterService) context.getProperty(REPORTER_SERVICE).asControllerService())
+                    .createReporter(metricRegistry);
+        }
+    }
+
+    /**
+     * Report the registered metrics.
+     * <p>
+     * The status of the configured process group (or the controller status when no group id is set) is stored
+     * in {@link #currentStatusReference} before the reporter is triggered. If no status is available, an error
+     * is logged and nothing is reported.
+     *
+     * @param context used for getting the most recent {@link ProcessGroupStatus}.
+     */
+    @Override
+    public void onTrigger(ReportingContext context) {
+        String groupId = context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue();
+
+        ProcessGroupStatus statusToReport = groupId == null
+                ? context.getEventAccess().getControllerStatus()
+                : context.getEventAccess().getGroupStatus(groupId);
+
+        if (statusToReport != null) {
+            currentStatusReference.set(statusToReport);
+            reporter.report();
+        } else {
+            getLogger().error("Process group with provided group id could not be found.");
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(REPORTER_SERVICE);
+        properties.add(PROCESS_GROUP_ID);
+        return Collections.unmodifiableList(properties);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..0ddc70a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.metrics.reporting.reporter.service.GraphiteMetricReporterService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
new file mode 100644
index 0000000..8c554a4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.metrics.reporting.task.MetricsReportingTask

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java
new file mode 100644
index 0000000..e7257a0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.metrics.reporting.reporter.service;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.GraphiteSender;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for {@link GraphiteMetricReporterService}.
+ *
+ * @author Omer Hadari
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class GraphiteMetricReporterServiceTest {
+
+    /**
+     * Service identifier for registering the tested service to the tests runner.
+     */
+    private static final String SERVICE_IDENTIFIER = "graphite-metric-reporter-service";
+
+    /**
+     * Sample host name for the {@link GraphiteMetricReporterService#HOST} property.
+     */
+    private static final String TEST_HOST = "some-host";
+
+    /**
+     * Sample port for the {@link GraphiteMetricReporterService#PORT} property.
+     */
+    private static final int TEST_PORT = 12345;
+
+    /**
+     * Sample charset for the {@link GraphiteMetricReporterService#CHARSET} property.
+     */
+    private static final Charset TEST_CHARSET = StandardCharsets.UTF_16LE;
+
+    /**
+     * Sample prefix for metric names.
+     */
+    private static final String METRIC_NAMES_PREFIX = "test-metric-name-prefix";
+
+    /**
+     * Sample metric for verifying that a graphite sender with the correct configuration is used.
+     */
+    private static final String TEST_METRIC_NAME = "test-metric";
+
+    /**
+     * The fixed value of {@link #TEST_METRIC_NAME}.
+     */
+    private static final int TEST_METRIC_VALUE = 2;
+
+    /**
+     * Dummy processor for creating {@link #runner}.
+     */
+    @Mock
+    private Processor processorDummy;
+
+    /**
+     * Mock sender for verifying creation with the correct configuration.
+     */
+    @Mock
+    private GraphiteSender graphiteSenderMock;
+
+    /**
+     * Stub metric registry, that contains the test metrics.
+     */
+    private MetricRegistry metricRegistryStub;
+
+    /**
+     * Test runner for activating and configuring the service.
+     */
+    private TestRunner runner;
+
+    /**
+     * The test subject.
+     */
+    private GraphiteMetricReporterService testedService;
+
+    /**
+     * Instantiate the runner, the tested service and the {@link #metricRegistryStub} before each test, and
+     * register the test metric to the {@link #metricRegistryStub}.
+     */
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(processorDummy);
+        testedService = new GraphiteMetricReporterService();
+
+        metricRegistryStub = new MetricRegistry();
+        metricRegistryStub.register(TEST_METRIC_NAME, ((Gauge<Integer>) () -> TEST_METRIC_VALUE));
+
+    }
+
+
+    /**
+     * Make sure that a correctly configured service can be activated.
+     */
+    @Test
+    public void testGraphiteMetricReporterSanityConfiguration() throws Exception {
+        runner.addControllerService(SERVICE_IDENTIFIER, testedService);
+        setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX);
+        runner.enableControllerService(testedService);
+
+        runner.assertValid(testedService);
+    }
+
+
+    /**
+     * Make sure that a correctly configured service provides a reporter for the matching configuration, and
+     * that the reporter sends the prefixed test metric through the sender created for that configuration.
+     */
+    @Test
+    public void testCreateReporterUsesCorrectSender() throws Exception {
+        testedService = new TestableGraphiteMetricReporterService();
+        runner.addControllerService(SERVICE_IDENTIFIER, testedService);
+        setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX);
+        when(graphiteSenderMock.isConnected()).thenReturn(false);
+        runner.enableControllerService(testedService);
+
+        ScheduledReporter createdReporter = testedService.createReporter(metricRegistryStub);
+        createdReporter.report();
+
+        String expectedMetricName = MetricRegistry.name(METRIC_NAMES_PREFIX, TEST_METRIC_NAME);
+        verify(graphiteSenderMock).send(eq(expectedMetricName), eq(String.valueOf(TEST_METRIC_VALUE)), anyLong());
+    }
+
+    /**
+     * Make sure that {@link GraphiteMetricReporterService#shutdown()} closes the connection to graphite.
+     */
+    @Test
+    public void testShutdownClosesSender() throws Exception {
+        testedService = new TestableGraphiteMetricReporterService();
+        runner.addControllerService(SERVICE_IDENTIFIER, testedService);
+        setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX);
+        runner.enableControllerService(testedService);
+        runner.disableControllerService(testedService);
+
+        verify(graphiteSenderMock).close();
+    }
+
+    /**
+     * Set the test subject's properties.
+     *
+     * @param host              populates {@link GraphiteMetricReporterService#HOST}.
+     * @param port              populates {@link GraphiteMetricReporterService#PORT}.
+     * @param charset           populates {@link GraphiteMetricReporterService#CHARSET}.
+     * @param metricNamesPrefix populates {@link GraphiteMetricReporterService#METRIC_NAME_PREFIX}.
+     */
+    private void setServiceProperties(String host, int port, Charset charset, String metricNamesPrefix) {
+        runner.setProperty(testedService, GraphiteMetricReporterService.HOST, host);
+        runner.setProperty(testedService, GraphiteMetricReporterService.PORT, String.valueOf(port));
+        runner.setProperty(testedService, GraphiteMetricReporterService.CHARSET, charset.name());
+        runner.setProperty(testedService, GraphiteMetricReporterService.METRIC_NAME_PREFIX, metricNamesPrefix);
+    }
+
+    /**
+     * This class is a patch. It overrides {@link GraphiteMetricReporterService#createSender(String, int, Charset)}
+     * so that it is possible to verify a correct creation of graphite senders according to property values.
+     */
+    private class TestableGraphiteMetricReporterService extends GraphiteMetricReporterService {
+
+        /**
+         * Overrides the actual methods in order to inject the mock {@link #graphiteSenderMock}.
+         * <p>
+         * If this method is called with the test property values, it returns the mock. Otherwise operate
+         * regularly.
+         *
+         * @param host    the provided hostname.
+         * @param port    the provided port.
+         * @param charset the provided graphite server charset.
+         * @return {@link #graphiteSenderMock} if all params were the constant test params, regular result otherwise.
+         */
+        @Override
+        protected GraphiteSender createSender(String host, int port, Charset charset) {
+            if (TEST_HOST.equals(host) && TEST_PORT == port && TEST_CHARSET.equals(charset)) {
+                return graphiteSenderMock;
+
+            }
+            return super.createSender(host, port, charset);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java
new file mode 100644
index 0000000..0619e73
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.metrics.reporting.task;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.metrics.FlowMetricSet;
+import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockReportingContext;
+import org.apache.nifi.util.MockReportingInitializationContext;
+import org.apache.nifi.util.MockVariableRegistry;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for {@link MetricsReportingTask}.
+ *
+ * @author Omer Hadari
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class MetricsReportingTaskTest {
+
+    /**
+     * Identifier for {@link #reporterServiceStub}.
+     */
+    private static final String REPORTER_SERVICE_IDENTIFIER = "reporter-service";
+
+    /**
+     * Id for the group with status {@link #innerGroupStatus}.
+     */
+    private static final String TEST_GROUP_ID = "test-process-group-id";
+
+    /**
+     * Id for the {@link #reportingInitContextStub}.
+     */
+    private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id";
+
+    /**
+     * Name for {@link #reportingInitContextStub}.
+     */
+    private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name";
+
+    /**
+     * Id for the tested reporting task.
+     */
+    private static final String TEST_TASK_ID = "test-task-id";
+
+
+    /**
+     * Stub context, used by {@link MetricsReportingTask#onTrigger(ReportingContext)} for reaching the status.
+     */
+    private MockReportingContext reportingContextStub;
+
+    /**
+     * Stub context, used by {@link MetricsReportingTask#connect(ConfigurationContext)} for reaching the service.
+     */
+    private MockConfigurationContext configurationContextStub;
+
+    /**
+     * Stub service for providing {@link #reporterMock}, used for actual reporting.
+     */
+    @Mock
+    private MetricReporterService reporterServiceStub;
+
+    /**
+     * Mock reporter, used for verifying actual reporting.
+     */
+    @Mock
+    private ScheduledReporter reporterMock;
+
+    /**
+     * A status for the "root" process group.
+     */
+    private ProcessGroupStatus rootGroupStatus;
+
+    /**
+     * A status for the group with id {@link #TEST_GROUP_ID}, used when {@link MetricsReportingTask#PROCESS_GROUP_ID} is set.
+     */
+    private ProcessGroupStatus innerGroupStatus;
+
+    /**
+     * Stub initialization context for calling {@link MetricsReportingTask#initialize(ReportingInitializationContext)}.
+     */
+    private MockReportingInitializationContext reportingInitContextStub;
+
+    /**
+     * The test subject.
+     */
+    private MetricsReportingTask testedReportingTask;
+
+    /**
+     * Set up the test environment and mock behaviour. This includes registering {@link #reporterServiceStub} in the
+     * different contexts, creating fresh {@link ProcessGroupStatus} instances and instantiating the test
+     * subject.
+     */
+    @Before
+    public void setUp() throws Exception {
+        Map<String, ControllerService> services = new HashMap<>();
+        services.put(REPORTER_SERVICE_IDENTIFIER, reporterServiceStub);
+        testedReportingTask = new MetricsReportingTask();
+        reportingContextStub = new MockReportingContext(
+                services, new MockStateManager(testedReportingTask), new MockVariableRegistry());
+
+        rootGroupStatus = new ProcessGroupStatus();
+        innerGroupStatus = new ProcessGroupStatus();
+        when(reporterServiceStub.createReporter(any())).thenReturn(reporterMock);
+        when(reporterServiceStub.getIdentifier()).thenReturn(REPORTER_SERVICE_IDENTIFIER);
+        reportingContextStub.setProperty(MetricsReportingTask.REPORTER_SERVICE.getName(), REPORTER_SERVICE_IDENTIFIER);
+        reportingContextStub.addControllerService(reporterServiceStub, REPORTER_SERVICE_IDENTIFIER);
+
+        configurationContextStub = new MockConfigurationContext(reportingContextStub.getProperties(),
+                reportingContextStub.getControllerServiceLookup());
+        reportingInitContextStub = new MockReportingInitializationContext(
+                TEST_INIT_CONTEXT_ID,
+                TEST_INIT_CONTEXT_NAME,
+                new MockComponentLog(TEST_TASK_ID, testedReportingTask));
+    }
+
+    /**
+     * Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus}
+     * is used and that metrics are actually reported.
+     */
+    @Test
+    public void testValidLifeCycleReportsCorrectly() throws Exception {
+        reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
+
+        testedReportingTask.initialize(reportingInitContextStub);
+        testedReportingTask.connect(configurationContextStub);
+        testedReportingTask.onTrigger(reportingContextStub);
+        verify(reporterMock).report();
+
+        // Verify correct metrics are registered
+        ArgumentCaptor<MetricRegistry> registryCaptor = ArgumentCaptor.forClass(MetricRegistry.class);
+        verify(reporterServiceStub).createReporter(registryCaptor.capture());
+        MetricRegistry usedRegistry = registryCaptor.getValue();
+        Map<String, Metric> usedMetrics = usedRegistry.getMetrics();
+        assertTrue(usedMetrics.keySet().containsAll(new MemoryUsageGaugeSet().getMetrics().keySet()));
+        assertTrue(usedMetrics.keySet()
+                .containsAll(new FlowMetricSet(testedReportingTask.currentStatusReference).getMetrics().keySet()));
+
+        // Verify the most current ProcessGroupStatus is updated
+        assertEquals(testedReportingTask.currentStatusReference.get(), rootGroupStatus);
+    }
+
+    /**
+     * Make sure that when {@link MetricsReportingTask#PROCESS_GROUP_ID} is set, the correct metrics are registered,
+     * the status of the specified process group is used and metrics are actually reported.
+     */
+    @Test
+    public void testValidLifeCycleReportsCorrectlyProcessGroupSpecified() throws Exception {
+        reportingContextStub.setProperty(MetricsReportingTask.PROCESS_GROUP_ID.getName(), TEST_GROUP_ID);
+        reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP_ID, innerGroupStatus);
+
+        testedReportingTask.initialize(reportingInitContextStub);
+        testedReportingTask.connect(configurationContextStub);
+        testedReportingTask.onTrigger(reportingContextStub);
+        verify(reporterMock).report();
+
+        // Verify correct metrics are registered
+        ArgumentCaptor<MetricRegistry> registryCaptor = ArgumentCaptor.forClass(MetricRegistry.class);
+        verify(reporterServiceStub).createReporter(registryCaptor.capture());
+        MetricRegistry usedRegistry = registryCaptor.getValue();
+        Map<String, Metric> usedMetrics = usedRegistry.getMetrics();
+        assertTrue(usedMetrics.keySet().containsAll(new MemoryUsageGaugeSet().getMetrics().keySet()));
+        assertTrue(usedMetrics.keySet()
+                .containsAll(new FlowMetricSet(testedReportingTask.currentStatusReference).getMetrics().keySet()));
+
+        // Verify the most current ProcessGroupStatus is updated
+        assertEquals(testedReportingTask.currentStatusReference.get(), innerGroupStatus);
+    }
+
+    /**
+     * Make sure that when {@link MetricsReportingTask#PROCESS_GROUP_ID} refers to an unknown process group, nothing
+     * is reported and no {@link ProcessGroupStatus} is stored.
+     */
+    @Test
+    public void testInvalidProcessGroupId() throws Exception {
+        reportingContextStub.setProperty(MetricsReportingTask.PROCESS_GROUP_ID.getName(), TEST_GROUP_ID + "-invalid");
+        reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP_ID, innerGroupStatus);
+
+        testedReportingTask.initialize(reportingInitContextStub);
+        testedReportingTask.connect(configurationContextStub);
+        testedReportingTask.onTrigger(reportingContextStub);
+        verify(reporterMock, never()).report();
+        assertNull(testedReportingTask.currentStatusReference.get());
+    }
+
+    /**
+     * Make sure that {@link MetricsReportingTask#connect(ConfigurationContext)} does not create a new reporter
+     * if there is already an active reporter.
+     */
+    @Test
+    public void testConnectCreatesSingleReporter() throws Exception {
+        testedReportingTask.initialize(reportingInitContextStub);
+        testedReportingTask.connect(configurationContextStub);
+        testedReportingTask.connect(configurationContextStub);
+
+        verify(reporterServiceStub, times(1)).createReporter(any());
+    }
+
+    /**
+     * Sanity check for registered properties.
+     */
+    @Test
+    public void testGetSupportedPropertyDescriptorsSanity() throws Exception {
+        List<PropertyDescriptor> expected = Arrays.asList(
+                MetricsReportingTask.REPORTER_SERVICE,
+                MetricsReportingTask.PROCESS_GROUP_ID);
+        assertEquals(expected, testedReportingTask.getSupportedPropertyDescriptors());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml
new file mode 100644
index 0000000..2006ebf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-metrics-reporting-bundle</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>nifi-metrics-reporting-task</module>
+        <module>nifi-metrics-reporting-nar</module>
+        <module>nifi-metrics-reporter-service-api</module>
+        <module>nifi-metrics-reporter-service-api-nar</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-metrics-reporting-task</artifactId>
+                <version>1.5.0-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>3.1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 0771c62..330daba 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -88,6 +88,7 @@
         <module>nifi-extension-utils</module>
         <module>nifi-grpc-bundle</module>
         <module>nifi-redis-bundle</module>
+        <module>nifi-metrics-reporting-bundle</module>
   </modules>
 
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 56961ec..ab46861 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1489,6 +1489,18 @@
                 <version>1.5.0-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
+                <version>1.5.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-metrics-reporting-nar</artifactId>
+                <version>1.5.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
 	        <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>


Mime
View raw message