nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject [2/2] nifi git commit: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask
Date Wed, 11 Apr 2018 19:01:18 GMT
NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Fixed dependency issue by providing a local JSON reader

Rebased + fixed conflict + updated versions in pom + EL scope

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2575


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6fbe1515
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6fbe1515
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6fbe1515

Branch: refs/heads/master
Commit: 6fbe1515eefd2071dc75a1de2c1fc15cc282da76
Parents: ce0855e
Author: Pierre Villard <pierre.villard.fr@gmail.com>
Authored: Tue Jan 23 23:15:18 2018 +0100
Committer: Matthew Burgess <mattyb149@apache.org>
Committed: Wed Apr 11 14:44:30 2018 -0400

----------------------------------------------------------------------
 .../nifi-ambari-reporting-task/pom.xml          |  15 +-
 .../reporting/ambari/AmbariReportingTask.java   |   4 +-
 .../reporting/ambari/api/MetricBuilder.java     |  84 ----
 .../nifi/reporting/ambari/api/MetricFields.java |  29 --
 .../reporting/ambari/api/MetricsBuilder.java    |  93 ----
 .../reporting/ambari/metrics/MetricNames.java   |  55 ---
 .../ambari/metrics/MetricsService.java          | 131 ------
 .../ambari/api/TestMetricsBuilder.java          |   2 +
 .../ambari/metrics/TestMetricsService.java      |   2 +
 .../nifi-reporting-utils/pom.xml                |  10 +
 .../reporting/util/metrics/MetricNames.java     |  59 +++
 .../reporting/util/metrics/MetricsService.java  | 230 ++++++++++
 .../util/metrics/api/MetricBuilder.java         |  84 ++++
 .../util/metrics/api/MetricFields.java          |  29 ++
 .../util/metrics/api/MetricsBuilder.java        |  93 ++++
 .../nifi-site-to-site-reporting-task/pom.xml    |  39 +-
 .../AbstractSiteToSiteReportingTask.java        | 420 ++++++++++++++++++-
 .../SiteToSiteBulletinReportingTask.java        |  18 +-
 .../SiteToSiteMetricsReportingTask.java         | 222 ++++++++++
 .../SiteToSiteProvenanceReportingTask.java      |  28 +-
 .../SiteToSiteStatusReportingTask.java          |  37 +-
 .../org.apache.nifi.reporting.ReportingTask     |   3 +-
 .../additionalDetails.html                      | 178 ++++++++
 .../additionalDetails.html                      |   2 +-
 .../src/main/resources/schema-metrics.avsc      |  37 ++
 .../TestSiteToSiteMetricsReportingTask.java     | 296 +++++++++++++
 26 files changed, 1715 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
index dafe829..de024e2 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
@@ -30,21 +30,11 @@
             <artifactId>jersey-client</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.glassfish</groupId>
-            <artifactId>javax.json</artifactId>
-            <version>1.0.4</version>
-        </dependency>
-        <dependency>
             <groupId>javax.json</groupId>
             <artifactId>javax.json-api</artifactId>
             <version>1.0</version>
         </dependency>
         <dependency>
-            <groupId>com.yammer.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>2.2.0</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
@@ -53,6 +43,11 @@
             <artifactId>nifi-utils</artifactId>
             <version>1.7.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-reporting-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+        </dependency>
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index 5bbdecb..0568b3e 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -29,8 +29,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.AbstractReportingTask;
 import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.ambari.api.MetricsBuilder;
-import org.apache.nifi.reporting.ambari.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
 import javax.json.Json;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
