nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject [1/2] nifi git commit: NIFI-3674: Implementing SiteToSiteStatusReportingTask
Date Mon, 01 May 2017 13:55:32 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 7df5c2dc8 -> de67e5f7d


NIFI-3674: Implementing SiteToSiteStatusReportingTask

Signed-off-by: Bryan Bende <bbende@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ea6320d6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ea6320d6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ea6320d6

Branch: refs/heads/master
Commit: ea6320d621398713c6aec98e0d79509fd8586e17
Parents: 7df5c2d
Author: Joe Gresock <joseph.gresock@lmco.com>
Authored: Fri Apr 7 15:45:40 2017 +0000
Committer: Bryan Bende <bbende@apache.org>
Committed: Mon May 1 09:55:10 2017 -0400

----------------------------------------------------------------------
 .../SiteToSiteStatusReportingTask.java          | 415 +++++++++++++++++++
 .../org.apache.nifi.reporting.ReportingTask     |   3 +-
 .../TestSiteToSiteStatusReportingTask.java      | 346 ++++++++++++++++
 3 files changed, 763 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ea6320d6/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
new file mode 100644
index 0000000..d94acc9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+
+@Tags({"status", "metrics", "history", "site", "site to site"})
+@CapabilityDescription("Publishes Status events using the Site To Site protocol.  "
+        + "The component type and name filter regexes form a union: only components matching
both regexes will be reported.  "
+        + "However, all process groups are recursively searched for matching components,
regardless of whether the process group matches the component filters.")
+public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask {
+
+    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
+        .name("Platform")
+        .description("The value to use for the platform field in each provenance event.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("nifi")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder()
+        .name("Component Type Filter Regex")
+        .description("A regex specifying which component types to report.  Any component
type matching this regex will be included.  "
+                + "Component types are: Processor, RootProcessGroup, ProcessGroup, RemoteProcessGroup,
Connection, InputPort, OutputPort")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
+    static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder()
+        .name("Component Name Filter Regex")
+        .description("A regex specifying which component names to report.  Any component
name matching this regex will be included.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue(".*")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
+
+    private volatile Pattern componentTypeFilter;
+    private volatile Pattern componentNameFilter;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(PLATFORM);
+        properties.add(COMPONENT_TYPE_FILTER_REGEX);
+        properties.add(COMPONENT_NAME_FILTER_REGEX);
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ReportingContext context) {
+        final boolean isClustered = context.isClustered();
+        final String nodeId = context.getClusterNodeIdentifier();
+        if (nodeId == null && isClustered) {
+            getLogger().debug("This instance of NiFi is configured for clustering, but the
Cluster Node Identifier is not yet available. "
+                + "Will wait for Node Identifier to be established.");
+            return;
+        }
+
+        componentTypeFilter = Pattern.compile(context.getProperty(COMPONENT_TYPE_FILTER_REGEX).getValue());
+        componentNameFilter = Pattern.compile(context.getProperty(COMPONENT_NAME_FILTER_REGEX).getValue());
+
+        final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
+        final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
+
+        final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
+        URL url;
+        try {
+            url = new URL(nifiUrl);
+        } catch (final MalformedURLException e1) {
+            // already validated
+            throw new AssertionError();
+        }
+
+        final String hostname = url.getHost();
+        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
+
+        final Map<String, ?> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+
+        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+        df.setTimeZone(TimeZone.getTimeZone("Z"));
+
+        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
+        serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, hostname,
rootGroupName,
+                platform, null, new Date());
+
+        final JsonArray jsonArray = arrayBuilder.build();
+
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        int fromIndex = 0;
+        int toIndex = Math.min(batchSize, jsonArray.size());
+        List<JsonValue> jsonBatch = jsonArray.subList(fromIndex, toIndex);
+
+        while(!jsonBatch.isEmpty()) {
+            // Send the JSON document for the current batch
+            try {
+                long start = System.nanoTime();
+                final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
+                if (transaction == null) {
+                    getLogger().debug("All destination nodes are penalized; will attempt
to send data later");
+                    return;
+                }
+
+                final Map<String, String> attributes = new HashMap<>();
+                final String transactionId = UUID.randomUUID().toString();
+                attributes.put("reporting.task.transaction.id", transactionId);
+                attributes.put("mime.type", "application/json");
+
+                JsonArrayBuilder jsonBatchArrayBuilder = factory.createArrayBuilder();
+                for(JsonValue jsonValue : jsonBatch) {
+                    jsonBatchArrayBuilder.add(jsonValue);
+                }
+                final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build();
+
+                final byte[] data = jsonBatchArray.toString().getBytes(StandardCharsets.UTF_8);
+                transaction.send(data, attributes);
+                transaction.confirm();
+                transaction.complete();
+
+                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
- start);
+                getLogger().info("Successfully sent {} Status Records to destination in {}
ms; Transaction ID = {}",
+                        new Object[]{jsonArray.size(), transferMillis, transactionId});
+
+                fromIndex = toIndex;
+                toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
+                jsonBatch = jsonArray.subList(fromIndex, toIndex);
+            } catch (final IOException e) {
+                throw new ProcessException("Failed to send Provenance Events to destination
due to IOException:" + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Returns true only if the component type matches the component type filter
+     * and the component name matches the component name filter.
+     *
+     * @param componentType
+     *            The component type
+     * @param componentName
+     *            The component name
+     * @return Whether the component matches both filters
+     */
+    boolean componentMatchesFilters(final String componentType, final String componentName)
{
+        return componentTypeFilter.matcher(componentType).matches()
+                && componentNameFilter.matcher(componentName).matches();
+    }
+
+    /**
+     * Serialize the ProcessGroupStatus and add it to the JsonArrayBuilder.
+     * @param arrayBuilder
+     *            The JSON Array builder
+     * @param factory
+     *            The JSON Builder Factory
+     * @param status
+     *            The ProcessGroupStatus
+     * @param df
+     *            A date format
+     * @param hostname
+     *            The current hostname
+     * @param applicationName
+     *            The root process group name
+     * @param platform
+     *            The configured platform
+     * @param parentId
+     *            The parent's component id
+     */
+    void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory
factory,
+            final ProcessGroupStatus status, final DateFormat df,
+        final String hostname, final String applicationName, final String platform, final
String parentId, final Date currentDate) {
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        final String componentType = (parentId == null) ? "RootProcessGroup" : "ProcessGroup";
+        final String componentName = status.getName();
+
+        if (componentMatchesFilters(componentType, componentName)) {
+            addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
+                    componentType, componentName);
+
+            addField(builder, "componentId", status.getId());
+            addField(builder, "bytesRead", status.getBytesRead());
+            addField(builder, "bytesWritten", status.getBytesWritten());
+            addField(builder, "bytesReceived", status.getBytesReceived());
+            addField(builder, "bytesSent", status.getBytesSent());
+            addField(builder, "bytesTransferred", status.getBytesTransferred());
+            addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
+            addField(builder, "flowFilesSent", status.getFlowFilesSent());
+            addField(builder, "flowFilesTransferred", status.getFlowFilesTransferred());
+            addField(builder, "inputContentSize", status.getInputContentSize());
+            addField(builder, "inputCount", status.getInputCount());
+            addField(builder, "outputContentSize", status.getOutputContentSize());
+            addField(builder, "outputCount", status.getOutputCount());
+            addField(builder, "queuedContentSize", status.getQueuedContentSize());
+            addField(builder, "activeThreadCount", status.getActiveThreadCount());
+            addField(builder, "queuedCount", status.getQueuedCount());
+
+            arrayBuilder.add(builder.build());
+        }
+
+        for(ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+            serializeProcessGroupStatus(arrayBuilder, factory, childGroupStatus, df, hostname,
+                    applicationName, platform, status.getId(), currentDate);
+        }
+        for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
+            serializeProcessorStatus(arrayBuilder, factory, processorStatus, df, hostname,
+                    applicationName, platform, status.getId(), currentDate);
+        }
+        for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+            serializeConnectionStatus(arrayBuilder, factory, connectionStatus, df, hostname,
+                    applicationName, platform, status.getId(), currentDate);
+        }
+        for(PortStatus portStatus : status.getInputPortStatus()) {
+            serializePortStatus("InputPort", arrayBuilder, factory, portStatus, df,
+                    hostname, applicationName, platform, status.getId(), currentDate);
+        }
+        for(PortStatus portStatus : status.getOutputPortStatus()) {
+            serializePortStatus("OutputPort", arrayBuilder, factory, portStatus, df,
+                    hostname, applicationName, platform, status.getId(), currentDate);
+        }
+        for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus())
{
+            serializeRemoteProcessGroupStatus(arrayBuilder, factory, remoteProcessGroupStatus,
df, hostname,
+                    applicationName, platform, status.getId(), currentDate);
+        }
+    }
+
+    void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory
factory,
+            final RemoteProcessGroupStatus status, final DateFormat df, final String hostname,
final String applicationName,
+            final String platform, final String parentId, final Date currentDate) {
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        final String componentType = "RemoteProcessGroup";
+        final String componentName = status.getName();
+
+        if (componentMatchesFilters(componentType, componentName)) {
+            addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
+                    componentType, componentName);
+
+            addField(builder, "componentId", status.getId());
+            addField(builder, "activeRemotePortCount", status.getActiveRemotePortCount());
+            addField(builder, "activeThreadCount", status.getActiveThreadCount());
+            addField(builder, "inactiveRemotePortCount", status.getInactiveRemotePortCount());
+            addField(builder, "receivedContentSize", status.getReceivedContentSize());
+            addField(builder, "receivedCount", status.getReceivedCount());
+            addField(builder, "sentContentSize", status.getSentContentSize());
+            addField(builder, "sentCount", status.getSentCount());
+            addField(builder, "averageLineageDuration", status.getAverageLineageDuration());
+
+            arrayBuilder.add(builder.build());
+        }
+    }
+
+    void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder,
final JsonBuilderFactory factory, final PortStatus status,
+            final DateFormat df, final String hostname, final String applicationName, final
String platform, final String parentId, final Date currentDate) {
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        final String componentName = status.getName();
+
+        if (componentMatchesFilters(componentType, componentName)) {
+            addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
+                    componentType, componentName);
+
+            addField(builder, "componentId", status.getId());
+            addField(builder, "activeThreadCount", status.getActiveThreadCount());
+            addField(builder, "bytesReceived", status.getBytesReceived());
+            addField(builder, "bytesSent", status.getBytesSent());
+            addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
+            addField(builder, "flowFilesSent", status.getFlowFilesSent());
+            addField(builder, "inputBytes", status.getInputBytes());
+            addField(builder, "inputCount", status.getInputCount());
+            addField(builder, "outputBytes", status.getOutputBytes());
+            addField(builder, "outputCount", status.getOutputCount());
+
+            arrayBuilder.add(builder.build());
+        }
+    }
+
+    void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory
factory, final ConnectionStatus status, final DateFormat df,
+            final String hostname, final String applicationName, final String platform, final
String parentId, final Date currentDate) {
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        final String componentType = "Connection";
+        final String componentName = status.getName();
+
+        if (componentMatchesFilters(componentType, componentName)) {
+            addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
+                    componentType, componentName);
+
+            addField(builder, "componentId", status.getId());
+            addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes());
+            addField(builder, "maxQueuedCount", status.getMaxQueuedCount());
+            addField(builder, "queuedBytes", status.getQueuedBytes());
+            addField(builder, "queuedCount", status.getQueuedCount());
+            addField(builder, "inputBytes", status.getInputBytes());
+            addField(builder, "inputCount", status.getInputCount());
+            addField(builder, "outputBytes", status.getOutputBytes());
+            addField(builder, "outputCount", status.getOutputCount());
+
+            arrayBuilder.add(builder.build());
+        }
+    }
+
+    void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory
factory, final ProcessorStatus status, final DateFormat df,
+            final String hostname, final String applicationName, final String platform, final
String parentId, final Date currentDate) {
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        final String componentType = "Processor";
+        final String componentName = status.getName();
+
+        if (componentMatchesFilters(componentType, componentName)) {
+            addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
+
+            addField(builder, "componentId", status.getId());
+            addField(builder, "processorType", status.getType());
+            addField(builder, "averageLineageDurationMS", status.getAverageLineageDuration());
+            addField(builder, "bytesRead", status.getBytesRead());
+            addField(builder, "bytesWritten", status.getBytesWritten());
+            addField(builder, "bytesReceived", status.getBytesReceived());
+            addField(builder, "bytesSent", status.getBytesSent());
+            addField(builder, "flowFilesRemoved", status.getFlowFilesRemoved());
+            addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
+            addField(builder, "flowFilesSent", status.getFlowFilesSent());
+            addField(builder, "inputCount", status.getInputCount());
+            addField(builder, "inputBytes", status.getInputBytes());
+            addField(builder, "outputCount", status.getOutputCount());
+            addField(builder, "outputBytes", status.getOutputBytes());
+            addField(builder, "activeThreadCount", status.getActiveThreadCount());
+            addField(builder, "invocations", status.getInvocations());
+            addField(builder, "processingNanos", status.getProcessingNanos());
+
+            arrayBuilder.add(builder.build());
+        }
+    }
+
+    private static void addCommonFields(final JsonObjectBuilder builder, final DateFormat
df, final String hostname,
+            final String applicationName, final String platform, final String parentId, final
Date currentDate,
+            final String componentType, final String componentName) {
+        addField(builder, "statusId", UUID.randomUUID().toString());
+        addField(builder, "timestampMillis", currentDate.getTime());
+        addField(builder, "timestamp", df.format(currentDate));
+        addField(builder, "actorHostname", hostname);
+        addField(builder, "componentType", componentType);
+        addField(builder, "componentName", componentName);
+        addField(builder, "parentId", parentId);
+        addField(builder, "platform", platform);
+        addField(builder, "application", applicationName);
+    }
+
+    private static void addField(final JsonObjectBuilder builder, final String key, final
Long value) {
+        if (value != null) {
+            builder.add(key, value.longValue());
+        }
+    }
+
+    private static void addField(final JsonObjectBuilder builder, final String key, final
Integer value) {
+        if (value != null) {
+            builder.add(key, value.intValue());
+        }
+    }
+
+    private static void addField(final JsonObjectBuilder builder, final String key, final
String value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ea6320d6/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index bdf61cc..0aced94 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
-org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
+org.apache.nifi.reporting.SiteToSiteStatusReportingTask
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/ea6320d6/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
new file mode 100644
index 0000000..3c737d1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonReader;
+import javax.json.JsonString;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestSiteToSiteStatusReportingTask {
+    private ReportingContext context;
+
+    public MockSiteToSiteStatusReportingTask initTask(Map<PropertyDescriptor, String>
customProperties,
+            ProcessGroupStatus pgStatus) throws InitializationException {
+        final MockSiteToSiteStatusReportingTask task = new MockSiteToSiteStatusReportingTask();
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors())
{
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.putAll(customProperties);
+
+        context = Mockito.mock(ReportingContext.class);
+        Mockito.when(context.getStateManager())
+                .thenReturn(new MockStateManager(task));
+        Mockito.doAnswer(new Answer<PropertyValue>() {
+            @Override
+            public PropertyValue answer(final InvocationOnMock invocation) throws Throwable
{
+                final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+                return new MockPropertyValue(properties.get(descriptor));
+            }
+        }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        final EventAccess eventAccess = Mockito.mock(EventAccess.class);
+
+        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+        Mockito.when(eventAccess.getControllerStatus()).thenReturn(pgStatus);
+
+        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(initContext.getLogger()).thenReturn(logger);
+        task.initialize(initContext);
+
+        return task;
+    }
+
+    @Test
+    public void testSerializedForm() throws IOException, InitializationException {
+        final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome",
1, 0);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
+
+        MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+        task.onTrigger(context);
+
+        assertEquals(16, task.dataSent.size());
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
+        JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
+        assertEquals(pgStatus.getId(), componentId.getString());
+    }
+
+    @Test
+    public void testComponentTypeFilter() throws IOException, InitializationException {
+        final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome",
1, 0);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(ProcessGroup|RootProcessGroup)");
+
+        MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size()); // Only root pg and 3 child pgs
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
+        JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
+        assertEquals(pgStatus.getId(), componentId.getString());
+    }
+
+    @Test
+    public void testComponentNameFilter() throws IOException, InitializationException {
+        final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome",
1, 0);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*processor.*");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
+
+        MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+        task.onTrigger(context);
+
+        assertEquals(3, task.dataSent.size());  // 3 processors for each of 4 groups
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
+        JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
+        assertEquals("root.1.processor.1", componentId.getString());
+    }
+
+    @Test
+    public void testComponentNameFilter_nested() throws IOException, InitializationException
{
+        final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome",
2, 0);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*processor.*");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
+
+        MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+        task.onTrigger(context);
+
+        assertEquals(10, task.dataSent.size());  // 3 + (3 * 3) + (3 * 3 * 3) = 39, or 10
batches of 4
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
+        JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
+        assertEquals("root.1.1.processor.1", componentId.getString());
+    }
+
+    public static ProcessGroupStatus generateProcessGroupStatus(String id, String namePrefix,
+            int maxRecursion, int currentDepth) {
+        Collection<ConnectionStatus> cStatus = new ArrayList<>();
+        Collection<PortStatus> ipStatus = new ArrayList<>();
+        Collection<PortStatus> opStatus = new ArrayList<>();
+        Collection<ProcessorStatus> pStatus = new ArrayList<>();
+        Collection<RemoteProcessGroupStatus> rpgStatus = new ArrayList<>();
+        Collection<ProcessGroupStatus> childPgStatus = new ArrayList<>();
+
+        if (currentDepth < maxRecursion) {
+            for(int i = 1; i < 4; i++) {
+                childPgStatus.add(generateProcessGroupStatus(id + "." + i, namePrefix + "."
+ i,
+                        maxRecursion, currentDepth + 1));
+            }
+        }
+        for(int i = 1; i < 4; i++) {
+            pStatus.add(generateProcessorStatus(id + ".processor." + i, namePrefix + ".processor."
+ i));
+        }
+        for(int i = 1; i < 4; i++) {
+            cStatus.add(generateConnectionStatus(id + ".connection." + i, namePrefix + ".connection."
+ i));
+        }
+        for(int i = 1; i < 4; i++) {
+            rpgStatus.add(generateRemoteProcessGroupStatus(id + ".rpg." + i, namePrefix +
".rpg." + i));
+        }
+        for(int i = 1; i < 4; i++) {
+            ipStatus.add(generatePortStatus(id + ".ip." + i, namePrefix + ".ip." + i));
+        }
+        for(int i = 1; i < 4; i++) {
+            opStatus.add(generatePortStatus(id + ".op." + i, namePrefix + ".op." + i));
+        }
+
+        ProcessGroupStatus pgStatus = new ProcessGroupStatus();
+        pgStatus.setId(id);
+        pgStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
+        pgStatus.setInputPortStatus(ipStatus);
+        pgStatus.setOutputPortStatus(opStatus);
+        pgStatus.setProcessGroupStatus(childPgStatus);
+        pgStatus.setRemoteProcessGroupStatus(rpgStatus);
+        pgStatus.setProcessorStatus(pStatus);
+
+        pgStatus.setActiveThreadCount(1);
+        pgStatus.setBytesRead(2L);
+        pgStatus.setBytesReceived(3l);
+        pgStatus.setBytesSent(4l);
+        pgStatus.setBytesTransferred(5l);
+        pgStatus.setBytesWritten(6l);
+        pgStatus.setConnectionStatus(cStatus);
+        pgStatus.setFlowFilesReceived(7);
+        pgStatus.setFlowFilesSent(8);
+        pgStatus.setFlowFilesTransferred(9);
+        pgStatus.setInputContentSize(10l);
+        pgStatus.setInputCount(11);
+        pgStatus.setOutputContentSize(12l);
+        pgStatus.setOutputCount(13);
+        pgStatus.setQueuedContentSize(14l);
+        pgStatus.setQueuedCount(15);
+
+        return pgStatus;
+    }
+
+    public static PortStatus generatePortStatus(String id, String namePrefix) {
+        PortStatus pStatus = new PortStatus();
+        pStatus.setId(id);
+        pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
+        pStatus.setActiveThreadCount(0);
+        pStatus.setBytesReceived(1l);
+        pStatus.setBytesSent(2l);
+        pStatus.setFlowFilesReceived(3);
+        pStatus.setFlowFilesSent(4);
+        pStatus.setInputBytes(5l);
+        pStatus.setInputCount(6);
+        pStatus.setOutputBytes(7l);
+        pStatus.setOutputCount(8);
+
+        return pStatus;
+    }
+
+    public static ProcessorStatus generateProcessorStatus(String id, String namePrefix) {
+        ProcessorStatus pStatus = new ProcessorStatus();
+        pStatus.setId(id);
+        pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
+        pStatus.setActiveThreadCount(0);
+        pStatus.setAverageLineageDuration(1l);
+        pStatus.setBytesRead(2l);
+        pStatus.setBytesReceived(3l);
+        pStatus.setBytesSent(4l);
+        pStatus.setBytesWritten(5l);
+        pStatus.setFlowFilesReceived(6);
+        pStatus.setFlowFilesRemoved(7);
+        pStatus.setFlowFilesSent(8);
+        pStatus.setInputBytes(9l);
+        pStatus.setInputCount(10);
+        pStatus.setInvocations(11);
+        pStatus.setOutputBytes(12l);
+        pStatus.setOutputCount(13);
+        pStatus.setProcessingNanos(14l);
+        pStatus.setType("type");
+
+        return pStatus;
+    }
+
+    public static RemoteProcessGroupStatus generateRemoteProcessGroupStatus(String id, String
namePrefix) {
+        RemoteProcessGroupStatus rpgStatus = new RemoteProcessGroupStatus();
+        rpgStatus.setId(id);
+        rpgStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
+        rpgStatus.setActiveRemotePortCount(0);
+        rpgStatus.setActiveThreadCount(1);
+        rpgStatus.setAverageLineageDuration(2l);
+        rpgStatus.setInactiveRemotePortCount(3);
+        rpgStatus.setReceivedContentSize(4l);
+        rpgStatus.setReceivedCount(5);
+        rpgStatus.setSentContentSize(6l);
+        rpgStatus.setSentCount(7);
+        rpgStatus.setTargetUri("uri");
+
+        return rpgStatus;
+    }
+
+    public static ConnectionStatus generateConnectionStatus(String id, String namePrefix)
{
+        ConnectionStatus cStatus = new ConnectionStatus();
+        cStatus.setId(id);
+        cStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
+        cStatus.setBackPressureBytesThreshold(0l);
+        cStatus.setBackPressureObjectThreshold(1l);
+        cStatus.setInputBytes(2l);
+        cStatus.setInputCount(3);
+        cStatus.setMaxQueuedBytes(4l);
+        cStatus.setMaxQueuedCount(5);
+        cStatus.setOutputBytes(6);
+        cStatus.setOutputCount(7);
+        cStatus.setQueuedBytes(8l);
+        cStatus.setQueuedCount(9);
+
+        return cStatus;
+    }
+
+    public static FlowFile createFlowFile(final long id, final Map<String, String>
attributes) {
+        MockFlowFile mockFlowFile = new MockFlowFile(id);
+        mockFlowFile.putAttributes(attributes);
+        return mockFlowFile;
+    }
+
+    private static final class MockSiteToSiteStatusReportingTask extends SiteToSiteStatusReportingTask
{
+
+        final List<byte[]> dataSent = new ArrayList<>();
+
+        @Override
+        protected SiteToSiteClient getClient() {
+            final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+            final Transaction transaction = Mockito.mock(Transaction.class);
+
+            try {
+                Mockito.doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) throws Throwable
{
+                        final byte[] data = invocation.getArgumentAt(0, byte[].class);
+                        dataSent.add(data);
+                        return null;
+                    }
+                }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+
+                Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+            } catch (final Exception e) {
+                e.printStackTrace();
+                Assert.fail(e.toString());
+            }
+
+            return client;
+        }
+
+        public List<byte[]> getDataSent() {
+            return dataSent;
+        }
+    }
+
+}


Mime
View raw message