nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject [1/4] nifi git commit: NIFI-4707: Build full component map for ID -> Name association in provenance reporting
Date Tue, 02 Jan 2018 19:49:29 GMT
Repository: nifi
Updated Branches:
  refs/heads/master b7c9c88f9 -> 84cecfbee


NIFI-4707: Build full component map for ID -> Name association in provenance reporting

NIFI-4707: Add process group ID/name to S2SProvReportingTask records

NIFI-4707: Added support for filtering provenance on process group ID

NIFI-4707: Fixed support for provenance in Atlas reporting task

NIFI-4707: Refactored common code into reporting-utils, fixed filtering


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1f793923
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1f793923
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1f793923

Branch: refs/heads/master
Commit: 1f793923a4f7663cdab6c259b49e6b4167553109
Parents: b7c9c88
Author: Matthew Burgess <mattyb149@apache.org>
Authored: Mon Dec 18 13:44:21 2017 -0500
Committer: Matthew Burgess <mattyb149@apache.org>
Committed: Tue Jan 2 14:46:36 2018 -0500

----------------------------------------------------------------------
 .../atlas/reporting/ReportLineageToAtlas.java   |  2 +-
 .../util/provenance/ComponentMapHolder.java     | 97 ++++++++++++++++++++
 .../provenance/ProvenanceEventConsumer.java     | 35 +++++--
 .../SiteToSiteProvenanceReportingTask.java      | 62 ++++---------
 4 files changed, 143 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1f793923/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 4c78ef7..f722b9d 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -640,7 +640,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
         final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers,
                 // FIXME: This class cast shouldn't be necessary to query lineage. Possible
refactor target in next major update.
                 (ProvenanceRepository)eventAccess.getProvenanceRepository());
