nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [15/18] nifi git commit: NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas
Date Mon, 18 Dec 2017 17:25:09 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiTypes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiTypes.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiTypes.java
new file mode 100644
index 0000000..1e02449
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiTypes.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class NiFiTypes {
+
+    public static final String TYPE_ASSET = "Asset";
+    public static final String TYPE_REFERENCEABLE = "Referenceable";
+    public static final String TYPE_PROCESS = "Process";
+    public static final String TYPE_DATASET = "DataSet";
+    public static final String TYPE_NIFI_COMPONENT = "nifi_component";
+    public static final String TYPE_NIFI_FLOW = "nifi_flow";
+    public static final String TYPE_NIFI_FLOW_PATH = "nifi_flow_path";
+    public static final String TYPE_NIFI_DATA = "nifi_data";
+    public static final String TYPE_NIFI_QUEUE = "nifi_queue";
+    public static final String TYPE_NIFI_INPUT_PORT = "nifi_input_port";
+    public static final String TYPE_NIFI_OUTPUT_PORT = "nifi_output_port";
+
+    public static final String ATTR_GUID = "guid";
+    public static final String ATTR_TYPENAME = "typeName";
+    public static final String ATTR_NAME = "name";
+    public static final String ATTR_CLUSTER_NAME = "clusterName";
+    public static final String ATTR_DESCRIPTION = "description";
+    public static final String ATTR_INPUTS = "inputs";
+    public static final String ATTR_OUTPUTS = "outputs";
+    public static final String ATTR_URL = "url";
+    public static final String ATTR_URI = "uri";
+    public static final String ATTR_PATH = "path";
+    public static final String ATTR_QUALIFIED_NAME = "qualifiedName";
+    public static final String ATTR_NIFI_FLOW = "nifiFlow";
+    public static final String ATTR_FLOW_PATHS = "flowPaths";
+    public static final String ATTR_QUEUES = "queues";
+    public static final String ATTR_INPUT_PORTS = "inputPorts";
+    public static final String ATTR_OUTPUT_PORTS = "outputPorts";
+
+    @FunctionalInterface
+    interface EntityDefinition {
+        void define(AtlasEntityDef entity, Set<String> superTypes, List<AtlasAttributeDef> attributes);
+    }
+
+    private static String arrayOf(String typeName) {
+        return "array<" + typeName + ">";
+    }
+
+    private static EntityDefinition NIFI_FLOW = (entity, superTypes, attributes) -> {
+        entity.setVersion(1L);
+        superTypes.add(TYPE_REFERENCEABLE);
+        superTypes.add(TYPE_ASSET);
+
+        final AtlasAttributeDef url = new AtlasAttributeDef(ATTR_URL, "string");
+
+        final AtlasAttributeDef flowPaths = new AtlasAttributeDef(ATTR_FLOW_PATHS, arrayOf(TYPE_NIFI_FLOW_PATH));
+        flowPaths.setIsOptional(true);
+        // Set ownedRef so that child flowPaths entities that no longer exist can be deleted when the NiFi flow is updated.
+        final AtlasConstraintDef ownedRef = new AtlasConstraintDef("ownedRef");
+        flowPaths.addConstraint(ownedRef);
+
+        final AtlasAttributeDef queues = new AtlasAttributeDef(ATTR_QUEUES, arrayOf(TYPE_NIFI_QUEUE));
+        queues.setIsOptional(true);
+        queues.addConstraint(ownedRef);
+
+        final AtlasAttributeDef inputPorts = new AtlasAttributeDef(ATTR_INPUT_PORTS, arrayOf(TYPE_NIFI_INPUT_PORT));
+        inputPorts.setIsOptional(true);
+        inputPorts.addConstraint(ownedRef);
+
+        final AtlasAttributeDef outputPorts = new AtlasAttributeDef(ATTR_OUTPUT_PORTS, arrayOf(TYPE_NIFI_OUTPUT_PORT));
+        outputPorts.setIsOptional(true);
+        outputPorts.addConstraint(ownedRef);
+
+        attributes.add(url);
+        attributes.add(flowPaths);
+        attributes.add(queues);
+        attributes.add(inputPorts);
+        attributes.add(outputPorts);
+    };
+
+    private static EntityDefinition NIFI_COMPONENT = (entity, superTypes, attributes) -> {
+        entity.setVersion(1L);
+
+        final AtlasAttributeDef nifiFlow = new AtlasAttributeDef(ATTR_NIFI_FLOW, TYPE_NIFI_FLOW);
+        nifiFlow.setIsOptional(true);
+
+        attributes.add(nifiFlow);
+    };
+
+    private static EntityDefinition NIFI_FLOW_PATH = (entity, superTypes, attributes) -> {
+        entity.setVersion(1L);
+        superTypes.add(TYPE_PROCESS);
+        superTypes.add(TYPE_NIFI_COMPONENT);
+
+        final AtlasAttributeDef url = new AtlasAttributeDef(ATTR_URL, "string");
+
+        attributes.add(url);
+    };
+
+    private static EntityDefinition NIFI_DATA = (entity, superTypes, attributes) -> {
+        entity.setVersion(1L);
+        superTypes.add(TYPE_DATASET);
+        superTypes.add(TYPE_NIFI_COMPONENT);
+    };
+
+    private static EntityDefinition NIFI_QUEUE = (entity, superTypes, attributes) -> {
+        entity.setVersion(1L);
+        superTypes.add(TYPE_DATASET);
+        superTypes.add(TYPE_NIFI_COMPONENT);
+    };
+
+    private static EntityDefinition NIFI_INPUT_PORT = (entity, superTypes, attributes) -> {
+        entity.setVersion(1L);
+        superTypes.add(TYPE_DATASET);
+        superTypes.add(TYPE_NIFI_COMPONENT);
+    };
+
+    private static EntityDefinition NIFI_OUTPUT_PORT = (entity, superTypes, attributes) -> {
+        entity.setVersion(1L);
+        superTypes.add(TYPE_DATASET);
+        superTypes.add(TYPE_NIFI_COMPONENT);
+    };
+
+    static Map<String, EntityDefinition> ENTITIES = new HashMap<>();
+    static {
+        ENTITIES.put(TYPE_NIFI_COMPONENT, NIFI_COMPONENT);
+        ENTITIES.put(TYPE_NIFI_DATA, NIFI_DATA);
+        ENTITIES.put(TYPE_NIFI_QUEUE, NIFI_QUEUE);
+        ENTITIES.put(TYPE_NIFI_INPUT_PORT, NIFI_INPUT_PORT);
+        ENTITIES.put(TYPE_NIFI_OUTPUT_PORT, NIFI_OUTPUT_PORT);
+        ENTITIES.put(TYPE_NIFI_FLOW_PATH, NIFI_FLOW_PATH);
+        ENTITIES.put(TYPE_NIFI_FLOW, NIFI_FLOW);
+    }
+
+    static final String[] NIFI_TYPES = ENTITIES.keySet().toArray(new String[ENTITIES.size()]);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
new file mode 100644
index 0000000..069276a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Arrays;
+
+public abstract class AbstractNiFiProvenanceEventAnalyzer implements NiFiProvenanceEventAnalyzer {
+
+    /**
+     * Utility method to parse a string uri without a checked exception; a malformed uri is rethrown as IllegalArgumentException.
+     * @param uri uri to parse
+     * @return parsed URI instance
+     */
+    protected URI parseUri(String uri) {
+        try {
+            return new URI(uri);
+        } catch (URISyntaxException e) {
+            final String msg = String.format("Failed to parse uri %s due to %s", uri, e);
+            throw new IllegalArgumentException(msg, e);
+        }
+    }
+
+    /**
+     * Utility method to parse a string url without a checked exception; a malformed url is rethrown as IllegalArgumentException.
+     * @param url url to parse
+     * @return parsed URL instance
+     */
+    protected URL parseUrl(String url) {
+        try {
+            return new URL(url);
+        } catch (MalformedURLException e) {
+            final String msg = String.format("Failed to parse url %s due to %s", url, e);
+            throw new IllegalArgumentException(msg, e);
+        }
+    }
+
+    protected DataSetRefs singleDataSetRef(String componentId, ProvenanceEventType eventType, Referenceable ref) {
+        final DataSetRefs refs = new DataSetRefs(componentId);
+        switch (eventType) {
+            case SEND:
+            case REMOTE_INVOCATION:
+                refs.addOutput(ref);
+                break;
+            case FETCH:
+            case RECEIVE:
+                refs.addInput(ref);
+                break;
+        }
+
+        return refs;
+    }
+
+    /**
+     * Utility method to split comma separated host names. Port number will be removed.
+     */
+    protected String[] splitHostNames(String hostNames) {
+        return Arrays.stream(hostNames.split(","))
+                .map(hostName -> hostName.split(":")[0].trim())
+                .toArray(String[]::new);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
new file mode 100644
index 0000000..c0071f0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+
+import java.util.List;
+
+public interface AnalysisContext {
+    String getNiFiClusterName();
+    ClusterResolver getClusterResolver();
+    List<ConnectionStatus> findConnectionTo(String componentId);
+    List<ConnectionStatus> findConnectionFrom(String componentId);
+    ComputeLineageResult queryLineage(long eventId);
+    ComputeLineageResult findParents(long eventId);
+    ProvenanceEventRecord getProvenanceEvent(long eventId);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
new file mode 100644
index 0000000..4f745d1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.atlas.typesystem.Referenceable;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+public class DataSetRefs {
+    private final Set<String> componentIds;
+    private Set<Referenceable> inputs;
+    private Set<Referenceable> outputs;
+    private boolean referableFromRootPath;
+
+    public DataSetRefs(String componentId) {
+        this.componentIds = Collections.singleton(componentId);
+    }
+
+    public DataSetRefs(Set<String> componentIds) {
+        this.componentIds = componentIds;
+    }
+
+    public Set<String> getComponentIds() {
+        return componentIds;
+    }
+
+    public Set<Referenceable> getInputs() {
+        return inputs != null ? inputs : Collections.emptySet();
+    }
+
+    public void addInput(Referenceable input) {
+        if (inputs == null) {
+            inputs = new LinkedHashSet<>();
+        }
+        inputs.add(input);
+    }
+
+    public Set<Referenceable> getOutputs() {
+        return outputs != null ? outputs : Collections.emptySet();
+    }
+
+    public void addOutput(Referenceable output) {
+        if (outputs == null) {
+            outputs = new LinkedHashSet<>();
+        }
+        outputs.add(output);
+    }
+
+    public boolean isEmpty() {
+        return (inputs == null || inputs.isEmpty()) && (outputs == null || outputs.isEmpty());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzer.java
new file mode 100644
index 0000000..a937c43
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+/**
+ * Responsible for analyzing NiFi provenance event data to generate Atlas DataSet reference.
+ * Implementations of this interface should be thread safe.
+ */
+public interface NiFiProvenanceEventAnalyzer {
+
+    DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event);
+
+    /**
+     * Returns target component type pattern that this Analyzer supports.
+     * Note that a component type of NiFi provenance event only has processor type name without package name.
+     * @return A RegularExpression to match with a component type of a provenance event.
+     */
+    default String targetComponentTypePattern() {
+        return null;
+    }
+
+    /**
+     * Returns target transit URI pattern that this Analyzer supports.
+     * @return A RegularExpression to match with a transit URI of a provenance event.
+     */
+    default String targetTransitUriPattern() {
+        return null;
+    }
+
+    /**
+     * Returns target provenance event type that this Analyzer supports.
+     * @return A Provenance event type
+     */
+    default ProvenanceEventType targetProvenanceEventType() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java
new file mode 100644
index 0000000..21f0231
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+public class NiFiProvenanceEventAnalyzerFactory {
+
+    /**
+     * This holder class is used to implement initialization-on-demand holder idiom to avoid double-checked locking anti-pattern.
+     * The static initializer is performed only once for a class loader.
+     * See these links for detail:
+     * <ul>
+     *     <li><a href="https://en.wikipedia.org/wiki/Double-checked_locking">Double-checked locking</a></li>
+     *     <li><a href="https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom">Initialization-on-demand holder</a></li>
+     * </ul>
+     */
+    private static class AnalyzerHolder {
+        private static final Logger logger = LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.AnalyzerHolder.class);
+        private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForComponentType = new ConcurrentHashMap<>();
+        private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForTransitUri = new ConcurrentHashMap<>();
+        private static final Map<ProvenanceEventType, NiFiProvenanceEventAnalyzer> analyzersForProvenanceEventType = new ConcurrentHashMap<>();
+
+        private static void addAnalyzer(String patternStr, Map<Pattern, NiFiProvenanceEventAnalyzer> toAdd,
+                                        NiFiProvenanceEventAnalyzer analyzer) {
+            if (patternStr != null && !patternStr.isEmpty()) {
+                Pattern pattern = Pattern.compile(patternStr.trim());
+                toAdd.put(pattern, analyzer);
+            }
+        }
+
+        static {
+            logger.debug("Loading NiFiProvenanceEventAnalyzer ...");
+            final ServiceLoader<NiFiProvenanceEventAnalyzer> serviceLoader
+                    = ServiceLoader.load(NiFiProvenanceEventAnalyzer.class);
+            serviceLoader.forEach(analyzer -> {
+                addAnalyzer(analyzer.targetComponentTypePattern(), analyzersForComponentType, analyzer);
+                addAnalyzer(analyzer.targetTransitUriPattern(), analyzersForTransitUri, analyzer);
+                final ProvenanceEventType eventType = analyzer.targetProvenanceEventType();
+                if (eventType != null) {
+                    if (analyzersForProvenanceEventType.containsKey(eventType)) {
+                        logger.warn("Fo ProvenanceEventType {}, an Analyzer {} is already assigned." +
+                                        " Only one analyzer for a type can be registered. Ignoring {}",
+                                eventType, analyzersForProvenanceEventType.get(eventType), analyzer);
+                    }
+                    analyzersForProvenanceEventType.put(eventType, analyzer);
+                }
+            });
+            logger.info("Loaded NiFiProvenanceEventAnalyzers: componentTypes={}, transitUris={}", analyzersForComponentType, analyzersForTransitUri);
+        }
+
+        private static Map<Pattern, NiFiProvenanceEventAnalyzer> getAnalyzersForComponentType() {
+            return analyzersForComponentType;
+        }
+
+        private static Map<Pattern, NiFiProvenanceEventAnalyzer> getAnalyzersForTransitUri() {
+            return analyzersForTransitUri;
+        }
+
+        private static Map<ProvenanceEventType, NiFiProvenanceEventAnalyzer> getAnalyzersForProvenanceEventType() {
+            return analyzersForProvenanceEventType;
+        }
+    }
+
+
+    /**
+     * Find and retrieve NiFiProvenanceEventAnalyzer implementation for the specified targets.
+     * Pattern matching is performed in the following order, and the first analyzer found is returned:
+     * <ol>
+     * <li>Component type name. Use an analyzer supporting the Component type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}.
+     * <li>TransitUri. Use an analyzer supporting the TransitUri with its {@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}.
+     * <li>Provenance Event Type. Use an analyzer supporting the Provenance Event Type with its {@link NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}.
+     * </ol>
+     * @param typeName NiFi component type name.
+     * @param transitUri Transit URI.
+     * @param eventType Provenance event type.
+     * @return Instance of NiFiProvenanceEventAnalyzer if one is found for the specified targets, otherwise null.
+     */
+    public static NiFiProvenanceEventAnalyzer getAnalyzer(String typeName, String transitUri, ProvenanceEventType eventType) {
+
+        for (Map.Entry<Pattern, NiFiProvenanceEventAnalyzer> entry
+                : NiFiProvenanceEventAnalyzerFactory.AnalyzerHolder.getAnalyzersForComponentType().entrySet()) {
+            if (entry.getKey().matcher(typeName).matches()) {
+                return entry.getValue();
+            }
+        }
+
+        if (transitUri != null) {
+            for (Map.Entry<Pattern, NiFiProvenanceEventAnalyzer> entry
+                    : NiFiProvenanceEventAnalyzerFactory.AnalyzerHolder.getAnalyzersForTransitUri().entrySet()) {
+                if (entry.getKey().matcher(transitUri).matches()) {
+                    return entry.getValue();
+                }
+            }
+        }
+
+        // If there's no specific implementation, just use generic analyzer.
+        return NiFiProvenanceEventAnalyzerFactory.AnalyzerHolder.getAnalyzersForProvenanceEventType().get(eventType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
new file mode 100644
index 0000000..7442d95
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+    private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
+    private final NiFiFlow nifiFlow;
+    private final ClusterResolver clusterResolver;
+    private final ProvenanceRepository provenanceRepository;
+
+    public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
+                                   ProvenanceRepository provenanceRepository) {
+        this.nifiFlow = nifiFlow;
+        this.clusterResolver = clusterResolver;
+        this.provenanceRepository = provenanceRepository;
+    }
+
+    @Override
+    public List<ConnectionStatus> findConnectionTo(String componentId) {
+        return nifiFlow.getIncomingConnections(componentId);
+    }
+
+    @Override
+    public List<ConnectionStatus> findConnectionFrom(String componentId) {
+        return nifiFlow.getOutgoingConnections(componentId);
+    }
+
+    @Override
+    public String getNiFiClusterName() {
+        return nifiFlow.getClusterName();
+    }
+
+    @Override
+    public ClusterResolver getClusterResolver() {
+        return clusterResolver;
+    }
+
+    private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {
+        final ComputeLineageResult result = submission.getResult();
+        try {
+            if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+                return result;
+            }
+            logger.warn("Lineage query for {} timed out.", new Object[]{eventId});
+        } catch (InterruptedException e) {
+            logger.warn("Lineage query for {} was interrupted due to {}.", new Object[]{eventId, e}, e);
+        } finally {
+            submission.cancel();
+        }
+
+        return null;
+    }
+
+    @Override
+    public ComputeLineageResult queryLineage(long eventId) {
+        final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+        return getLineageResult(eventId, submission);
+    }
+
+    public ComputeLineageResult findParents(long eventId) {
+        final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+        return getLineageResult(eventId, submission);
+    }
+
+    // NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation
+    private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
+    private static class QueryNiFiUser implements NiFiUser {
+        @Override
+        public String getIdentity() {
+            return StandardAnalysisContext.class.getSimpleName();
+        }
+
+        @Override
+        public Set<String> getGroups() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public NiFiUser getChain() {
+            return null;
+        }
+
+        @Override
+        public boolean isAnonymous() {
+            return true;
+        }
+
+        @Override
+        public String getClientAddress() {
+            return null;
+        }
+    }
+
+    @Override
+    public ProvenanceEventRecord getProvenanceEvent(long eventId) {
+        try {
+            return provenanceRepository.getEvent(eventId);
+        } catch (IOException e) {
+            logger.error("Failed to get provenance event for {} due to {}", new Object[]{eventId, e}, e);
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
new file mode 100644
index 0000000..879b2ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.util.Tuple;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.toTableNameStr;
+
+/**
+ * Common base of the Hive analyzers. Provides factory methods building
+ * 'hive_db' and 'hive_table' references.
+ */
+public abstract class AbstractHiveAnalyzer extends AbstractNiFiProvenanceEventAnalyzer {
+
+    static final String TYPE_DATABASE = "hive_db";
+    static final String TYPE_TABLE = "hive_table";
+    static final String ATTR_DB = "db";
+
+    /**
+     * Builds a 'hive_db' reference.
+     *
+     * @param clusterName name of the cluster the database belongs to
+     * @param databaseName name of the database
+     * @return a reference carrying name, cluster name and qualified name
+     */
+    protected Referenceable createDatabaseRef(String clusterName, String databaseName) {
+        final String qualifiedName = toQualifiedName(clusterName, databaseName);
+
+        final Referenceable databaseRef = new Referenceable(TYPE_DATABASE);
+        databaseRef.set(ATTR_NAME, databaseName);
+        databaseRef.set(ATTR_CLUSTER_NAME, clusterName);
+        databaseRef.set(ATTR_QUALIFIED_NAME, qualifiedName);
+        return databaseRef;
+    }
+
+    /**
+     * Builds a 'hive_table' reference, including a nested reference to its database.
+     *
+     * @param clusterName name of the cluster the table belongs to
+     * @param tableName tuple whose key is the database name and whose value is the table name
+     * @return a reference carrying name, qualified name and the owning database
+     */
+    protected Referenceable createTableRef(String clusterName, Tuple<String, String> tableName) {
+        final String simpleTableName = tableName.getValue();
+        // The qualified name is derived from the "database.table" form of the name.
+        final String qualifiedName = toQualifiedName(clusterName, toTableNameStr(tableName));
+        final Referenceable databaseRef = createDatabaseRef(clusterName, tableName.getKey());
+
+        final Referenceable tableRef = new Referenceable(TYPE_TABLE);
+        tableRef.set(ATTR_NAME, simpleTableName);
+        tableRef.set(ATTR_QUALIFIED_NAME, qualifiedName);
+        tableRef.set(ATTR_DB, databaseRef);
+        return tableRef;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java
new file mode 100644
index 0000000..63ab1bf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Helper methods to convert between table name strings and (database name, table name) tuples.
+ */
+public class DatabaseAnalyzerUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(DatabaseAnalyzerUtil.class);
+
+    /** Name of the attribute holding the comma separated input table names. */
+    public static final String ATTR_INPUT_TABLES = "query.input.tables";
+    /** Name of the attribute holding the comma separated output table names. */
+    public static final String ATTR_OUTPUT_TABLES = "query.output.tables";
+
+    /**
+     * Parses a comma separated list of table names.
+     * Each entry is either 'tableName' or 'databaseName.tableName'.
+     * Blank entries are skipped and malformed entries are logged and dropped.
+     *
+     * @param connectedDatabaseName database name used for entries without a database part
+     * @param tableNamesStr comma separated table names, may be null or empty
+     * @return set of tuples (key = database name, value = table name), empty if nothing could be parsed
+     */
+    public static Set<Tuple<String, String>> parseTableNames(String connectedDatabaseName, String tableNamesStr) {
+        if (tableNamesStr == null || tableNamesStr.isEmpty()) {
+            return Collections.emptySet();
+        }
+        return Arrays.stream(tableNamesStr.split(","))
+                .map(String::trim).filter(s -> !s.isEmpty())
+                .map(t -> DatabaseAnalyzerUtil.parseTableName(connectedDatabaseName, t))
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Parses a single 'tableName' or 'databaseName.tableName' entry.
+     *
+     * @return the parsed tuple, or null if the entry has an unexpected format
+     */
+    private static Tuple<String, String> parseTableName(String connectedDatabaseName, String tableNameStr) {
+        // A negative limit keeps trailing empty strings, so that 'db.' yields two parts
+        // and is rejected below instead of being mistaken for a table named 'db'.
+        final String[] tableNameSplit = tableNameStr.split("\\.", -1);
+        if (tableNameSplit.length != 1 && tableNameSplit.length != 2) {
+            logger.warn("Unexpected table name format: {}", tableNameStr);
+            return null;
+        }
+        // Neither the database part nor the table part may be blank (e.g. '.table' or 'db.').
+        for (String part : tableNameSplit) {
+            if (part.trim().isEmpty()) {
+                logger.warn("Unexpected table name format: {}", tableNameStr);
+                return null;
+            }
+        }
+        final String databaseName = tableNameSplit.length == 2 ? tableNameSplit[0] : connectedDatabaseName;
+        final String tableName = tableNameSplit.length == 2 ? tableNameSplit[1] : tableNameSplit[0];
+        return new Tuple<>(databaseName, tableName);
+    }
+
+    /**
+     * @param tableName tuple whose key is the database name and whose value is the table name
+     * @return 'databaseName.tableName'
+     */
+    public static String toTableNameStr(Tuple<String, String> tableName) {
+        return toTableNameStr(tableName.getKey(), tableName.getValue());
+    }
+
+    /**
+     * @return 'databaseName.tableName'
+     */
+    public static String toTableNameStr(String databaseName, String tableName) {
+        return String.format("%s.%s", databaseName, tableName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
new file mode 100644
index 0000000..37df736
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+
+/**
+ * Analyze a transit URI as a file system path.
+ * <li>qualifiedName=/path/fileName@hostname (example: /tmp/dir/filename.txt@host.example.com)
+ * <li>name=/path/fileName (example: /tmp/dir/filename.txt)
+ */
+public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
+
+    private static final Logger logger = LoggerFactory.getLogger(FilePath.class);
+
+    private static final String TYPE = "fs_path";
+
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+        final Referenceable ref = new Referenceable(TYPE);
+        final URI uri = parseUri(event.getTransitUri());
+        final String clusterName;
+        try {
+            // use hostname in uri if available for remote path.
+            final String uriHost = uri.getHost();
+            final String hostname = StringUtils.isEmpty(uriHost) ? InetAddress.getLocalHost().getHostName() : uriHost;
+            clusterName = context.getClusterResolver().fromHostNames(hostname);
+        } catch (UnknownHostException e) {
+            logger.warn("Failed to get localhost name due to " + e, e);
+            return null;
+        }
+
+        final String path = uri.getPath();
+        ref.set(ATTR_NAME, path);
+        ref.set(ATTR_PATH, path);
+        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, path));
+
+        return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
+    }
+
+    @Override
+    public String targetTransitUriPattern() {
+        return "^file:/.+$";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
new file mode 100644
index 0000000..0f446fb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI;
+
+/**
+ * Analyzer handling transit URIs that point to a HBase table.
+ * <ul>
+ * <li>qualifiedName=tableName@clusterName (example: myTable@cl1)
+ * <li>name=tableName (example: myTable)
+ * </ul>
+ */
+public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseTable.class);
+    private static final String TYPE = "hbase_table";
+
+    // hbase://masterAddress/hbaseTableName/hbaseRowId(optional)
+    private static final Pattern URI_PATTERN = Pattern.compile("^hbase://([^/]+)/([^/]+)/?.*$");
+
+    /**
+     * Creates a 'hbase_table' reference from the transit URI of the event.
+     *
+     * @return the DataSet reference, or null if the transit URI does not have the expected format
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+
+        final String transitUri = event.getTransitUri();
+        final Matcher matcher = URI_PATTERN.matcher(transitUri);
+        if (!matcher.matches()) {
+            logger.warn("Unexpected transit URI: {}", transitUri);
+            return null;
+        }
+
+        // Group 1 is the master address part, group 2 is the table name.
+        final String masterAddress = matcher.group(1);
+        final String tableName = matcher.group(2);
+
+        final Referenceable tableRef = new Referenceable(TYPE);
+        final String clusterName = context.getClusterResolver().fromHostNames(splitHostNames(masterAddress));
+
+        tableRef.set(ATTR_NAME, tableName);
+        tableRef.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, tableName));
+        // TODO: 'uri' is a mandatory attribute, but what should we set?
+        tableRef.set(ATTR_URI, transitUri);
+
+        return singleDataSetRef(event.getComponentId(), event.getEventType(), tableRef);
+    }
+
+    /** @return regular expression matching 'hbase://' transit URIs */
+    @Override
+    public String targetTransitUriPattern() {
+        return "^hbase://.+$";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
new file mode 100644
index 0000000..b1ef828
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+import java.net.URI;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+
+/**
+ * Analyzer handling transit URIs that point to a path on HDFS.
+ * <ul>
+ * <li>qualifiedName=/path/fileName@clusterName (example: /app/warehouse/hive/db/default@cl1)
+ * <li>name=/path/fileName (example: /app/warehouse/hive/db/default)
+ * </ul>
+ */
+public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
+
+    private static final String TYPE = "hdfs_path";
+
+    /**
+     * Creates a 'hdfs_path' reference from the host and path of the transit URI of the event.
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+        final Referenceable pathRef = new Referenceable(TYPE);
+        final URI transitUri = parseUri(event.getTransitUri());
+
+        // The cluster is resolved from the host part of the URI.
+        final String clusterName = context.getClusterResolver().fromHostNames(transitUri.getHost());
+        final String hdfsPath = transitUri.getPath();
+        final String qualifiedName = toQualifiedName(clusterName, hdfsPath);
+
+        pathRef.set(ATTR_NAME, hdfsPath);
+        pathRef.set(ATTR_PATH, hdfsPath);
+        pathRef.set(ATTR_CLUSTER_NAME, clusterName);
+        pathRef.set(ATTR_QUALIFIED_NAME, qualifiedName);
+
+        return singleDataSetRef(event.getComponentId(), event.getEventType(), pathRef);
+    }
+
+    /** @return regular expression matching 'hdfs://' transit URIs */
+    @Override
+    public String targetTransitUriPattern() {
+        return "^hdfs://.+$";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
new file mode 100644
index 0000000..e393a09
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.Tuple;
+
+import java.net.URI;
+import java.util.Set;
+
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_INPUT_TABLES;
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES;
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.parseTableNames;
+
+/**
+ * Analyze provenance events for Hive2 using JDBC.
+ * <ul>
+ * <li>If a Provenance event has 'query.input.tables' or 'query.output.tables' attributes then 'hive_table' DataSet reference is created:
+ * <ul>
+ * <li>qualifiedName=tableName@clusterName (example: myTable@cl1)
+ * <li>name=tableName (example: myTable)
+ * </ul>
+ * </li>
+ * <li>If not, 'hive_database' DataSet reference is created from transit URI:
+ * <ul>
+ * <li>qualifiedName=dbName@clusterName (example: default@cl1)
+ * <li>dbName (example: default)
+ * </ul>
+ * </li>
+ * </ul>
+ */
+public class Hive2JDBC extends AbstractHiveAnalyzer {
+
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+
+        // Replace the colon so that the schema in the URI can be parsed correctly.
+        final String transitUri = event.getTransitUri().replaceFirst("^jdbc:hive2", "jdbc-hive2");
+        final URI uri = parseUri(transitUri);
+        final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());
+        // Remove the heading '/'
+        final String path = uri.getPath();
+        // If uri does not contain database name, then use 'default' as database name.
+        final String connectedDatabaseName = path == null || path.isEmpty() ? "default" : path.substring(1);
+
+        final Set<Tuple<String, String>> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES));
+        final Set<Tuple<String, String>> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES));
+
+        if (inputTables.isEmpty() && outputTables.isEmpty()) {
+            // If input/output tables are unknown, create database level lineage.
+            return getDatabaseRef(event.getComponentId(), event.getEventType(),
+                    clusterName, connectedDatabaseName);
+        }
+
+        final DataSetRefs refs = new DataSetRefs(event.getComponentId());
+        addRefs(refs, true, clusterName, inputTables);
+        addRefs(refs, false, clusterName, outputTables);
+        return refs;
+    }
+
+    private DataSetRefs getDatabaseRef(String componentId, ProvenanceEventType eventType,
+                                       String clusterName, String databaseName) {
+        final Referenceable ref = createDatabaseRef(clusterName, databaseName);
+
+        return singleDataSetRef(componentId, eventType, ref);
+    }
+
+    private void addRefs(DataSetRefs refs, boolean isInput, String clusterName,
+                                       Set<Tuple<String, String>> tableNames) {
+        tableNames.forEach(tableName -> {
+            final Referenceable ref = createTableRef(clusterName, tableName);
+            if (isInput) {
+                refs.addInput(ref);
+            } else {
+                refs.addOutput(ref);
+            }
+        });
+    }
+
+    @Override
+    public String targetTransitUriPattern() {
+        return "^jdbc:hive2://.+$";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
new file mode 100644
index 0000000..e3d4709
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI;
+
+/**
+ * Analyzer handling transit URIs that point to a Kafka topic.
+ * <ul>
+ * <li>qualifiedName=topicName@clusterName (example: testTopic@cl1)
+ * <li>name=topicName (example: testTopic)
+ * </ul>
+ */
+public class KafkaTopic extends AbstractNiFiProvenanceEventAnalyzer {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTopic.class);
+
+    private static final String TYPE = "kafka_topic";
+    private static final String ATTR_TOPIC = "topic";
+
+    // PLAINTEXT://0.example.com:6667,1.example.com:6667/topicA
+    private static final Pattern URI_PATTERN = Pattern.compile("^.+://([^/]+)/(.+)$");
+
+    /**
+     * Creates a 'kafka_topic' reference from the transit URI of the event.
+     *
+     * @return the DataSet reference, or null if the transit URI is missing or has an unexpected format
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+        final Referenceable topicRef = new Referenceable(TYPE);
+
+        final String transitUri = event.getTransitUri();
+        if (transitUri == null) {
+            return null;
+        }
+
+        final Matcher matcher = URI_PATTERN.matcher(transitUri);
+        if (!matcher.matches()) {
+            logger.warn("Unexpected transit URI: {}", transitUri);
+            return null;
+        }
+
+        // Group 1 is the comma separated broker list, group 2 is the topic name.
+        final String clusterName = resolveClusterName(context, matcher.group(1));
+        final String topicName = matcher.group(2);
+
+        topicRef.set(ATTR_NAME, topicName);
+        topicRef.set(ATTR_TOPIC, topicName);
+        topicRef.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, topicName));
+        topicRef.set(ATTR_URI, transitUri);
+
+        return singleDataSetRef(event.getComponentId(), event.getEventType(), topicRef);
+    }
+
+    /**
+     * Resolves the cluster name from the brokers one by one, stopping at the first
+     * broker that yields a non-empty name. If none does, the result of the last
+     * lookup is returned as is.
+     */
+    private static String resolveClusterName(AnalysisContext context, String brokers) {
+        String resolved = null;
+        for (String broker : brokers.split(",")) {
+            // Drop the port part, only the host name is used for the lookup.
+            final String host = broker.split(":")[0].trim();
+            resolved = context.getClusterResolver().fromHostNames(host);
+            if (resolved != null && !resolved.isEmpty()) {
+                return resolved;
+            }
+        }
+        return resolved;
+    }
+
+    /** @return regular expression matching the PublishKafka and ConsumeKafka families of component types */
+    @Override
+    public String targetComponentTypePattern() {
+        return "^(Publish|Consume)Kafka.*$";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
new file mode 100644
index 0000000..e2118f7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+
+/**
+ * Analyzer handling events of NiFi Site-to-Site remote input/output ports.
+ * <ul>
+ * <li>qualifiedName=remotePortGUID@clusterName (example: 35dbc0ab-015e-1000-144c-a8d71255027d@cl1)
+ * <li>name=portName (example: input)
+ * </ul>
+ */
+public class NiFiRemotePort extends NiFiS2S {
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFiRemotePort.class);
+
+    /**
+     * Creates an input or output port reference for a SEND or RECEIVE event of a remote port.
+     *
+     * @return the DataSet reference, or null for other event types or if no connection to/from the port was found
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+
+        // Only SEND and RECEIVE events are of interest.
+        final ProvenanceEventType eventType = event.getEventType();
+        if (eventType != ProvenanceEventType.SEND && eventType != ProvenanceEventType.RECEIVE) {
+            return null;
+        }
+
+        final boolean isRemoteInputPort = event.getComponentType().equals("Remote Input Port");
+        final String remotePortId = event.getComponentId();
+        final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver());
+
+        // Find connections that connects to/from the remote port.
+        final List<ConnectionStatus> connections;
+        if (isRemoteInputPort) {
+            connections = context.findConnectionTo(remotePortId);
+        } else {
+            connections = context.findConnectionFrom(remotePortId);
+        }
+        if (connections == null || connections.isEmpty()) {
+            logger.warn("Connection was not found: {}", event);
+            return null;
+        }
+
+        // The name of remote port can be retrieved from any connection, use the first one.
+        final ConnectionStatus connection = connections.get(0);
+        final Referenceable portRef;
+        if (isRemoteInputPort) {
+            portRef = new Referenceable(TYPE_NIFI_INPUT_PORT);
+            portRef.set(ATTR_NAME, connection.getDestinationName());
+        } else {
+            portRef = new Referenceable(TYPE_NIFI_OUTPUT_PORT);
+            portRef.set(ATTR_NAME, connection.getSourceName());
+        }
+        portRef.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, s2sUrl.targetPortId));
+
+        return singleDataSetRef(remotePortId, eventType, portRef);
+    }
+
+    /** @return regular expression matching the 'Remote Input Port' and 'Remote Output Port' component types */
+    @Override
+    public String targetComponentTypePattern() {
+        return "^Remote (In|Out)put Port$";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
new file mode 100644
index 0000000..4f66025
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+
+/**
+ * Analyzes a provenance event as a NiFi RootGroupPort, i.e. the server side of a Site-to-Site communication.
+ * <ul>
+ * <li>qualifiedName=result of {@code toQualifiedName(clusterName, rootPortGUID)}
+ *     (rootPortGUID example: 35dbc0ab-015e-1000-144c-a8d71255027d)
+ * <li>name=portName (example: input)
+ * </ul>
+ */
+public class NiFiRootGroupPort extends NiFiS2S {
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFiRootGroupPort.class);
+
+    /**
+     * Builds the data set reference of the root group port that reported the given event.
+     *
+     * @param context provides the cluster resolver and the connection lookups
+     * @param event the provenance event to analyze
+     * @return the value returned by {@code singleDataSetRef} for the port, or {@code null} when the event is
+     *         neither SEND nor RECEIVE, or when no connection is found on the inspected side of the port
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+
+        // Only SEND and RECEIVE events are handled here.
+        final ProvenanceEventType eventType = event.getEventType();
+        if (!(ProvenanceEventType.SEND.equals(eventType) || ProvenanceEventType.RECEIVE.equals(eventType))) {
+            return null;
+        }
+
+        final boolean isInputPort = event.getComponentType().equals("Input Port");
+        final String rootPortId = event.getComponentId();
+
+        final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver());
+
+        // An input port is looked up as the source of its connections, an output port as their destination.
+        final List<ConnectionStatus> connections;
+        if (isInputPort) {
+            connections = context.findConnectionFrom(rootPortId);
+        } else {
+            connections = context.findConnectionTo(rootPortId);
+        }
+        if (connections == null || connections.isEmpty()) {
+            logger.warn("Connection was not found: {}", event);
+            return null;
+        }
+
+        // Every connection carries the port name, so the first one is as good as any other.
+        final ConnectionStatus connection = connections.get(0);
+        final String type;
+        final String portName;
+        if (isInputPort) {
+            type = TYPE_NIFI_INPUT_PORT;
+            portName = connection.getSourceName();
+        } else {
+            type = TYPE_NIFI_OUTPUT_PORT;
+            portName = connection.getDestinationName();
+        }
+
+        final Referenceable ref = new Referenceable(type);
+        ref.set(ATTR_NAME, portName);
+        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, rootPortId));
+
+        return singleDataSetRef(rootPortId, eventType, ref);
+    }
+
+    /**
+     * Returns the regular expression for the targeted component type.
+     * It matches exactly {@code "Input Port"} and {@code "Output Port"}.
+     */
+    @Override
+    public String targetComponentTypePattern() {
+        return "^(In|Out)put Port$";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
new file mode 100644
index 0000000..762a1aa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Base class of the analyzers handling NiFi Site-to-Site (S2S) provenance events.
+ * It turns an S2S transit URI into a cluster name and a target port id.
+ */
+public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFiS2S.class);
+
+    // URL path expected for the "nifi" protocol; group 1 is used as the target port id.
+    // NOTE(review): matchUrl() requires the whole URL path to match and URL#getPath() usually starts with '/',
+    // which this pattern does not accept -- confirm what such transit URIs are expected to look like.
+    private static final Pattern RAW_URL_REGEX = Pattern.compile("([0-9a-zA-Z\\-]+)");
+    // URL path expected for the "http" and "https" protocols; group 2 is used as the target port id.
+    private static final Pattern HTTP_URL_REGEX = Pattern.compile(".*/nifi-api/data-transfer/(in|out)put-ports/([[0-9a-zA-Z\\-]]+)/transactions/.*");
+
+    /**
+     * Parses an S2S transit URI.
+     *
+     * @param transitUri the transit URI of a provenance event
+     * @param clusterResolver resolves the host name of the URI to a cluster name
+     * @return the cluster name and the target port id found in the URI
+     * @throws IllegalArgumentException if the protocol is not http, https or nifi,
+     *         or if the URL path does not match the pattern of its protocol
+     */
+    protected S2STransitUrl parseTransitURL(String transitUri, ClusterResolver clusterResolver) {
+        final URL url = parseUrl(transitUri);
+
+        final String clusterName = clusterResolver.fromHostNames(url.getHost());
+        // Protocol names are identifiers: fold their case independently of the default locale,
+        // otherwise e.g. "NIFI" would not become "nifi" under a Turkish locale.
+        final String protocol = url.getProtocol().toLowerCase(java.util.Locale.ROOT);
+
+        final String targetPortId;
+        switch (protocol) {
+
+            case "http":
+            case "https":
+                targetPortId = matchUrl(url, HTTP_URL_REGEX).group(2);
+                break;
+
+            case "nifi":
+                targetPortId = matchUrl(url, RAW_URL_REGEX).group(1);
+                break;
+
+            default:
+                throw new IllegalArgumentException("Protocol " + protocol + " is not supported as NiFi S2S transit URL.");
+
+        }
+
+        return new S2STransitUrl(clusterName, targetPortId);
+    }
+
+    /**
+     * Matches the path of the given URL against the given pattern.
+     *
+     * @return the matcher, already matched against the entire path
+     * @throws IllegalArgumentException if the path does not match
+     */
+    private Matcher matchUrl(URL url, Pattern pattern) {
+        final Matcher pathMatcher = pattern.matcher(url.getPath());
+        if (pathMatcher.matches()) {
+            return pathMatcher;
+        }
+        throw new IllegalArgumentException("Unexpected transit URI: " + url);
+    }
+
+    /**
+     * Holds the two values extracted from an S2S transit URI.
+     */
+    protected static class S2STransitUrl {
+        final String clusterName;
+        final String targetPortId;
+
+        public S2STransitUrl(String clusterName, String targetPortId) {
+            this.clusterName = clusterName;
+            this.targetPortId = targetPortId;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
new file mode 100644
index 0000000..7b41c57
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.util.Tuple;
+
+import java.net.URI;
+import java.util.Set;
+
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES;
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.parseTableNames;
+
+/**
+ * Analyzes provenance events of PutHiveStreaming and reports the tables named in its output tables attribute
+ * as outputs.
+ * <ul>
+ * <li>qualifiedName=tableName@clusterName (example: myTable@cl1)
+ * <li>name=tableName (example: myTable)
+ * </ul>
+ */
+public class PutHiveStreaming extends AbstractHiveAnalyzer {
+
+    /**
+     * Creates one output table reference per table parsed from the {@code ATTR_OUTPUT_TABLES} attribute.
+     *
+     * @param context provides the cluster resolver
+     * @param event the provenance event to analyze
+     * @return the output references, or {@code null} when no table name was parsed from the event
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+
+        // The cluster name is resolved from the host of the transit URI.
+        final URI transitUri = parseUri(event.getTransitUri());
+        final String clusterName = context.getClusterResolver().fromHostNames(transitUri.getHost());
+
+        final String outputTablesAttribute = event.getAttribute(ATTR_OUTPUT_TABLES);
+        final Set<Tuple<String, String>> outputTables = parseTableNames(null, outputTablesAttribute);
+        if (outputTables.isEmpty()) {
+            return null;
+        }
+
+        final DataSetRefs refs = new DataSetRefs(event.getComponentId());
+        for (final Tuple<String, String> tableName : outputTables) {
+            final Referenceable tableRef = createTableRef(clusterName, tableName);
+            refs.addOutput(tableRef);
+        }
+        return refs;
+    }
+
+    /**
+     * Returns the regular expression for the targeted component type; it matches exactly
+     * {@code "PutHiveStreaming"}.
+     */
+    @Override
+    public String targetComponentTypePattern() {
+        return "^PutHiveStreaming$";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java
new file mode 100644
index 0000000..9e6726f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer.unknown;
+
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+import java.util.List;
+
+/**
+ * Analyzes a CREATE event and creates 'nifi_data' when no specific Analyzer implementation is found.
+ * Events of components that have at least one incoming connection are skipped.
+ * <ul>
+ * <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
+ * <li>name=NiFiComponentType (example: GenerateFlowFile)
+ * </ul>
+ */
+public class Create extends UnknownInput {
+
+    /**
+     * Delegates to the parent analysis only when the component has no incoming connection.
+     *
+     * @param context provides the connection lookup
+     * @param event the provenance event to analyze
+     * @return the result of the parent analysis, or {@code null} when the component has incoming connections
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+
+        // A component without any incoming connection is treated as one that generates data.
+        final List<ConnectionStatus> inbound = context.findConnectionTo(event.getComponentId());
+        final boolean generatesData = inbound == null || inbound.isEmpty();
+        if (generatesData) {
+            return super.analyze(context, event);
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns the provenance event type this analyzer targets: CREATE.
+     */
+    @Override
+    public ProvenanceEventType targetProvenanceEventType() {
+        return ProvenanceEventType.CREATE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java
new file mode 100644
index 0000000..3fd9d70
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer.unknown;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+/**
+ * Analyzes a FETCH event and creates 'nifi_data' when no specific Analyzer implementation is found.
+ * <ul>
+ * <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
+ * <li>name=NiFiComponentType (example: FetchXXX)
+ * </ul>
+ */
+public class Fetch extends UnknownInput {
+
+    private static final ProvenanceEventType TARGET_EVENT_TYPE = ProvenanceEventType.FETCH;
+
+    /**
+     * Returns the provenance event type this analyzer targets: FETCH.
+     */
+    @Override
+    public ProvenanceEventType targetProvenanceEventType() {
+        return TARGET_EVENT_TYPE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java
new file mode 100644
index 0000000..9ee2ded
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer.unknown;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+/**
+ * Analyzes a RECEIVE event and creates 'nifi_data' when no specific Analyzer implementation is found.
+ * <ul>
+ * <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
+ * <li>name=NiFiComponentType (example: GetXXX)
+ * </ul>
+ */
+public class Receive extends UnknownInput {
+
+    private static final ProvenanceEventType TARGET_EVENT_TYPE = ProvenanceEventType.RECEIVE;
+
+    /**
+     * Returns the provenance event type this analyzer targets: RECEIVE.
+     */
+    @Override
+    public ProvenanceEventType targetProvenanceEventType() {
+        return TARGET_EVENT_TYPE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java
new file mode 100644
index 0000000..4d7d205
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer.unknown;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+/**
+ * Analyzes a REMOTE_INVOCATION event and creates 'nifi_data' when no specific Analyzer implementation is found.
+ * <ul>
+ * <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
+ * <li>name=NiFiComponentType (example: XXX)
+ * </ul>
+ */
+public class RemoteInvocation extends UnknownOutput {
+
+    private static final ProvenanceEventType TARGET_EVENT_TYPE = ProvenanceEventType.REMOTE_INVOCATION;
+
+    /**
+     * Returns the provenance event type this analyzer targets: REMOTE_INVOCATION.
+     */
+    @Override
+    public ProvenanceEventType targetProvenanceEventType() {
+        return TARGET_EVENT_TYPE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java
new file mode 100644
index 0000000..81b4d6f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer.unknown;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+/**
+ * Analyzes a SEND event and creates 'nifi_data' when no specific Analyzer implementation is found.
+ * <ul>
+ * <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
+ * <li>name=NiFiComponentType (example: PutXXX)
+ * </ul>
+ */
+public class Send extends UnknownOutput {
+
+    private static final ProvenanceEventType TARGET_EVENT_TYPE = ProvenanceEventType.SEND;
+
+    /**
+     * Returns the provenance event type this analyzer targets: SEND.
+     */
+    @Override
+    public ProvenanceEventType targetProvenanceEventType() {
+        return TARGET_EVENT_TYPE;
+    }
+}


Mime
View raw message