deleted file mode 100644
index 8e234ce..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-import javax.json.JsonBuilderFactory;
-import javax.json.JsonObject;
-
-/**
- * Builds the JsonObject for an individual metric.
- */
-public class MetricBuilder {
-
-    private final JsonBuilderFactory factory;
-
-    private String applicationId;
-    private String instanceId;
-    private String hostname;
-    private String timestamp;
-    private String metricName;
-    private String metricValue;
-
-    public MetricBuilder(final JsonBuilderFactory factory) {
-        this.factory = factory;
-    }
-
-    public MetricBuilder applicationId(final String applicationId) {
-        this.applicationId = applicationId;
-        return this;
-    }
-
-    public MetricBuilder instanceId(final String instanceId) {
-        this.instanceId = instanceId;
-        return this;
-    }
-
-    public MetricBuilder hostname(final String hostname) {
-        this.hostname = hostname;
-        return this;
-    }
-
-    public MetricBuilder timestamp(final long timestamp) {
-        this.timestamp = String.valueOf(timestamp);
-        return this;
-    }
-
-    public MetricBuilder metricName(final String metricName) {
-        this.metricName = metricName;
-        return this;
-    }
-
-    public MetricBuilder metricValue(final String metricValue) {
-        this.metricValue = metricValue;
-        return this;
-    }
-
-    public JsonObject build() {
-        return factory.createObjectBuilder()
-                .add(MetricFields.METRIC_NAME, metricName)
-                .add(MetricFields.APP_ID, applicationId)
-                .add(MetricFields.INSTANCE_ID, instanceId)
-                .add(MetricFields.HOSTNAME, hostname)
-                .add(MetricFields.TIMESTAMP, timestamp)
-                .add(MetricFields.START_TIME, timestamp)
-                .add(MetricFields.METRICS,
-                        factory.createObjectBuilder()
-                                .add(String.valueOf(timestamp), metricValue)
-                ).build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
deleted file mode 100644
index 1c1629c..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-public interface MetricFields {
-
-    String METRIC_NAME = "metricname";
-    String APP_ID = "appid";
-    String INSTANCE_ID = "instanceid";
-    String HOSTNAME = "hostname";
-    String TIMESTAMP = "timestamp";
-    String START_TIME = "starttime";
-    String METRICS = "metrics";
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
deleted file mode 100644
index 11b4db5..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonBuilderFactory;
-import javax.json.JsonObject;
-import javax.json.JsonObjectBuilder;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Builds the overall JsonObject for the Metrics.
- */
-public class MetricsBuilder {
-
-    static final String ROOT_JSON_ELEMENT = "metrics";
-
-    private final JsonBuilderFactory factory;
-
-    private long timestamp;
-    private String applicationId;
-    private String instanceId;
-    private String hostname;
-    private Map<String,String> metrics = new HashMap<>();
-
-    public MetricsBuilder(final JsonBuilderFactory factory) {
-        this.factory = factory;
-    }
-
-    public MetricsBuilder applicationId(final String applicationId) {
-        this.applicationId = applicationId;
-        return this;
-    }
-
-    public MetricsBuilder instanceId(final String instanceId) {
-        this.instanceId = instanceId;
-        return this;
-    }
-
-    public MetricsBuilder hostname(final String hostname) {
-        this.hostname = hostname;
-        return this;
-    }
-
-    public MetricsBuilder timestamp(final long timestamp) {
-        this.timestamp = timestamp;
-        return this;
-    }
-
-    public MetricsBuilder metric(final String name, String value) {
-        this.metrics.put(name, value);
-        return this;
-    }
-
-    public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
-        this.metrics.putAll(metrics);
-        return this;
-    }
-
-    public JsonObject build() {
-        // builds JsonObject for individual metrics
-        final MetricBuilder metricBuilder = new MetricBuilder(factory);
-        metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
-
-        final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
-
-        for (Map.Entry<String,String> entry : metrics.entrySet()) {
-            metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
-            metricArrayBuilder.add(metricBuilder.build());
-        }
-
-        // add the array of metrics to a top-level json object
-        final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
-        metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
-        return metricsBuilder.build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
deleted file mode 100644
index 20cfa4e..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.metrics;
-
-/**
- * The Metric names to send to Ambari.
- */
-public interface MetricNames {
-
-    // Metric Name separator
-    String METRIC_NAME_SEPARATOR = ".";
-
-    // NiFi Metrics
-    String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
-    String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
-    String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
-    String BYTES_SENT = "BytesSentLast5Minutes";
-    String FLOW_FILES_QUEUED = "FlowFilesQueued";
-    String BYTES_QUEUED = "BytesQueued";
-    String BYTES_READ = "BytesReadLast5Minutes";
-    String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
-    String ACTIVE_THREADS = "ActiveThreads";
-    String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
-    String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
-
-    // JVM Metrics
-    String JVM_UPTIME = "jvm.uptime";
-    String JVM_HEAP_USED = "jvm.heap_used";
-    String JVM_HEAP_USAGE = "jvm.heap_usage";
-    String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
-    String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
-    String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
-    String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
-    String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
-    String JVM_THREAD_COUNT = "jvm.thread_count";
-    String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
-    String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
-    String JVM_GC_RUNS = "jvm.gc.runs";
-    String JVM_GC_TIME = "jvm.gc.time";
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
deleted file mode 100644
index cef257d..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.metrics;
-
-import com.yammer.metrics.core.VirtualMachineMetrics;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A service used to produce key/value metrics based on a given input.
- */
-public class MetricsService {
-
-    /**
-     * Generates a Map of metrics for a ProcessGroupStatus instance.
-     *
-     * @param status a ProcessGroupStatus to get metrics from
-     * @param appendPgId if true, the process group ID will be appended at the end of the metric name
-     * @return a map of metrics for the given status
-     */
-    public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
-        final Map<String,String> metrics = new HashMap<>();
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived()));
-        metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived()));
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent()));
-        metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent()));
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount()));
-        metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize()));
-        metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead()));
-        metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten()));
-        metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount()));
-
-        final long durationNanos = calculateProcessingNanos(status);
-        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos));
-
-        final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds));
-
-        return metrics;
-    }
-
-    /**
-     * Generates a Map of metrics for VirtualMachineMetrics.
-     *
-     * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
-     * @return a map of metrics from the given VirtualMachineStatus
-     */
-    public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
-        final Map<String,String> metrics = new HashMap<>();
-        metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime()));
-        metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed()));
-        metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage()));
-        metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage()));
-        metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount()));
-        metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount()));
-        metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage()));
-
-        for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
-            final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
-            switch(entry.getKey()) {
-                case BLOCKED:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue));
-                    break;
-                case RUNNABLE:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue));
-                    break;
-                case TERMINATED:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue));
-                    break;
-                case TIMED_WAITING:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue));
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
-            final String gcName = entry.getKey().replace(" ", "");
-            final long runs = entry.getValue().getRuns();
-            final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
-            metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs));
-            metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS));
-        }
-
-        return metrics;
-    }
-
-    // calculates the total processing time of all processors in nanos
-    protected long calculateProcessingNanos(final ProcessGroupStatus status) {
-        long nanos = 0L;
-
-        for (final ProcessorStatus procStats : status.getProcessorStatus()) {
-            nanos += procStats.getProcessingNanos();
-        }
-
-        for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
-            nanos += calculateProcessingNanos(childGroupStatus);
-        }
-
-        return nanos;
-    }
-
-    // append the process group ID if necessary
-    private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
-        if(appendPgId) {
-            return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
-        } else {
-            return name;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
index cdaa453..9b96eb9 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.reporting.ambari.api;
 
+import org.apache.nifi.reporting.util.metrics.api.MetricFields;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
index 93224eb..ec0cf6e 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
@@ -19,6 +19,8 @@ package org.apache.nifi.reporting.ambari.metrics;
 import com.yammer.metrics.core.VirtualMachineMetrics;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
index ba10afb..3e2d158 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
@@ -40,6 +40,16 @@
             <artifactId>commons-lang3</artifactId>
             <version>3.7</version>
         </dependency>
+        <dependency>
+            <groupId>com.yammer.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
new file mode 100644
index 0000000..19bb90d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics;
+
+/**
+ * Names of the metrics produced by the shared reporting utilities.
+ * <p>
+ * The constants are grouped into NiFi (process group) metrics, JVM metrics and OS metrics.
+ * NOTE(review): this interface was moved out of the Ambari reporting task into the shared
+ * reporting utils, so the names are no longer specific to Ambari.
+ */
+public interface MetricNames {
+
+    // Separator placed between a metric name and a suffix appended to it (e.g. a process group ID)
+    String METRIC_NAME_SEPARATOR = ".";
+
+    // NiFi Metrics (filled from a ProcessGroupStatus)
+    String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
+    String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
+    String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
+    String BYTES_SENT = "BytesSentLast5Minutes";
+    String FLOW_FILES_QUEUED = "FlowFilesQueued";
+    String BYTES_QUEUED = "BytesQueued";
+    String BYTES_READ = "BytesReadLast5Minutes";
+    String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
+    String ACTIVE_THREADS = "ActiveThreads";
+    String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
+    String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
+
+    // JVM Metrics (filled from a VirtualMachineMetrics instance)
+    String JVM_UPTIME = "jvm.uptime";
+    String JVM_HEAP_USED = "jvm.heap_used";
+    String JVM_HEAP_USAGE = "jvm.heap_usage";
+    String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
+    String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
+    String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
+    String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
+    String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
+    String JVM_THREAD_COUNT = "jvm.thread_count";
+    String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
+    String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
+    String JVM_GC_RUNS = "jvm.gc.runs";
+    String JVM_GC_TIME = "jvm.gc.time";
+
+    // OS Metrics (system load and number of available processors)
+    String LOAD1MN = "loadAverage1min";
+    String CORES = "availableCores";
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
new file mode 100644
index 0000000..ed3922a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.util.metrics.api.MetricFields;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+
+/**
+ * A service used to produce key/value metrics based on a given input.
+ */
+public class MetricsService {
+
+    /**
+     * Generates a Map of metrics for a ProcessGroupStatus instance.
+     *
+     * @param status a ProcessGroupStatus to get metrics from
+     * @param appendPgId if true, the process group ID will be appended at the end of the metric name
+     * @return a map of metrics for the given status
+     */
+    public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
+        final Map<String,String> metrics = new HashMap<>();
+
+        Map<String,Long> longMetrics = getLongMetrics(status, appendPgId);
+        for (String key : longMetrics.keySet()) {
+            metrics.put(key, String.valueOf(longMetrics.get(key)));
+        }
+
+        Map<String,Integer> integerMetrics = getIntegerMetrics(status, appendPgId);
+        for (String key : integerMetrics.keySet()) {
+            metrics.put(key, String.valueOf(integerMetrics.get(key)));
+        }
+
+        return metrics;
+    }
+
+    private Map<String,Integer> getIntegerMetrics(ProcessGroupStatus status, boolean appendPgId) {
+        final Map<String,Integer> metrics = new HashMap<>();
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), status.getFlowFilesReceived());
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), status.getFlowFilesSent());
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), status.getQueuedCount());
+        metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), status.getActiveThreadCount());
+        return metrics;
+    }
+
+    private Map<String,Long> getLongMetrics(ProcessGroupStatus status, boolean appendPgId) {
+        final Map<String,Long> metrics = new HashMap<>();
+        metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), status.getBytesReceived());
+        metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), status.getBytesSent());
+        metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), status.getQueuedContentSize());
+        metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), status.getBytesRead());
+        metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), status.getBytesWritten());
+
+        final long durationNanos = calculateProcessingNanos(status);
+        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), durationNanos);
+
+        final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
+        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), durationSeconds);
+
+        return metrics;
+    }
+
+    /**
+     * Generates a Map of metrics for VirtualMachineMetrics.
+     *
+     * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
+     * @return a map of metrics from the given VirtualMachineStatus
+     */
+    public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+        final Map<String,String> metrics = new HashMap<>();
+
+        Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics);
+        for (String key : integerMetrics.keySet()) {
+            metrics.put(key, String.valueOf(integerMetrics.get(key)));
+        }
+
+        Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
+        for (String key : longMetrics.keySet()) {
+            metrics.put(key, String.valueOf(longMetrics.get(key)));
+        }
+
+        Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics);
+        for (String key : doubleMetrics.keySet()) {
+            metrics.put(key, String.valueOf(doubleMetrics.get(key)));
+        }
+
+        return metrics;
+    }
+
+    // calculates the total processing time of all processors in nanos
+    protected long calculateProcessingNanos(final ProcessGroupStatus status) {
+        long nanos = 0L;
+
+        for (final ProcessorStatus procStats : status.getProcessorStatus()) {
+            nanos += procStats.getProcessingNanos();
+        }
+
+        for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+            nanos += calculateProcessingNanos(childGroupStatus);
+        }
+
+        return nanos;
+    }
+
+    // append the process group ID if necessary
+    private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
+        if(appendPgId) {
+            return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
+        } else {
+            return name;
+        }
+    }
+
+    private Map<String,Double> getDoubleMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+        final Map<String,Double> metrics = new HashMap<>();
+        metrics.put(MetricNames.JVM_HEAP_USED, virtualMachineMetrics.heapUsed());
+        metrics.put(MetricNames.JVM_HEAP_USAGE, virtualMachineMetrics.heapUsage());
+        metrics.put(MetricNames.JVM_NON_HEAP_USAGE, virtualMachineMetrics.nonHeapUsage());
+        metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, virtualMachineMetrics.fileDescriptorUsage());
+        return metrics;
+    }
+
+    private Map<String,Long> getLongMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+        final Map<String,Long> metrics = new HashMap<>();
+        metrics.put(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime());
+
+        for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
+            final String gcName = entry.getKey().replace(" ", "");
+            final long runs = entry.getValue().getRuns();
+            final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
+            metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, runs);
+            metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, timeMS);
+        }
+
+        return metrics;
+    }
+
+    private Map<String,Integer> getIntegerMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+        final Map<String,Integer> metrics = new HashMap<>();
+        metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount());
+        metrics.put(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount());
+
+        for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
+            final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
+            switch(entry.getKey()) {
+                case BLOCKED:
+                    metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, normalizedValue);
+                    break;
+                case RUNNABLE:
+                    metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, normalizedValue);
+                    break;
+                case TERMINATED:
+                    metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, normalizedValue);
+                    break;
+                case TIMED_WAITING:
+                    metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, normalizedValue);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        return metrics;
+    }
+
+    public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, VirtualMachineMetrics virtualMachineMetrics,
+            String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) {
+        JsonObjectBuilder objectBuilder = factory.createObjectBuilder()
+                .add(MetricFields.APP_ID, applicationId)
+                .add(MetricFields.HOSTNAME, hostname)
+                .add(MetricFields.INSTANCE_ID, status.getId())
+                .add(MetricFields.TIMESTAMP, currentTimeMillis);
+
+        objectBuilder
+        .add(MetricNames.CORES, availableProcessors)
+        .add(MetricNames.LOAD1MN, systemLoad);
+
+        Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics);
+        for (String key : integerMetrics.keySet()) {
+            objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key));
+        }
+
+        Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
+        for (String key : longMetrics.keySet()) {
+            objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key));
+        }
+
+        Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics);
+        for (String key : doubleMetrics.keySet()) {
+            objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key));
+        }
+
+        Map<String,Long> longPgMetrics = getLongMetrics(status, false);
+        for (String key : longPgMetrics.keySet()) {
+            objectBuilder.add(key, longPgMetrics.get(key));
+        }
+
+        Map<String,Integer> integerPgMetrics = getIntegerMetrics(status, false);
+        for (String key : integerPgMetrics.keySet()) {
+            objectBuilder.add(key, integerPgMetrics.get(key));
+        }
+
+        return objectBuilder.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
new file mode 100644
index 0000000..81fb021
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+/**
+ * Builds the JsonObject for an individual metric.
+ * <p>
+ * The builder is mutable and reusable: each setter overwrites the previously stored value and
+ * {@link #build()} can be called several times, e.g. once per metric after changing only the
+ * metric name and value.
+ * NOTE(review): no field is validated here; a field left unset is passed as {@code null} to the
+ * JSON object builder, which is expected to reject it - confirm against the JSON-P implementation.
+ */
+public class MetricBuilder {
+
+    private final JsonBuilderFactory factory;
+
+    private String applicationId;
+    private String instanceId;
+    private String hostname;
+    // kept as a String because it is also used as the key of the metric value (see build())
+    private String timestamp;
+    private String metricName;
+    private String metricValue;
+
+    /**
+     * @param factory the factory used to create the JSON object builders
+     */
+    public MetricBuilder(final JsonBuilderFactory factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * @param applicationId the value of the application ID field
+     * @return this builder
+     */
+    public MetricBuilder applicationId(final String applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    /**
+     * @param instanceId the value of the instance ID field
+     * @return this builder
+     */
+    public MetricBuilder instanceId(final String instanceId) {
+        this.instanceId = instanceId;
+        return this;
+    }
+
+    /**
+     * @param hostname the value of the hostname field
+     * @return this builder
+     */
+    public MetricBuilder hostname(final String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    /**
+     * @param timestamp the timestamp of the metric; stored in its decimal String form
+     * @return this builder
+     */
+    public MetricBuilder timestamp(final long timestamp) {
+        this.timestamp = String.valueOf(timestamp);
+        return this;
+    }
+
+    /**
+     * @param metricName the name of the metric
+     * @return this builder
+     */
+    public MetricBuilder metricName(final String metricName) {
+        this.metricName = metricName;
+        return this;
+    }
+
+    /**
+     * @param metricValue the value of the metric, already rendered as a String
+     * @return this builder
+     */
+    public MetricBuilder metricValue(final String metricValue) {
+        this.metricValue = metricValue;
+        return this;
+    }
+
+    /**
+     * Builds the JSON object for the metric from the values currently stored in this builder.
+     * <p>
+     * The stored timestamp is written three times: as the timestamp field, as the start time
+     * field, and as the key under which the metric value is stored in the nested metrics object.
+     *
+     * @return the JSON object describing the metric
+     */
+    public JsonObject build() {
+        return factory.createObjectBuilder()
+                .add(MetricFields.METRIC_NAME, metricName)
+                .add(MetricFields.APP_ID, applicationId)
+                .add(MetricFields.INSTANCE_ID, instanceId)
+                .add(MetricFields.HOSTNAME, hostname)
+                .add(MetricFields.TIMESTAMP, timestamp)
+                .add(MetricFields.START_TIME, timestamp)
+                .add(MetricFields.METRICS,
+                        factory.createObjectBuilder()
+                                // timestamp is already a String, no conversion needed
+                                .add(timestamp, metricValue)
+                ).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
new file mode 100644
index 0000000..4c451ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+/**
+ * Names of the JSON fields used when metrics are serialized.
+ * <p>
+ * MetricBuilder writes all of these fields for each individual metric; MetricsService uses the
+ * application ID, hostname, instance ID and timestamp fields for its combined JSON object.
+ */
+public interface MetricFields {
+
+    String METRIC_NAME = "metricname";
+    String APP_ID = "appid";
+    String INSTANCE_ID = "instanceid";
+    String HOSTNAME = "hostname";
+    String TIMESTAMP = "timestamp";
+    String START_TIME = "starttime";
+    String METRICS = "metrics";
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
new file mode 100644
index 0000000..3694720
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds the overall JsonObject for the Metrics.
+ * <p>
+ * The application ID, instance ID, hostname and timestamp are shared by every metric; the
+ * metrics themselves are accumulated as name/value pairs until {@link #build()} is called.
+ */
+public class MetricsBuilder {
+
+    static final String ROOT_JSON_ELEMENT = "metrics";
+
+    private final JsonBuilderFactory factory;
+
+    private long timestamp;
+    private String applicationId;
+    private String instanceId;
+    private String hostname;
+    // accumulated metrics (name -> value); the map is only ever mutated, never replaced
+    private final Map<String,String> metrics = new HashMap<>();
+
+    /**
+     * @param factory the factory used to create the JSON builders
+     */
+    public MetricsBuilder(final JsonBuilderFactory factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * @param applicationId the application ID written to every metric
+     * @return this builder
+     */
+    public MetricsBuilder applicationId(final String applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    /**
+     * @param instanceId the instance ID written to every metric
+     * @return this builder
+     */
+    public MetricsBuilder instanceId(final String instanceId) {
+        this.instanceId = instanceId;
+        return this;
+    }
+
+    /**
+     * @param hostname the hostname written to every metric
+     * @return this builder
+     */
+    public MetricsBuilder hostname(final String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    /**
+     * @param timestamp the timestamp written to every metric
+     * @return this builder
+     */
+    public MetricsBuilder timestamp(final long timestamp) {
+        this.timestamp = timestamp;
+        return this;
+    }
+
+    /**
+     * Adds a single metric; a metric previously added under the same name is replaced.
+     *
+     * @param name the name of the metric
+     * @param value the value of the metric
+     * @return this builder
+     */
+    public MetricsBuilder metric(final String name, String value) {
+        this.metrics.put(name, value);
+        return this;
+    }
+
+    /**
+     * Adds all the given metrics; metrics previously added under the same names are replaced.
+     *
+     * @param metrics the metrics to add (name to value)
+     * @return this builder
+     */
+    public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
+        this.metrics.putAll(metrics);
+        return this;
+    }
+
+    /**
+     * Builds a JSON object whose only field, named after {@code ROOT_JSON_ELEMENT}, is the array
+     * of the individual metric objects.
+     *
+     * @return the JSON object holding all the accumulated metrics
+     */
+    public JsonObject build() {
+        // a single MetricBuilder is reused: the shared fields are set once, only name and value change per metric
+        final MetricBuilder metricBuilder = new MetricBuilder(factory);
+        metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
+
+        final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
+
+        for (final Map.Entry<String,String> entry : metrics.entrySet()) {
+            metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
+            metricArrayBuilder.add(metricBuilder.build());
+        }
+
+        // add the array of metrics to a top-level json object
+        final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
+        metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
+        return metricsBuilder.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
index c320ae2..93a3196 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
@@ -55,6 +55,23 @@
             <version>1.7.0-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>org.glassfish</groupId>
             <artifactId>javax.json</artifactId>
             <version>1.0.4</version>
@@ -83,10 +100,30 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/main/resources/schema-metrics.avsc</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 341a6d8..e755354 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -16,6 +16,23 @@
  */
 package org.apache.nifi.reporting;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+import javax.net.ssl.SSLContext;
+
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -25,27 +42,51 @@ import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SerializedForm;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StringUtils;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
 
 /**
  * Base class for ReportingTasks that send data over site-to-site.
  */
 public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask {
+
+    protected static final String LAST_EVENT_ID_KEY = "last_event_id";
     protected static final String DESTINATION_URL_PATH = "/nifi";
+    protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
 
     static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
             .name("Destination URL")
@@ -141,8 +182,16 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             .sensitive(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing out the records.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
 
     protected volatile SiteToSiteClient siteToSiteClient;
+    protected volatile RecordSchema recordSchema;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -188,7 +237,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
         final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
         final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
                 : new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(),
-                context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
+                        context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
 
         siteToSiteClient = new SiteToSiteClient.Builder()
                 .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
@@ -215,6 +264,33 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
         return this.siteToSiteClient;
     }
 
+    /**
+     * Reads the JSON content of {@code in} as records (using {@code recordSchema}) and re-serializes
+     * them with the Record Writer configured through {@code RECORD_WRITER}.
+     *
+     * @param context    reporting context used to resolve the Record Writer controller service
+     * @param in         stream holding the JSON content to convert
+     * @param attributes map that receives the writer's MIME type followed by the attributes of the write result
+     * @return the bytes produced by the Record Writer
+     * @throws ProcessException if reading the JSON, resolving the schema or writing the records fails
+     */
+    protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) {
+        try (final JsonRecordReader jsonReader = new JsonRecordReader(in, recordSchema)) {
+
+            final RecordSetWriterFactory factory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            final RecordSchema outputSchema = factory.getSchema(null, recordSchema);
+            final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+            try (final RecordSetWriter recordWriter = factory.createWriter(getLogger(), outputSchema, buffer)) {
+                recordWriter.beginRecordSet();
+
+                for (Record current = jsonReader.nextRecord(); current != null; current = jsonReader.nextRecord()) {
+                    recordWriter.write(current);
+                }
+
+                final WriteResult result = recordWriter.finishRecordSet();
+
+                // The MIME type goes in first; attributes from the write result are applied on top of it.
+                attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
+                attributes.putAll(result.getAttributes());
+            }
+
+            // The buffer is read only after the writer has been closed by the inner try-with-resources.
+            return buffer.toByteArray();
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+            throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e);
+        }
+    }
+
     static class NiFiUrlValidator implements Validator {
         @Override
         public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
@@ -236,4 +312,334 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             }
         }
     }
+
+    /**
+     * Adds a {@code long} field to the builder; a {@code null} value leaves the builder untouched.
+     *
+     * @param builder JSON object builder to add the field to
+     * @param key     name of the field
+     * @param value   value of the field, may be {@code null}
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final Long value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value.longValue());
+    }
+
+    /**
+     * Adds an {@code int} field to the builder; a {@code null} value leaves the builder untouched.
+     *
+     * @param builder JSON object builder to add the field to
+     * @param key     name of the field
+     * @param value   value of the field, may be {@code null}
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value.intValue());
+    }
+
+    /**
+     * Adds a string field to the builder; a {@code null} value leaves the builder untouched.
+     *
+     * @param builder JSON object builder to add the field to
+     * @param key     name of the field
+     * @param value   value of the field, may be {@code null}
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final String value) {
+        if (value != null) {
+            builder.add(key, value);
+        }
+    }
+
+    /**
+     * Adds a string field to the builder, optionally emitting an explicit JSON null.
+     *
+     * @param builder         JSON object builder to add the field to
+     * @param key             name of the field
+     * @param value           value of the field, may be {@code null}
+     * @param allowNullValues when {@code true} a {@code null} value is written as {@link JsonValue#NULL};
+     *                        when {@code false} a {@code null} value is skipped
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
+        if (value != null) {
+            builder.add(key, value);
+            return;
+        }
+
+        if (allowNullValues) {
+            builder.add(key, JsonValue.NULL);
+        }
+    }
+
+    private class JsonRecordReader implements RecordReader {
+
+        private RecordSchema recordSchema;
+        private final JsonParser jsonParser;
+        private final boolean array;
+        private final JsonNode firstJsonNode;
+        private boolean firstObjectConsumed = false;
+
+        private final Supplier<DateFormat> dateFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat());
+        private final Supplier<DateFormat> timeFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat());
+        private final Supplier<DateFormat> timestampFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());
+
+        public JsonRecordReader(final InputStream in, RecordSchema recordSchema) throws IOException, MalformedRecordException {
+            this.recordSchema = recordSchema;
+            try {
+                jsonParser = new JsonFactory().createJsonParser(in);
+                jsonParser.setCodec(new ObjectMapper());
+                JsonToken token = jsonParser.nextToken();
+                if (token == JsonToken.START_ARRAY) {
+                    array = true;
+                    token = jsonParser.nextToken();
+                } else {
+                    array = false;
+                }
+                if (token == JsonToken.START_OBJECT) {
+                    firstJsonNode = jsonParser.readValueAsTree();
+                } else {
+                    firstJsonNode = null;
+                }
+            } catch (final JsonParseException e) {
+                throw new MalformedRecordException("Could not parse data as JSON", e);
+            }
+        }
+
+        /**
+         * Closes the underlying JSON parser.
+         *
+         * @throws IOException if closing the parser fails
+         */
+        @Override
+        public void close() throws IOException {
+            jsonParser.close();
+        }
+
+        /**
+         * Returns the next record read from the JSON content, or {@code null} when there is none left.
+         *
+         * @param coerceTypes       whether field values are converted to the types declared in the schema
+         * @param dropUnknownFields whether fields that are not part of the schema are dropped
+         * @return the next record, or {@code null} once the single object has been consumed, the array
+         *         is exhausted, or the content holds no JSON object at all
+         * @throws IOException              if reading from the parser fails
+         * @throws MalformedRecordException if the content cannot be converted into a record
+         */
+        @Override
+        public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
+            if (firstObjectConsumed && !array) {
+                return null;
+            }
+            try {
+                final JsonNode nextNode = getNextJsonNode();
+                if (nextNode == null) {
+                    // No further object (end of array, end of stream, or content that did not start with an
+                    // object). Without this guard the null node is dereferenced during conversion and the
+                    // resulting NullPointerException is reported as a malformed record.
+                    return null;
+                }
+                return convertJsonNodeToRecord(nextNode, getSchema(), null, coerceTypes, dropUnknownFields);
+            } catch (final MalformedRecordException mre) {
+                throw mre;
+            } catch (final IOException ioe) {
+                throw ioe;
+            } catch (final Exception e) {
+                throw new MalformedRecordException("Failed to convert data into a Record object with the given schema", e);
+            }
+        }
+
+        /**
+         * Returns the schema that was passed to the constructor.
+         */
+        @Override
+        public RecordSchema getSchema() throws MalformedRecordException {
+            return recordSchema;
+        }
+
+        /**
+         * Returns the next JSON object from the parser. The first call hands back the node that was
+         * read eagerly by the constructor (which may be {@code null}).
+         *
+         * @return the next object node, or {@code null} when the parser is exhausted or an array
+         *         boundary is reached
+         * @throws MalformedRecordException if a token other than an object or array boundary is found
+         */
+        private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException {
+            if (!firstObjectConsumed) {
+                firstObjectConsumed = true;
+                return firstJsonNode;
+            }
+
+            // Skip over any END_OBJECT tokens before looking at the next meaningful token.
+            JsonToken current = jsonParser.nextToken();
+            while (current == JsonToken.END_OBJECT) {
+                current = jsonParser.nextToken();
+            }
+
+            if (current == null || current == JsonToken.END_ARRAY || current == JsonToken.START_ARRAY) {
+                return null;
+            }
+
+            if (current == JsonToken.START_OBJECT) {
+                return jsonParser.readValueAsTree();
+            }
+
+            throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + current.name());
+        }
+
+        /**
+         * Builds a {@link MapRecord} from a JSON object node.
+         * <p>
+         * When {@code dropUnknown} is {@code true} the schema drives the conversion: each schema field is
+         * looked up in the node by name or alias and fields absent from the node are skipped. Otherwise
+         * the node drives the conversion: every JSON field is kept, and it is only coerced when
+         * {@code coerceTypes} is set and the schema declares a field with that name.
+         *
+         * @param jsonNode        object node to convert; it is dereferenced without a null check
+         * @param schema          schema of the record being built
+         * @param fieldNamePrefix prefix prepended to field names passed to {@code convertField}, or {@code null} for none
+         * @param coerceTypes     whether values are converted to the data types declared in the schema
+         * @param dropUnknown     whether JSON fields that are not part of the schema are ignored
+         * @return the record holding the converted values
+         */
+        private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
+                final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
+
+            final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2);
+
+            if (dropUnknown) {
+                // Schema-driven: only fields declared in the schema can end up in the record.
+                for (final RecordField recordField : schema.getFields()) {
+                    final JsonNode childNode = getChildNode(jsonNode, recordField);
+                    if (childNode == null) {
+                        continue;
+                    }
+
+                    final String fieldName = recordField.getFieldName();
+                    final Object value;
+
+                    if (coerceTypes) {
+                        final DataType desiredType = recordField.getDataType();
+                        final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
+                        value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
+                    } else {
+                        // recordField is the loop variable here; the null check mirrors the branch below.
+                        value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+                    }
+
+                    values.put(fieldName, value);
+                }
+            } else {
+                // JSON-driven: every field present in the node is kept, whether or not the schema knows it.
+                final Iterator<String> fieldNames = jsonNode.getFieldNames();
+                while (fieldNames.hasNext()) {
+                    final String fieldName = fieldNames.next();
+                    final JsonNode childNode = jsonNode.get(fieldName);
+                    final RecordField recordField = schema.getField(fieldName).orElse(null);
+                    final Object value;
+
+                    if (coerceTypes && recordField != null) {
+                        final DataType desiredType = recordField.getDataType();
+                        final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
+                        value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
+                    } else {
+                        value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+                    }
+
+                    values.put(fieldName, value);
+                }
+            }
+
+            // The JSON text of the node is attached through a supplier, so it is only rendered on demand.
+            final Supplier<String> supplier = () -> jsonNode.toString();
+            return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);
+        }
+
+        private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) {
+            if (jsonNode.has(field.getFieldName())) {
+                return jsonNode.get(field.getFieldName());
+            }
+            for (final String alias : field.getAliases()) {
+                if (jsonNode.has(alias)) {
+                    return jsonNode.get(alias);
+                }
+            }
+            return null;
+        }
+
+        /**
+         * Converts a JSON node into a value of the desired record data type.
+         *
+         * @param fieldNode   node to convert; {@code null} and JSON null both yield {@code null}
+         * @param fieldName   name of the field, passed on to {@code DataTypeUtils.convertType} and used as prefix for nested records
+         * @param desiredType data type the value should be converted to
+         * @param dropUnknown passed through to nested record conversion
+         * @return the converted value, or {@code null} if the node is null, is not an object where a
+         *         record is expected, or the field type is not handled by the switch below
+         */
+        protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException {
+            if (fieldNode == null || fieldNode.isNull()) {
+                return null;
+            }
+
+            switch (desiredType.getFieldType()) {
+                // All scalar types share one path: take the raw JSON value and let DataTypeUtils convert it.
+                case BOOLEAN:
+                case BYTE:
+                case CHAR:
+                case DOUBLE:
+                case FLOAT:
+                case INT:
+                case BIGINT:
+                case LONG:
+                case SHORT:
+                case STRING:
+                case DATE:
+                case TIME:
+                case TIMESTAMP: {
+                    final Object rawValue = getRawNodeValue(fieldNode, null);
+                    final Object converted = DataTypeUtils.convertType(rawValue, desiredType, dateFormat, timeFormat, timestampFormat, fieldName);
+                    return converted;
+                }
+                case MAP: {
+                    // Every child of the node becomes a map entry converted to the map's value type.
+                    final DataType valueType = ((MapDataType) desiredType).getValueType();
+
+                    final Map<String, Object> map = new HashMap<>();
+                    final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
+                    while (fieldNameItr.hasNext()) {
+                        final String childName = fieldNameItr.next();
+                        final JsonNode childNode = fieldNode.get(childName);
+                        final Object childValue = convertField(childNode, fieldName, valueType, dropUnknown);
+                        map.put(childName, childValue);
+                    }
+
+                    return map;
+                }
+                case ARRAY: {
+                    // The node is cast without an isArray() check; a non-array node fails with a ClassCastException.
+                    final ArrayNode arrayNode = (ArrayNode) fieldNode;
+                    final int numElements = arrayNode.size();
+                    final Object[] arrayElements = new Object[numElements];
+                    int count = 0;
+                    for (final JsonNode node : arrayNode) {
+                        final DataType elementType = ((ArrayDataType) desiredType).getElementType();
+                        final Object converted = convertField(node, fieldName, elementType, dropUnknown);
+                        arrayElements[count++] = converted;
+                    }
+
+                    return arrayElements;
+                }
+                case RECORD: {
+                    if (fieldNode.isObject()) {
+                        RecordSchema childSchema;
+                        if (desiredType instanceof RecordDataType) {
+                            childSchema = ((RecordDataType) desiredType).getChildSchema();
+                        } else {
+                            return null;
+                        }
+
+                        if (childSchema == null) {
+                            // No child schema declared: synthesize one that treats every JSON field as a string.
+                            final List<RecordField> fields = new ArrayList<>();
+                            final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
+                            while (fieldNameItr.hasNext()) {
+                                fields.add(new RecordField(fieldNameItr.next(), RecordFieldType.STRING.getDataType()));
+                            }
+
+                            childSchema = new SimpleRecordSchema(fields);
+                        }
+
+                        // Nested records are always coerced, and their field names are prefixed with "<fieldName>.".
+                        return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown);
+                    } else {
+                        return null;
+                    }
+                }
+                case CHOICE: {
+                    return DataTypeUtils.convertType(getRawNodeValue(fieldNode, null), desiredType, fieldName);
+                }
+            }
+
+            // Reached only for field types that have no case above.
+            return null;
+        }
+
+        /**
+         * Extracts the value of a JSON node without coercing it to a schema type.
+         * <p>
+         * Numbers, binary data, booleans and text are returned as provided by the node. Arrays become
+         * {@code Object[]} and objects become {@link MapRecord}s; for both, {@code dataType} is only used
+         * to find the element type or child schema to apply to nested values.
+         *
+         * @param fieldNode node to read; {@code null} and JSON null both yield {@code null}
+         * @param dataType  declared data type of the node, or {@code null} if unknown
+         * @return the raw value, or {@code null} if the node is null or of a kind not handled here
+         * @throws IOException if the node's binary value cannot be read
+         */
+        protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException {
+            if (fieldNode == null || fieldNode.isNull()) {
+                return null;
+            }
+
+            if (fieldNode.isNumber()) {
+                return fieldNode.getNumberValue();
+            }
+
+            if (fieldNode.isBinary()) {
+                return fieldNode.getBinaryValue();
+            }
+
+            if (fieldNode.isBoolean()) {
+                return fieldNode.getBooleanValue();
+            }
+
+            if (fieldNode.isTextual()) {
+                return fieldNode.getTextValue();
+            }
+
+            if (fieldNode.isArray()) {
+                final ArrayNode arrayNode = (ArrayNode) fieldNode;
+                final int numElements = arrayNode.size();
+                final Object[] arrayElements = new Object[numElements];
+                int count = 0;
+
+                // Elements only get a declared type when the node itself is declared as an array.
+                final DataType elementDataType;
+                if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
+                    final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+                    elementDataType = arrayDataType.getElementType();
+                } else {
+                    elementDataType = null;
+                }
+
+                for (final JsonNode node : arrayNode) {
+                    final Object value = getRawNodeValue(node, elementDataType);
+                    arrayElements[count++] = value;
+                }
+
+                return arrayElements;
+            }
+
+            if (fieldNode.isObject()) {
+                RecordSchema childSchema;
+                if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) {
+                    final RecordDataType recordDataType = (RecordDataType) dataType;
+                    childSchema = recordDataType.getChildSchema();
+                } else {
+                    childSchema = null;
+                }
+
+                if (childSchema == null) {
+                    childSchema = new SimpleRecordSchema(Collections.emptyList());
+                }
+
+                final Iterator<String> fieldNames = fieldNode.getFieldNames();
+                final Map<String, Object> childValues = new HashMap<>();
+                while (fieldNames.hasNext()) {
+                    final String childFieldName = fieldNames.next();
+                    // Each child is resolved against its own declared type. Passing the parent's type down
+                    // would make a nested object reuse the parent's child schema instead of its own.
+                    final DataType childDataType = childSchema.getField(childFieldName).map(RecordField::getDataType).orElse(null);
+                    final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), childDataType);
+                    childValues.put(childFieldName, childValue);
+                }
+
+                return new MapRecord(childSchema, childValues);
+            }
+
+            return null;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index fac7696..20ed96a 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -68,9 +68,6 @@ import java.util.concurrent.TimeUnit;
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
 
-    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    static final String LAST_EVENT_ID_KEY = "last_event_id";
-
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
         .description("The value to use for the platform field in each provenance event.")
@@ -195,7 +192,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
         lastSentBulletinId = currMaxId;
     }
 
-    static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
+    private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
         final String platform, final String nodeIdentifier) {
 
         addField(builder, "objectId", UUID.randomUUID().toString());
@@ -216,17 +213,4 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
         return builder.build();
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
-        if (value != null) {
-            builder.add(key, value.longValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
-        if (value == null) {
-            return;
-        }
-        builder.add(key, value);
-    }
-
 }


Mime
View raw message