-        consumer.consumeEvents(eventAccess, context.getStateManager(), events -> {
+        consumer.consumeEvents(context, context.getStateManager(), (componentMapHolder, events)
-> {
             for (ProvenanceEventRecord event : events) {
                 try {
                     lineageStrategy.processEvent(analysisContext, nifiFlow, event);

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f793923/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
new file mode 100644
index 0000000..495968a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.provenance;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Lookup tables built from a {@link ProcessGroupStatus} tree so that a component ID (for example the
+ * component ID of a provenance event) can be resolved to the component's name and to the ID of the
+ * process group that directly contains the component.
+ */
+public class ComponentMapHolder {
+    // Component ID -> component name. Filled for process groups, processors, input/output ports,
+    // remote process groups and connections.
+    final Map<String,String> componentMap = new HashMap<>();
+    // Component ID -> ID of the process group that directly contains it. Only the immediate parent is
+    // recorded; the top-level group passed to createComponentMap gets no entry here.
+    final Map<String,String> componentToParentGroupMap = new HashMap<>();
+
+    /**
+     * Copies all entries of the given holder's maps into this holder's maps. For a key present in both,
+     * the value from {@code holder} replaces the existing one ({@link Map#putAll} semantics).
+     *
+     * @param holder the holder whose entries are copied into this one
+     * @return this holder, to allow chaining
+     */
+    public ComponentMapHolder putAll(ComponentMapHolder holder) {
+        this.componentMap.putAll(holder.getComponentMap());
+        this.componentToParentGroupMap.putAll(holder.getComponentToParentGroupMap());
+        return this;
+    }
+
+    /**
+     * @return the live component-ID-to-name map (not a copy); {@link #createComponentMap} fills the
+     *         holder by writing through this reference
+     */
+    public Map<String, String> getComponentMap() {
+        return componentMap;
+    }
+
+    /**
+     * @return the live component-ID-to-parent-group-ID map (not a copy)
+     */
+    public Map<String, String> getComponentToParentGroupMap() {
+        return componentToParentGroupMap;
+    }
+
+    /**
+     * @param componentId the ID of a component, e.g. taken from a provenance event
+     * @return the component's name, or {@code null} if no name is recorded for the ID
+     */
+    public String getComponentName(final String componentId) {
+        return componentMap.get(componentId);
+    }
+
+    /**
+     * @param componentId the ID of a component, e.g. taken from a provenance event
+     * @return the ID of the process group that directly contains the component, or {@code null} if none
+     *         is recorded (unknown IDs, and the top-level group the holder was built from)
+     */
+    public String getProcessGroupId(final String componentId) {
+        return componentToParentGroupMap.get(componentId);
+    }
+
+    /**
+     * Builds a holder by walking the given process group status and, recursively, every descendant group.
+     * For each processor, input port, output port, remote process group, connection and child group, the
+     * name and the immediate parent group ID are recorded; the given group itself only gets a name entry
+     * in the returned holder.
+     *
+     * @param status the group to start from; may be {@code null}
+     * @return a new holder; empty when {@code status} is {@code null}
+     */
+    public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status)
{
+        final ComponentMapHolder holder = new ComponentMapHolder();
+        // Local aliases of the holder's live maps; the puts below populate the holder directly.
+        final Map<String,String> componentMap = holder.getComponentMap();
+        final Map<String,String> componentToParentGroupMap = holder.getComponentToParentGroupMap();
+
+        if (status != null) {
+            // The group itself is named but gets no parent entry here; when this is a recursive call,
+            // the caller one level up records the parent (see the child-group loop below).
+            componentMap.put(status.getId(), status.getName());
+
+            for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
+                componentMap.put(procStatus.getId(), procStatus.getName());
+                componentToParentGroupMap.put(procStatus.getId(), status.getId());
+            }
+
+            for (final PortStatus portStatus : status.getInputPortStatus()) {
+                componentMap.put(portStatus.getId(), portStatus.getName());
+                componentToParentGroupMap.put(portStatus.getId(), status.getId());
+            }
+
+            for (final PortStatus portStatus : status.getOutputPortStatus()) {
+                componentMap.put(portStatus.getId(), portStatus.getName());
+                componentToParentGroupMap.put(portStatus.getId(), status.getId());
+            }
+
+            for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus())
{
+                componentMap.put(rpgStatus.getId(), rpgStatus.getName());
+                componentToParentGroupMap.put(rpgStatus.getId(), status.getId());
+            }
+
+            for (final ConnectionStatus connectionStatus : status.getConnectionStatus())
{
+                componentMap.put(connectionStatus.getId(), connectionStatus.getName());
+                componentToParentGroupMap.put(connectionStatus.getId(), status.getId());
+            }
+
+            for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
+                componentMap.put(childGroup.getId(), childGroup.getName());
+                componentToParentGroupMap.put(childGroup.getId(), status.getId());
+                // Merge everything beneath the child group into this holder.
+                // NOTE(review): each level builds its own holder and copies it into the caller's, so an
+                // entry is copied once per ancestor level; passing one holder down the recursion would
+                // avoid the repeated copying for deeply nested flows.
+                holder.putAll(createComponentMap(childGroup));
+            }
+        }
+
+        return holder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f793923/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
index 1cbdbf1..8256626 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -21,19 +21,21 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 import java.util.regex.Pattern;
 
 public class ProvenanceEventConsumer {
@@ -111,8 +113,16 @@ public class ProvenanceEventConsumer {
         this.logger = logger;
     }
 
-    public void consumeEvents(final EventAccess eventAccess, final StateManager stateManager,
-                              final Consumer<List<ProvenanceEventRecord>> consumer)
throws ProcessException {
+    public void consumeEvents(final ReportingContext context, final StateManager stateManager,
+                              final BiConsumer<ComponentMapHolder, List<ProvenanceEventRecord>>
consumer) throws ProcessException {
+
+        if (context == null) {
+            logger.debug("No ReportingContext available.");
+            return;
+        }
+        final EventAccess eventAccess = context.getEventAccess();
+        final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
+        final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
 
         Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
 
@@ -160,7 +170,7 @@ public class ProvenanceEventConsumer {
         List<ProvenanceEventRecord> filteredEvents;
         try {
             rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
-            filteredEvents = filterEvents(rawEvents);
+            filteredEvents = filterEvents(componentMapHolder, rawEvents);
         } catch (final IOException ioe) {
             logger.error("Failed to retrieve Provenance Events from repository due to: "
+ ioe.getMessage(), ioe);
             return;
@@ -176,7 +186,7 @@ public class ProvenanceEventConsumer {
 
             if (!filteredEvents.isEmpty()) {
                 // Executes callback.
-                consumer.accept(filteredEvents);
+                consumer.accept(componentMapHolder, filteredEvents);
             }
 
             firstEventId = updateLastEventId(rawEvents, stateManager);
@@ -184,7 +194,7 @@ public class ProvenanceEventConsumer {
             // Retrieve the next batch
             try {
                 rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
-                filteredEvents = filterEvents(rawEvents);
+                filteredEvents = filterEvents(componentMapHolder, rawEvents);
             } catch (final IOException ioe) {
                 logger.error("Failed to retrieve Provenance Events from repository due to:
" + ioe.getMessage(), ioe);
                 return;
@@ -218,13 +228,20 @@ public class ProvenanceEventConsumer {
         return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
     }
 
-    private List<ProvenanceEventRecord> filterEvents(List<ProvenanceEventRecord>
provenanceEvents) {
+    private List<ProvenanceEventRecord> filterEvents(ComponentMapHolder componentMapHolder,
List<ProvenanceEventRecord> provenanceEvents) {
         if(isFilteringEnabled()) {
-            List<ProvenanceEventRecord> filteredEvents = new ArrayList<ProvenanceEventRecord>();
+            List<ProvenanceEventRecord> filteredEvents = new ArrayList<>();
 
             for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
                 if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId()))
{
-                    continue;
+                    // If we aren't filtering it out based on component ID, let's see if
this component has a parent process group ID
+                    // that is being filtered on
+                    if (componentMapHolder == null || componentMapHolder.getComponentToParentGroupMap().isEmpty())
{
+                        continue;
+                    }
+                    if (!componentIds.contains(componentMapHolder.getComponentToParentGroupMap().get(provenanceEventRecord.getComponentId())))
{
+                        continue;
+                    }
                 }
                 if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType()))
{
                     continue;

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f793923/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 8b8048b..c99e9d8 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -28,10 +28,7 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -46,6 +43,7 @@ import javax.json.JsonArrayBuilder;
 import javax.json.JsonBuilderFactory;
 import javax.json.JsonObject;
 import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -174,36 +172,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         return properties;
     }
 
-    private Map<String,String> createComponentMap(final ProcessGroupStatus status)
{
-        final Map<String,String> componentMap = new HashMap<>();
-
-        if (status != null) {
-            componentMap.put(status.getId(), status.getName());
-
-            for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
-                componentMap.put(procStatus.getId(), procStatus.getName());
-            }
-
-            for (final PortStatus portStatus : status.getInputPortStatus()) {
-                componentMap.put(portStatus.getId(), portStatus.getName());
-            }
-
-            for (final PortStatus portStatus : status.getOutputPortStatus()) {
-                componentMap.put(portStatus.getId(), portStatus.getName());
-            }
-
-            for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus())
{
-                componentMap.put(rpgStatus.getId(), rpgStatus.getName());
-            }
-
-            for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
-                componentMap.put(childGroup.getId(), childGroup.getName());
-            }
-        }
-
-        return componentMap;
-    }
-
     @Override
     public void onTrigger(final ReportingContext context) {
         final boolean isClustered = context.isClustered();
@@ -216,8 +184,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
 
         final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
         final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
-        final Map<String,String> componentMap = createComponentMap(procGroupStatus);
-
         final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
         URL url;
         try {
@@ -237,13 +203,15 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
         df.setTimeZone(TimeZone.getTimeZone("Z"));
 
-        consumer.consumeEvents(context.getEventAccess(), context.getStateManager(), events
-> {
+        consumer.consumeEvents(context, context.getStateManager(), (mapHolder, events) ->
{
             final long start = System.nanoTime();
             // Create a JSON array of all the events in the current batch
             final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
             for (final ProvenanceEventRecord event : events) {
-                final String componentName = componentMap.get(event.getComponentId());
-                arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname,
url, rootGroupName, platform, nodeId));
+                final String componentName = mapHolder.getComponentName(event.getComponentId());
+                final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId());
+                final String processGroupName = mapHolder.getComponentMap().get(processGroupId);
+                arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId,
processGroupName, hostname, url, rootGroupName, platform, nodeId));
             }
             final JsonArray jsonArray = arrayBuilder.build();
 
@@ -277,7 +245,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
 
 
     static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder
builder, final ProvenanceEventRecord event, final DateFormat df,
-        final String componentName, final String hostname, final URL nifiUrl, final String
applicationName, final String platform, final String nodeIdentifier) {
+                                final String componentName, final String processGroupId,
final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName,
+                                final String platform, final String nodeIdentifier) {
         addField(builder, "eventId", UUID.randomUUID().toString());
         addField(builder, "eventOrdinal", event.getEventId());
         addField(builder, "eventType", event.getEventType().name());
@@ -289,6 +258,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         addField(builder, "componentId", event.getComponentId());
         addField(builder, "componentType", event.getComponentType());
         addField(builder, "componentName", componentName);
+        addField(builder, "processGroupId", processGroupId, true);
+        addField(builder, "processGroupName", processGroupName, true);
         addField(builder, "entityId", event.getFlowFileUuid());
         addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile");
         addField(builder, "entitySize", event.getFileSize());
@@ -352,11 +323,17 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
     }
 
     private static void addField(final JsonObjectBuilder builder, final String key, final
String value) {
+        addField(builder, key, value, false);
+    }
+
+    private static void addField(final JsonObjectBuilder builder, final String key, final
String value, final boolean allowNullValues) {
         if (value == null) {
-            return;
+            if (allowNullValues) {
+                builder.add(key, JsonValue.NULL);
+            }
+        } else {
+            builder.add(key, value);
         }
-
-        builder.add(key, value);
     }
 
     private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String>
values) {
@@ -368,5 +345,4 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         }
         return builder;
     }
-
 }


Mime
View raw message