Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8381B200D6D for ; Mon, 18 Dec 2017 18:25:00 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8218C160BFB; Mon, 18 Dec 2017 17:25:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 103F6160C35 for ; Mon, 18 Dec 2017 18:24:57 +0100 (CET) Received: (qmail 93291 invoked by uid 500); 18 Dec 2017 17:24:56 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 92976 invoked by uid 99); 18 Dec 2017 17:24:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Dec 2017 17:24:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8CAD3F17B6; Mon, 18 Dec 2017 17:24:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: markap14@apache.org To: commits@nifi.apache.org Date: Mon, 18 Dec 2017 17:25:09 -0000 Message-Id: <7d498277fabf4d5c911d66f47110f10b@git.apache.org> In-Reply-To: <60267ae7b2df48a1a2b0df59d5926a67@git.apache.org> References: <60267ae7b2df48a1a2b0df59d5926a67@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/18] nifi git commit: NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas archived-at: Mon, 18 Dec 2017 17:25:00 -0000 http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiTypes.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiTypes.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiTypes.java new file mode 100644 index 0000000..1e02449 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiTypes.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class NiFiTypes { + + public static final String TYPE_ASSET = "Asset"; + public static final String TYPE_REFERENCEABLE = "Referenceable"; + public static final String TYPE_PROCESS = "Process"; + public static final String TYPE_DATASET = "DataSet"; + public static final String TYPE_NIFI_COMPONENT = "nifi_component"; + public static final String TYPE_NIFI_FLOW = "nifi_flow"; + public static final String TYPE_NIFI_FLOW_PATH = "nifi_flow_path"; + public static final String TYPE_NIFI_DATA = "nifi_data"; + public static final String TYPE_NIFI_QUEUE = "nifi_queue"; + public static final String TYPE_NIFI_INPUT_PORT = "nifi_input_port"; + public static final String TYPE_NIFI_OUTPUT_PORT = "nifi_output_port"; + + public static final String ATTR_GUID = "guid"; + public static final String ATTR_TYPENAME = "typeName"; + public static final String ATTR_NAME = "name"; + public static final String ATTR_CLUSTER_NAME = "clusterName"; + public static final String ATTR_DESCRIPTION = "description"; + public static final String ATTR_INPUTS = "inputs"; + public static final String ATTR_OUTPUTS = "outputs"; + public static final String ATTR_URL = "url"; + public static final String ATTR_URI = "uri"; + public static final String ATTR_PATH = "path"; + public static final String ATTR_QUALIFIED_NAME = "qualifiedName"; + public static final String ATTR_NIFI_FLOW = "nifiFlow"; + public static final String ATTR_FLOW_PATHS = "flowPaths"; + public static final String ATTR_QUEUES = "queues"; + public static final String ATTR_INPUT_PORTS = "inputPorts"; + public static final String ATTR_OUTPUT_PORTS = "outputPorts"; + + @FunctionalInterface + interface 
EntityDefinition { + void define(AtlasEntityDef entity, Set superTypes, List attributes); + } + + private static String arrayOf(String typeName) { + return "array<" + typeName + ">"; + } + + private static EntityDefinition NIFI_FLOW = (entity, superTypes, attributes) -> { + entity.setVersion(1L); + superTypes.add(TYPE_REFERENCEABLE); + superTypes.add(TYPE_ASSET); + + final AtlasAttributeDef url = new AtlasAttributeDef(ATTR_URL, "string"); + + final AtlasAttributeDef flowPaths = new AtlasAttributeDef(ATTR_FLOW_PATHS, arrayOf(TYPE_NIFI_FLOW_PATH)); + flowPaths.setIsOptional(true); + // Set ownedRef so that child flowPaths entities those no longer exist can be deleted when a NiFi is updated. + final AtlasConstraintDef ownedRef = new AtlasConstraintDef("ownedRef"); + flowPaths.addConstraint(ownedRef); + + final AtlasAttributeDef queues = new AtlasAttributeDef(ATTR_QUEUES, arrayOf(TYPE_NIFI_QUEUE)); + queues.setIsOptional(true); + queues.addConstraint(ownedRef); + + final AtlasAttributeDef inputPorts = new AtlasAttributeDef(ATTR_INPUT_PORTS, arrayOf(TYPE_NIFI_INPUT_PORT)); + inputPorts.setIsOptional(true); + inputPorts.addConstraint(ownedRef); + + final AtlasAttributeDef outputPorts = new AtlasAttributeDef(ATTR_OUTPUT_PORTS, arrayOf(TYPE_NIFI_OUTPUT_PORT)); + outputPorts.setIsOptional(true); + outputPorts.addConstraint(ownedRef); + + attributes.add(url); + attributes.add(flowPaths); + attributes.add(queues); + attributes.add(inputPorts); + attributes.add(outputPorts); + }; + + private static EntityDefinition NIFI_COMPONENT = (entity, superTypes, attributes) -> { + entity.setVersion(1L); + + final AtlasAttributeDef nifiFlow = new AtlasAttributeDef(ATTR_NIFI_FLOW, TYPE_NIFI_FLOW); + nifiFlow.setIsOptional(true); + + attributes.add(nifiFlow); + }; + + private static EntityDefinition NIFI_FLOW_PATH = (entity, superTypes, attributes) -> { + entity.setVersion(1L); + superTypes.add(TYPE_PROCESS); + superTypes.add(TYPE_NIFI_COMPONENT); + + final AtlasAttributeDef url = new 
AtlasAttributeDef(ATTR_URL, "string"); + + attributes.add(url); + }; + + private static EntityDefinition NIFI_DATA = (entity, superTypes, attributes) -> { + entity.setVersion(1L); + superTypes.add(TYPE_DATASET); + superTypes.add(TYPE_NIFI_COMPONENT); + }; + + private static EntityDefinition NIFI_QUEUE = (entity, superTypes, attributes) -> { + entity.setVersion(1L); + superTypes.add(TYPE_DATASET); + superTypes.add(TYPE_NIFI_COMPONENT); + }; + + private static EntityDefinition NIFI_INPUT_PORT = (entity, superTypes, attributes) -> { + entity.setVersion(1L); + superTypes.add(TYPE_DATASET); + superTypes.add(TYPE_NIFI_COMPONENT); + }; + + private static EntityDefinition NIFI_OUTPUT_PORT = (entity, superTypes, attributes) -> { + entity.setVersion(1L); + superTypes.add(TYPE_DATASET); + superTypes.add(TYPE_NIFI_COMPONENT); + }; + + static Map ENTITIES = new HashMap<>(); + static { + ENTITIES.put(TYPE_NIFI_COMPONENT, NIFI_COMPONENT); + ENTITIES.put(TYPE_NIFI_DATA, NIFI_DATA); + ENTITIES.put(TYPE_NIFI_QUEUE, NIFI_QUEUE); + ENTITIES.put(TYPE_NIFI_INPUT_PORT, NIFI_INPUT_PORT); + ENTITIES.put(TYPE_NIFI_OUTPUT_PORT, NIFI_OUTPUT_PORT); + ENTITIES.put(TYPE_NIFI_FLOW_PATH, NIFI_FLOW_PATH); + ENTITIES.put(TYPE_NIFI_FLOW, NIFI_FLOW); + } + + static final String[] NIFI_TYPES = ENTITIES.keySet().toArray(new String[ENTITIES.size()]); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java new file mode 100644 index 0000000..069276a --- /dev/null +++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.provenance.ProvenanceEventType; + +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Arrays; + +public abstract class AbstractNiFiProvenanceEventAnalyzer implements NiFiProvenanceEventAnalyzer { + + /** + * Utility method to parse a string uri silently. + * @param uri uri to parse + * @return parsed URI instance + */ + protected URI parseUri(String uri) { + try { + return new URI(uri); + } catch (URISyntaxException e) { + final String msg = String.format("Failed to parse uri %s due to %s", uri, e); + throw new IllegalArgumentException(msg, e); + } + } + + /** + * Utility method to parse a string uri silently. 
+ * @param url url to parse + * @return parsed URL instance + */ + protected URL parseUrl(String url) { + try { + return new URL(url); + } catch (MalformedURLException e) { + final String msg = String.format("Failed to parse url %s due to %s", url, e); + throw new IllegalArgumentException(msg, e); + } + } + + protected DataSetRefs singleDataSetRef(String componentId, ProvenanceEventType eventType, Referenceable ref) { + final DataSetRefs refs = new DataSetRefs(componentId); + switch (eventType) { + case SEND: + case REMOTE_INVOCATION: + refs.addOutput(ref); + break; + case FETCH: + case RECEIVE: + refs.addInput(ref); + break; + } + + return refs; + } + + /** + * Utility method to split comma separated host names. Port number will be removed. + */ + protected String[] splitHostNames(String hostNames) { + return Arrays.stream(hostNames.split(",")) + .map(hostName -> hostName.split(":")[0].trim()) + .toArray(String[]::new); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java new file mode 100644 index 0000000..c0071f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; + +import java.util.List; + +public interface AnalysisContext { + String getNiFiClusterName(); + ClusterResolver getClusterResolver(); + List findConnectionTo(String componentId); + List findConnectionFrom(String componentId); + ComputeLineageResult queryLineage(long eventId); + ComputeLineageResult findParents(long eventId); + ProvenanceEventRecord getProvenanceEvent(long eventId); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java new file mode 100644 index 0000000..4f745d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java @@ -0,0 +1,69 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance; + +import org.apache.atlas.typesystem.Referenceable; + +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; + +public class DataSetRefs { + private final Set componentIds; + private Set inputs; + private Set outputs; + private boolean referableFromRootPath; + + public DataSetRefs(String componentId) { + this.componentIds = Collections.singleton(componentId); + } + + public DataSetRefs(Set componentIds) { + this.componentIds = componentIds; + } + + public Set getComponentIds() { + return componentIds; + } + + public Set getInputs() { + return inputs != null ? inputs : Collections.emptySet(); + } + + public void addInput(Referenceable input) { + if (inputs == null) { + inputs = new LinkedHashSet<>(); + } + inputs.add(input); + } + + public Set getOutputs() { + return outputs != null ? 
outputs : Collections.emptySet(); + } + + public void addOutput(Referenceable output) { + if (outputs == null) { + outputs = new LinkedHashSet<>(); + } + outputs.add(output); + } + + public boolean isEmpty() { + return (inputs == null || inputs.isEmpty()) && (outputs == null || outputs.isEmpty()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzer.java new file mode 100644 index 0000000..a937c43 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +/** + * Responsible for analyzing NiFi provenance event data to generate Atlas DataSet reference. + * Implementations of this interface should be thread safe. + */ +public interface NiFiProvenanceEventAnalyzer { + + DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event); + + /** + * Returns target component type pattern that this Analyzer supports. + * Note that a component type of NiFi provenance event only has processor type name without package name. + * @return A RegularExpression to match with a component type of a provenance event. + */ + default String targetComponentTypePattern() { + return null; + } + + /** + * Returns target transit URI pattern that this Analyzer supports. + * @return A RegularExpression to match with a transit URI of a provenance event. + */ + default String targetTransitUriPattern() { + return null; + } + + /** + * Returns target provenance event type that this Analyzer supports. 
+ * @return A Provenance event type + */ + default ProvenanceEventType targetProvenanceEventType() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java new file mode 100644 index 0000000..21f0231 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.provenance.ProvenanceEventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; + +public class NiFiProvenanceEventAnalyzerFactory { + + /** + * This holder class is used to implement initialization-on-demand holder idiom to avoid double-checked locking anti-pattern. + * The static initializer is performed only once for a class loader. + * See these links for detail: + * + */ + private static class AnalyzerHolder { + private static final Logger logger = LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.AnalyzerHolder.class); + private static final Map analyzersForComponentType = new ConcurrentHashMap<>(); + private static final Map analyzersForTransitUri = new ConcurrentHashMap<>(); + private static final Map analyzersForProvenanceEventType = new ConcurrentHashMap<>(); + + private static void addAnalyzer(String patternStr, Map toAdd, + NiFiProvenanceEventAnalyzer analyzer) { + if (patternStr != null && !patternStr.isEmpty()) { + Pattern pattern = Pattern.compile(patternStr.trim()); + toAdd.put(pattern, analyzer); + } + } + + static { + logger.debug("Loading NiFiProvenanceEventAnalyzer ..."); + final ServiceLoader serviceLoader + = ServiceLoader.load(NiFiProvenanceEventAnalyzer.class); + serviceLoader.forEach(analyzer -> { + addAnalyzer(analyzer.targetComponentTypePattern(), analyzersForComponentType, analyzer); + addAnalyzer(analyzer.targetTransitUriPattern(), analyzersForTransitUri, analyzer); + final ProvenanceEventType eventType = analyzer.targetProvenanceEventType(); + if (eventType != null) { + if (analyzersForProvenanceEventType.containsKey(eventType)) { + logger.warn("Fo ProvenanceEventType {}, an Analyzer {} is already assigned." + + " Only one analyzer for a type can be registered. 
Ignoring {}", + eventType, analyzersForProvenanceEventType.get(eventType), analyzer); + } + analyzersForProvenanceEventType.put(eventType, analyzer); + } + }); + logger.info("Loaded NiFiProvenanceEventAnalyzers: componentTypes={}, transitUris={}", analyzersForComponentType, analyzersForTransitUri); + } + + private static Map getAnalyzersForComponentType() { + return analyzersForComponentType; + } + + private static Map getAnalyzersForTransitUri() { + return analyzersForTransitUri; + } + + private static Map getAnalyzersForProvenanceEventType() { + return analyzersForProvenanceEventType; + } + } + + + /** + * Find and retrieve NiFiProvenanceEventAnalyzer implementation for the specified targets. + * Pattern matching is performed by following order, and the one found at first is returned: + *
    + *
  1. Component type name. Use an analyzer supporting the Component type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}. + *
  2. TransitUri. Use an analyzer supporting the TransitUri with its {@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}. + *
  3. Provenance Event Type. Use an analyzer supporting the Provenance Event Type with its {@link NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}. + *
+ * @param typeName NiFi component type name. + * @param transitUri Transit URI. + * @param eventType Provenance event type. + * @return Instance of NiFiProvenanceEventAnalyzer if one is found for the specified className, otherwise null. + */ + public static NiFiProvenanceEventAnalyzer getAnalyzer(String typeName, String transitUri, ProvenanceEventType eventType) { + + for (Map.Entry entry + : NiFiProvenanceEventAnalyzerFactory.AnalyzerHolder.getAnalyzersForComponentType().entrySet()) { + if (entry.getKey().matcher(typeName).matches()) { + return entry.getValue(); + } + } + + if (transitUri != null) { + for (Map.Entry entry + : NiFiProvenanceEventAnalyzerFactory.AnalyzerHolder.getAnalyzersForTransitUri().entrySet()) { + if (entry.getKey().matcher(transitUri).matches()) { + return entry.getValue(); + } + } + } + + // If there's no specific implementation, just use generic analyzer. + return NiFiProvenanceEventAnalyzerFactory.AnalyzerHolder.getAnalyzersForProvenanceEventType().get(eventType); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java new file mode 100644 index 0000000..7442d95 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public class StandardAnalysisContext implements AnalysisContext { + + private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class); + private final NiFiFlow nifiFlow; + private final ClusterResolver clusterResolver; + private final ProvenanceRepository provenanceRepository; + + public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver, + ProvenanceRepository provenanceRepository) { + this.nifiFlow = nifiFlow; + this.clusterResolver = clusterResolver; + this.provenanceRepository = provenanceRepository; + } + + @Override + public 
List findConnectionTo(String componentId) { + return nifiFlow.getIncomingConnections(componentId); + } + + @Override + public List findConnectionFrom(String componentId) { + return nifiFlow.getOutgoingConnections(componentId); + } + + @Override + public String getNiFiClusterName() { + return nifiFlow.getClusterName(); + } + + @Override + public ClusterResolver getClusterResolver() { + return clusterResolver; + } + + private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) { + final ComputeLineageResult result = submission.getResult(); + try { + if (result.awaitCompletion(10, TimeUnit.SECONDS)) { + return result; + } + logger.warn("Lineage query for {} timed out.", new Object[]{eventId}); + } catch (InterruptedException e) { + logger.warn("Lineage query for {} was interrupted due to {}.", new Object[]{eventId, e}, e); + } finally { + submission.cancel(); + } + + return null; + } + + @Override + public ComputeLineageResult queryLineage(long eventId) { + final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER); + return getLineageResult(eventId, submission); + } + + public ComputeLineageResult findParents(long eventId) { + final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER); + return getLineageResult(eventId, submission); + } + + // NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation + private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser(); + private static class QueryNiFiUser implements NiFiUser { + @Override + public String getIdentity() { + return StandardAnalysisContext.class.getSimpleName(); + } + + @Override + public Set getGroups() { + return Collections.emptySet(); + } + + @Override + public NiFiUser getChain() { + return null; + } + + @Override + public boolean isAnonymous() { + return true; + } + + @Override + public String 
getClientAddress() { + return null; + } + } + + @Override + public ProvenanceEventRecord getProvenanceEvent(long eventId) { + try { + return provenanceRepository.getEvent(eventId); + } catch (IOException e) { + logger.error("Failed to get provenance event for {} due to {}", new Object[]{eventId, e}, e); + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java new file mode 100644 index 0000000..879b2ee --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; +import org.apache.nifi.util.Tuple; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.toTableNameStr; + +public abstract class AbstractHiveAnalyzer extends AbstractNiFiProvenanceEventAnalyzer { + + static final String TYPE_DATABASE = "hive_db"; + static final String TYPE_TABLE = "hive_table"; + static final String ATTR_DB = "db"; + + protected Referenceable createDatabaseRef(String clusterName, String databaseName) { + final Referenceable ref = new Referenceable(TYPE_DATABASE); + ref.set(ATTR_NAME, databaseName); + ref.set(ATTR_CLUSTER_NAME, clusterName); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, databaseName)); + return ref; + } + + protected Referenceable createTableRef(String clusterName, Tuple tableName) { + final Referenceable ref = new Referenceable(TYPE_TABLE); + ref.set(ATTR_NAME, tableName.getValue()); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, toTableNameStr(tableName))); + ref.set(ATTR_DB, createDatabaseRef(clusterName, tableName.getKey())); + return ref; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java new file mode 100644 index 0000000..63ab1bf --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class DatabaseAnalyzerUtil { + + private static final Logger logger = LoggerFactory.getLogger(DatabaseAnalyzerUtil.class); + + public static String ATTR_INPUT_TABLES = "query.input.tables"; + public static String ATTR_OUTPUT_TABLES = "query.output.tables"; + + public static Set> parseTableNames(String connectedDatabaseName, String tableNamesStr) { + if (tableNamesStr == null || tableNamesStr.isEmpty()) { + return Collections.emptySet(); + } + return Arrays.stream(tableNamesStr.split(",")) + .map(String::trim).filter(s -> !s.isEmpty()) + .map(t -> DatabaseAnalyzerUtil.parseTableName(connectedDatabaseName, t)) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + + private static Tuple parseTableName(String connectedDatabaseName, String tableNameStr) { + final String[] tableNameSplit = tableNameStr.split("\\."); + if (tableNameSplit.length != 1 && tableNameSplit.length != 2) { + logger.warn("Unexpected table name format: {}", tableNameStr); + return null; + } + final String databaseName = tableNameSplit.length == 2 ? tableNameSplit[0] : connectedDatabaseName; + final String tableName = tableNameSplit.length == 2 ? 
tableNameSplit[1] : tableNameSplit[0]; + return new Tuple<>(databaseName, tableName); + } + + public static String toTableNameStr(Tuple tableName) { + return toTableNameStr(tableName.getKey(), tableName.getValue()); + } + + public static String toTableNameStr(String databaseName, String tableName) { + return String.format("%s.%s", databaseName, tableName); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java new file mode 100644 index 0000000..37df736 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; + +/** + * Analyze a transit URI as a file system path. + *
  • qualifiedName=/path/fileName@hostname (example: /tmp/dir/filename.txt@host.example.com) + *
  • name=/path/fileName (example: /tmp/dir/filename.txt) + */ +public class FilePath extends AbstractNiFiProvenanceEventAnalyzer { + + private static final Logger logger = LoggerFactory.getLogger(FilePath.class); + + private static final String TYPE = "fs_path"; + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + final Referenceable ref = new Referenceable(TYPE); + final URI uri = parseUri(event.getTransitUri()); + final String clusterName; + try { + // use hostname in uri if available for remote path. + final String uriHost = uri.getHost(); + final String hostname = StringUtils.isEmpty(uriHost) ? InetAddress.getLocalHost().getHostName() : uriHost; + clusterName = context.getClusterResolver().fromHostNames(hostname); + } catch (UnknownHostException e) { + logger.warn("Failed to get localhost name due to " + e, e); + return null; + } + + final String path = uri.getPath(); + ref.set(ATTR_NAME, path); + ref.set(ATTR_PATH, path); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, path)); + + return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); + } + + @Override + public String targetTransitUriPattern() { + return "^file:/.+$"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java new file mode 100644 index 0000000..0f446fb --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java @@ -0,0 +1,75 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI; + +/** + * Analyze a transit URI as a HBase table. + *
  • qualifiedName=tableName@clusterName (example: myTable@cl1) + *
  • name=tableName (example: myTable) + */ +public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer { + + private static final Logger logger = LoggerFactory.getLogger(HBaseTable.class); + private static final String TYPE = "hbase_table"; + + // hbase://masterAddress/hbaseTableName/hbaseRowId(optional) + private static final Pattern URI_PATTERN = Pattern.compile("^hbase://([^/]+)/([^/]+)/?.*$"); + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + + final String transitUri = event.getTransitUri(); + final Matcher uriMatcher = URI_PATTERN.matcher(transitUri); + if (!uriMatcher.matches()) { + logger.warn("Unexpected transit URI: {}", new Object[]{transitUri}); + return null; + } + + final Referenceable ref = new Referenceable(TYPE); + final String[] hostNames = splitHostNames(uriMatcher.group(1)); + final String clusterName = context.getClusterResolver().fromHostNames(hostNames); + + final String tableName = uriMatcher.group(2); + ref.set(ATTR_NAME, tableName); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, tableName)); + // TODO: 'uri' is a mandatory attribute, but what should we set? 
+ ref.set(ATTR_URI, transitUri); + + return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); + } + + @Override + public String targetTransitUriPattern() { + return "^hbase://.+$"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java new file mode 100644 index 0000000..b1ef828 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +import java.net.URI; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; + +/** + * Analyze a transit URI as a HDFS path. + *
  • qualifiedName=/path/fileName@clusterName (example: /app/warehouse/hive/db/default@cl1) + *
  • name=/path/fileName (example: /app/warehouse/hive/db/default) + */ +public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer { + + private static final String TYPE = "hdfs_path"; + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + final Referenceable ref = new Referenceable(TYPE); + final URI uri = parseUri(event.getTransitUri()); + final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost()); + final String path = uri.getPath(); + ref.set(ATTR_NAME, path); + ref.set(ATTR_PATH, path); + ref.set(ATTR_CLUSTER_NAME, clusterName); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, path)); + + return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); + } + + @Override + public String targetTransitUriPattern() { + return "^hdfs://.+$"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java new file mode 100644 index 0000000..e393a09 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.Tuple; + +import java.net.URI; +import java.util.Set; + +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_INPUT_TABLES; +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES; +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.parseTableNames; + +/** + * Analyze provenance events for Hive2 using JDBC. + *
      + *
    • If a Provenance event has 'query.input.tables' or 'query.output.tables' attributes then 'hive_table' DataSet reference is created: + *
        + *
      • qualifiedName=tableName@clusterName (example: myTable@cl1) + *
      • name=tableName (example: myTable) + *
      + *
    • + *
    • If not, 'hive_database' DataSet reference is created from transit URI: + *
        + *
      • qualifiedName=dbName@clusterName (example: default@cl1) + *
      • dbName (example: default) + *
      + *
    • + *
    + */ +public class Hive2JDBC extends AbstractHiveAnalyzer { + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + + // Replace the colon so that the schema in the URI can be parsed correctly. + final String transitUri = event.getTransitUri().replaceFirst("^jdbc:hive2", "jdbc-hive2"); + final URI uri = parseUri(transitUri); + final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost()); + // Remove the heading '/' + final String path = uri.getPath(); + // If uri does not contain database name, then use 'default' as database name. + final String connectedDatabaseName = path == null || path.isEmpty() ? "default" : path.substring(1); + + final Set> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES)); + final Set> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES)); + + if (inputTables.isEmpty() && outputTables.isEmpty()) { + // If input/output tables are unknown, create database level lineage. 
+ return getDatabaseRef(event.getComponentId(), event.getEventType(), + clusterName, connectedDatabaseName); + } + + final DataSetRefs refs = new DataSetRefs(event.getComponentId()); + addRefs(refs, true, clusterName, inputTables); + addRefs(refs, false, clusterName, outputTables); + return refs; + } + + private DataSetRefs getDatabaseRef(String componentId, ProvenanceEventType eventType, + String clusterName, String databaseName) { + final Referenceable ref = createDatabaseRef(clusterName, databaseName); + + return singleDataSetRef(componentId, eventType, ref); + } + + private void addRefs(DataSetRefs refs, boolean isInput, String clusterName, + Set> tableNames) { + tableNames.forEach(tableName -> { + final Referenceable ref = createTableRef(clusterName, tableName); + if (isInput) { + refs.addInput(ref); + } else { + refs.addOutput(ref); + } + }); + } + + @Override + public String targetTransitUriPattern() { + return "^jdbc:hive2://.+$"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java new file mode 100644 index 0000000..e3d4709 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI; + +/** + * Analyze a transit URI as a Kafka topic. + *
  • qualifiedName=topicName@clusterName (example: testTopic@cl1) + *
  • name=topicName (example: testTopic) + */ +public class KafkaTopic extends AbstractNiFiProvenanceEventAnalyzer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaTopic.class); + + private static final String TYPE = "kafka_topic"; + private static final String ATTR_TOPIC = "topic"; + + // PLAINTEXT://0.example.com:6667,1.example.com:6667/topicA + private static final Pattern URI_PATTERN = Pattern.compile("^.+://([^/]+)/(.+)$"); + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + final Referenceable ref = new Referenceable(TYPE); + + final String transitUri = event.getTransitUri(); + if (transitUri == null) { + return null; + } + + final Matcher uriMatcher = URI_PATTERN.matcher(transitUri); + if (!uriMatcher.matches()) { + logger.warn("Unexpected transit URI: {}", new Object[]{transitUri}); + return null; + } + + String clusterName = null; + for (String broker : uriMatcher.group(1).split(",")) { + final String brokerHostname = broker.split(":")[0].trim(); + clusterName = context.getClusterResolver().fromHostNames(brokerHostname); + if (clusterName != null && !clusterName.isEmpty()) { + break; + } + } + + final String topicName = uriMatcher.group(2); + + ref.set(ATTR_NAME, topicName); + ref.set(ATTR_TOPIC, topicName); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, topicName)); + ref.set(ATTR_URI, transitUri); + + return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); + } + + @Override + public String targetComponentTypePattern() { + return "^(Publish|Consume)Kafka.*$"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java new file mode 100644 index 0000000..e2118f7 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; + +/** + * Analyze a transit URI as a NiFi Site-to-Site remote input/output port. + *
  • qualifiedName=remotePortGUID@clusterName (example: 35dbc0ab-015e-1000-144c-a8d71255027d@cl1) + *
  • name=portName (example: input) + */ +public class NiFiRemotePort extends NiFiS2S { + + private static final Logger logger = LoggerFactory.getLogger(NiFiRemotePort.class); + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + + if (!ProvenanceEventType.SEND.equals(event.getEventType()) + && !ProvenanceEventType.RECEIVE.equals(event.getEventType())) { + return null; + } + + final boolean isRemoteInputPort = event.getComponentType().equals("Remote Input Port"); + final String type = isRemoteInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT; + + final String remotePortId = event.getComponentId(); + + final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver()); + + // Find connections that connects to/from the remote port. + final List connections = isRemoteInputPort + ? context.findConnectionTo(remotePortId) + : context.findConnectionFrom(remotePortId); + if (connections == null || connections.isEmpty()) { + logger.warn("Connection was not found: {}", new Object[]{event}); + return null; + } + + // The name of remote port can be retrieved from any connection, use the first one. + final ConnectionStatus connection = connections.get(0); + final Referenceable ref = new Referenceable(type); + ref.set(ATTR_NAME, isRemoteInputPort ? 
connection.getDestinationName() : connection.getSourceName()); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, s2sUrl.targetPortId)); + + return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); + } + + @Override + public String targetComponentTypePattern() { + return "^Remote (In|Out)put Port$"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java new file mode 100644 index 0000000..4f66025 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; + +/** + * Analyze a provenance event as a NiFi RootGroupPort for Site-to-Site communication at the server side. + *
  • qualifiedName=rootPortGUID (example: 35dbc0ab-015e-1000-144c-a8d71255027d) + *
  • name=portName (example: input)
 */
public class NiFiRootGroupPort extends NiFiS2S {

    private static final Logger logger = LoggerFactory.getLogger(NiFiRootGroupPort.class);

    /** Component type value that marks an event as coming from an input port. */
    private static final String COMPONENT_TYPE_INPUT_PORT = "Input Port";

    /**
     * Creates a DataSet reference for the port that reported the given provenance event.
     *
     * <p>Only SEND and RECEIVE events are analyzed. The entity type is the input-port type when the
     * event's component type is "Input Port" and the output-port type otherwise. The port name is read
     * from the first connection attached to the port.</p>
     *
     * @param context analysis context supplying the cluster resolver and the connection lookups
     * @param event the provenance event to analyze
     * @return the DataSet reference for the port, or {@code null} when the event is neither SEND nor
     *         RECEIVE, or when no connection attached to the port is found
     */
    @Override
    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {

        final ProvenanceEventType eventType = event.getEventType();
        if (!ProvenanceEventType.SEND.equals(eventType) && !ProvenanceEventType.RECEIVE.equals(eventType)) {
            return null;
        }

        // Constant-first comparison, so a missing component type cannot raise a NullPointerException.
        final boolean isInputPort = COMPONENT_TYPE_INPUT_PORT.equals(event.getComponentType());
        final String type = isInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
        final String rootPortId = event.getComponentId();

        // Parsed before the connection lookup, so an unexpected transit URI is reported first.
        final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver());

        // An input port is the source of its connections, an output port is their destination.
        final List<ConnectionStatus> connections = isInputPort
                ? context.findConnectionFrom(rootPortId)
                : context.findConnectionTo(rootPortId);
        if (connections == null || connections.isEmpty()) {
            logger.warn("Connection was not found: {}", event);
            return null;
        }

        // Every connection attached to the port carries the port name, so the first one is enough.
        final ConnectionStatus connection = connections.get(0);
        final String portName = isInputPort ? connection.getSourceName() : connection.getDestinationName();

        final Referenceable ref = new Referenceable(type);
        ref.set(ATTR_NAME, portName);
        // NOTE(review): the class Javadoc lists qualifiedName=rootPortGUID, but the value set here is
        // built from both the cluster name and the port id - confirm the format of toQualifiedName.
        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, rootPortId));

        return singleDataSetRef(rootPortId, eventType, ref);
    }

    /**
     * @return the regular expression selecting the component types handled by this analyzer
     */
    @Override
    public String targetComponentTypePattern() {
        return "^(In|Out)put Port$";
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java new file mode 100644 index 0000000..762a1aa --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer { + + private static final Logger logger = LoggerFactory.getLogger(NiFiS2S.class); + + private static final Pattern RAW_URL_REGEX = Pattern.compile("([0-9a-zA-Z\\-]+)"); + private static final Pattern HTTP_URL_REGEX = Pattern.compile(".*/nifi-api/data-transfer/(in|out)put-ports/([[0-9a-zA-Z\\-]]+)/transactions/.*"); + + protected S2STransitUrl parseTransitURL(String transitUri, ClusterResolver clusterResolver) { + final URL url = parseUrl(transitUri); + + final String clusterName = clusterResolver.fromHostNames(url.getHost()); + final String targetPortId; + final String protocol = url.getProtocol().toLowerCase(); + switch (protocol) { + + case "http": + case "https": { + final Matcher uriMatcher = matchUrl(url, HTTP_URL_REGEX); + targetPortId = uriMatcher.group(2); + } + break; + + case "nifi": { + final Matcher uriMatcher = matchUrl(url, RAW_URL_REGEX); + targetPortId = uriMatcher.group(1); + } + break; + + default: + throw new IllegalArgumentException("Protocol " + protocol + " is not supported as NiFi S2S transit URL."); + + } + + return new S2STransitUrl(clusterName, targetPortId); + + } + + private Matcher matchUrl(URL url, Pattern pattern) { + final Matcher uriMatcher = pattern.matcher(url.getPath()); + if (!uriMatcher.matches()) { + throw new IllegalArgumentException("Unexpected transit URI: " + url); + } + return uriMatcher; + } + + protected static class S2STransitUrl { + final String clusterName; + final String targetPortId; + + public S2STransitUrl(String clusterName, String targetPortId) { + this.clusterName = clusterName; + this.targetPortId 
= targetPortId; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java new file mode 100644 index 0000000..7b41c57 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.util.Tuple; + +import java.net.URI; +import java.util.Set; + +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES; +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.parseTableNames; + +/** + * Analyze provenance events for PutHiveStreamingProcessor. + *
  • qualifiedName=tableName@clusterName (example: myTable@cl1) + *
  • name=tableName (example: myTable)
 */
public class PutHiveStreaming extends AbstractHiveAnalyzer {

    /**
     * Builds one output DataSet reference per table named in the event's output-tables attribute.
     *
     * <p>The cluster name is resolved from the host of the event's transit URI.</p>
     *
     * @param context analysis context supplying the cluster resolver
     * @param event the provenance event to analyze
     * @return the references with one output per parsed table, or {@code null} when the attribute
     *         yields no table name
     */
    @Override
    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {

        final URI uri = parseUri(event.getTransitUri());
        final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());

        // NOTE(review): the element type is restored from the Tuple import; the <String, String>
        // type arguments are assumed - confirm against DatabaseAnalyzerUtil.parseTableNames.
        final Set<Tuple<String, String>> outputTables = parseTableNames(null, event.getAttribute(ATTR_OUTPUT_TABLES));
        if (outputTables.isEmpty()) {
            return null;
        }

        final DataSetRefs refs = new DataSetRefs(event.getComponentId());
        for (final Tuple<String, String> table : outputTables) {
            final Referenceable ref = createTableRef(clusterName, table);
            refs.addOutput(ref);
        }
        return refs;
    }

    /**
     * @return the regular expression selecting the component type handled by this analyzer
     */
    @Override
    public String targetComponentTypePattern() {
        return "^PutHiveStreaming$";
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java new file mode 100644 index 0000000..9e6726f --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer.unknown; + +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +import java.util.List; + +/** + * Analyze a CREATE event and create 'nifi_data' when there is no specific Analyzer implementation found. + *
  • qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1) + *
  • name=NiFiComponentType (example: GenerateFlowFile)
 */
public class Create extends UnknownInput {

    /**
     * Analyzes a CREATE event only when the reporting component has no incoming connection.
     *
     * @param context analysis context used to look up the connections that end at the component
     * @param event the provenance event to analyze
     * @return the result of the inherited analysis, or {@code null} when at least one connection
     *         leads to the component
     */
    @Override
    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {

        // Check if this component is a processor that generates data: a component that is fed by
        // another one is skipped.
        final String componentId = event.getComponentId();
        final List<ConnectionStatus> incomingConnections = context.findConnectionTo(componentId);
        final boolean hasIncoming = incomingConnections != null && !incomingConnections.isEmpty();
        if (hasIncoming) {
            return null;
        }

        return super.analyze(context, event);
    }

    /**
     * @return {@link ProvenanceEventType#CREATE}, the event type this analyzer is registered for
     */
    @Override
    public ProvenanceEventType targetProvenanceEventType() {
        return ProvenanceEventType.CREATE;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java new file mode 100644 index 0000000..3fd9d70 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer.unknown; + +import org.apache.nifi.provenance.ProvenanceEventType; + +/** + * Analyze a FETCH event and create 'nifi_data' when there is no specific Analyzer implementation found. + *
  • qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1) + *
  • name=NiFiComponentType (example: FetchXXX)
 */
public class Fetch extends UnknownInput {

    /**
     * @return {@link ProvenanceEventType#FETCH}, the event type this analyzer is registered for
     */
    @Override
    public ProvenanceEventType targetProvenanceEventType() {
        return ProvenanceEventType.FETCH;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java new file mode 100644 index 0000000..9ee2ded --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer.unknown; + +import org.apache.nifi.provenance.ProvenanceEventType; + +/** + * Analyze a RECEIVE event and create 'nifi_data' when there is no specific Analyzer implementation found. + *
  • qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1) + *
  • name=NiFiComponentType (example: GetXXX)
 */
public class Receive extends UnknownInput {

    /**
     * @return {@link ProvenanceEventType#RECEIVE}, the event type this analyzer is registered for
     */
    @Override
    public ProvenanceEventType targetProvenanceEventType() {
        return ProvenanceEventType.RECEIVE;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java new file mode 100644 index 0000000..4d7d205 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer.unknown; + +import org.apache.nifi.provenance.ProvenanceEventType; + +/** + * Analyze a REMOTE_INVOCATION event and create 'nifi_data' when there is no specific Analyzer implementation found. + *
  • qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1) + *
  • name=NiFiComponentType (example: XXX)
 */
public class RemoteInvocation extends UnknownOutput {

    /**
     * @return {@link ProvenanceEventType#REMOTE_INVOCATION}, the event type this analyzer is
     *         registered for
     */
    @Override
    public ProvenanceEventType targetProvenanceEventType() {
        return ProvenanceEventType.REMOTE_INVOCATION;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java new file mode 100644 index 0000000..81b4d6f --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer.unknown; + +import org.apache.nifi.provenance.ProvenanceEventType; + +/** + * Analyze a SEND event and create 'nifi_data' when there is no specific Analyzer implementation found. + *
  • qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1) + *
  • name=NiFiComponentType (example: PutXXX)
 */
public class Send extends UnknownOutput {

    /**
     * @return {@link ProvenanceEventType#SEND}, the event type this analyzer is registered for
     */
    @Override
    public ProvenanceEventType targetProvenanceEventType() {
        return ProvenanceEventType.SEND;
    }

}