nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [14/18] nifi git commit: NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas
Date Mon, 18 Dec 2017 17:25:08 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
new file mode 100644
index 0000000..42e407d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer.unknown;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+
+/**
+ * Base class for analyzers that describe a data set only through the component that emitted
+ * the provenance event: the data set is a {@code nifi_data} entity named after the component type
+ * and qualified by the component id and NiFi cluster name.
+ */
+public abstract class UnknownDataSet extends AbstractNiFiProvenanceEventAnalyzer {
+
+    protected static final String TYPE = "nifi_data";
+
+    /**
+     * Builds a {@code nifi_data} reference for the component that emitted {@code event}.
+     *
+     * @param context analysis context, used to obtain the NiFi cluster name
+     * @param event   provenance event whose component is described
+     * @return a new Referenceable whose name, qualified name and description are derived from the event
+     */
+    protected Referenceable createDataSetRef(AnalysisContext context, ProvenanceEventRecord event) {
+        final String componentType = event.getComponentType();
+        final String qualifiedName = toQualifiedName(context.getNiFiClusterName(), event.getComponentId());
+        final String description = event.getEventType() + " was performed by " + componentType;
+
+        final Referenceable dataSet = new Referenceable(TYPE);
+        dataSet.set(ATTR_NAME, componentType);
+        dataSet.set(ATTR_QUALIFIED_NAME, qualifiedName);
+        dataSet.set(ATTR_DESCRIPTION, description);
+        return dataSet;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
new file mode 100644
index 0000000..f16908b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer.unknown;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Analyzer that reports the emitting component's generic {@code nifi_data} data set as an input.
+ */
+public abstract class UnknownInput extends UnknownDataSet {
+
+    /**
+     * @return data set refs for the event's component, holding a single input reference
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+        final DataSetRefs result = new DataSetRefs(event.getComponentId());
+        result.addInput(createDataSetRef(context, event));
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
new file mode 100644
index 0000000..5d564c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer.unknown;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Analyzer that reports the emitting component's generic {@code nifi_data} data set as an output.
+ */
+public abstract class UnknownOutput extends UnknownDataSet {
+
+    /**
+     * @return data set refs for the event's component, holding a single output reference
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+        final DataSetRefs result = new DataSetRefs(event.getComponentId());
+        result.addOutput(createDataSetRef(context, event));
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
new file mode 100644
index 0000000..11d6e8b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiAtlasHook.NIFI_USER;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * Base class for {@link LineageStrategy} implementations.
+ *
+ * <p>Provides helpers to run the provenance event analyzer that matches an event, to convert NiFi flow
+ * and flow path objects into Atlas {@link Referenceable}s, and to add hook notification messages to the
+ * {@link LineageContext} supplied through {@link #setLineageContext(LineageContext)}.</p>
+ */
+public abstract class AbstractLineageStrategy implements LineageStrategy {
+
+    protected Logger logger = LoggerFactory.getLogger(getClass());
+    private LineageContext lineageContext;
+
+    /**
+     * Sets the context that receives the notification messages created by this strategy.
+     * Methods that emit messages dereference it without a null check, so set it before using them.
+     *
+     * @param lineageContext destination for hook notification messages
+     */
+    public void setLineageContext(LineageContext lineageContext) {
+        this.lineageContext = lineageContext;
+    }
+
+    /**
+     * Looks up the analyzer for the event's component type, transit URI and event type, and runs it.
+     *
+     * @param analysisContext context passed through to the analyzer
+     * @param event provenance event to analyze
+     * @return the data set references produced by the analyzer, or {@code null} if no analyzer matches
+     */
+    protected DataSetRefs executeAnalyzer(AnalysisContext analysisContext, ProvenanceEventRecord event) {
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(event.getComponentType(), event.getTransitUri(), event.getEventType());
+        if (analyzer == null) {
+            return null;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Analyzer {} is found for event: {}", analyzer, event);
+        }
+        return analyzer.analyze(analysisContext, event);
+    }
+
+    /**
+     * Adds {@code refs} to the flow paths that {@link NiFiFlow#findPath(String)} returns for each of the
+     * component ids in {@code refs}. Component ids without a flow path are logged at WARN level and skipped.
+     */
+    protected void addDataSetRefs(NiFiFlow nifiFlow, DataSetRefs refs) {
+
+        final Set<NiFiFlowPath> flowPaths = refs.getComponentIds().stream()
+                .map(componentId -> {
+                    final NiFiFlowPath flowPath = nifiFlow.findPath(componentId);
+                    if (flowPath == null) {
+                        logger.warn("FlowPath for {} was not found.", componentId);
+                    }
+                    return flowPath;
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+
+        addDataSetRefs(nifiFlow, flowPaths, refs);
+    }
+
+    /**
+     * Adds {@code refs} to each of the given flow paths, converting every flow path into a
+     * {@code nifi_flow_path} reference that points at {@code nifiFlow}.
+     */
+    protected void addDataSetRefs(NiFiFlow nifiFlow, Set<NiFiFlowPath> flowPaths, DataSetRefs refs) {
+        // create reference to NiFi flow path.
+        final Referenceable flowRef = toReferenceable(nifiFlow);
+        final String clusterName = nifiFlow.getClusterName();
+        final String url = nifiFlow.getUrl();
+
+        for (NiFiFlowPath flowPath : flowPaths) {
+            final Referenceable flowPathRef = toReferenceable(flowPath, flowRef, clusterName, url);
+            addDataSetRefs(refs, flowPathRef);
+        }
+    }
+
+    /** Builds a {@code nifi_flow} reference holding the flow's name, qualified name and URL. */
+    private Referenceable toReferenceable(NiFiFlow nifiFlow) {
+        final Referenceable flowRef = new Referenceable(TYPE_NIFI_FLOW);
+        flowRef.set(ATTR_NAME, nifiFlow.getFlowName());
+        flowRef.set(ATTR_QUALIFIED_NAME, nifiFlow.getQualifiedName());
+        flowRef.set(ATTR_URL, nifiFlow.getUrl());
+        return flowRef;
+    }
+
+    /** Builds a {@code nifi_flow_path} reference for {@code flowPath} inside {@code nifiFlow}. */
+    protected Referenceable toReferenceable(NiFiFlowPath flowPath, NiFiFlow nifiFlow) {
+        return toReferenceable(flowPath, toReferenceable(nifiFlow),
+                nifiFlow.getClusterName(), nifiFlow.getUrl());
+    }
+
+    /**
+     * Builds a {@code nifi_flow_path} reference whose qualified name is {@code <flowPathId>@<clusterName>}
+     * and which carries the flow path's current inputs and outputs.
+     */
+    private Referenceable toReferenceable(NiFiFlowPath flowPath, Referenceable flowRef, String clusterName, String nifiUrl) {
+        final Referenceable flowPathRef = new Referenceable(TYPE_NIFI_FLOW_PATH);
+        flowPathRef.set(ATTR_NAME, flowPath.getName());
+        flowPathRef.set(ATTR_QUALIFIED_NAME, flowPath.getId() + "@" + clusterName);
+        flowPathRef.set(ATTR_NIFI_FLOW, flowRef);
+        flowPathRef.set(ATTR_URL, flowPath.createDeepLinkURL(nifiUrl));
+        // Referenceable has to have GUID assigned, otherwise it will not be stored due to lack of required attribute.
+        // If a Referenceable has GUID, Atlas does not validate all required attributes.
+        flowPathRef.set(ATTR_INPUTS, flowPath.getInputs().stream().map(this::toReferenceable).collect(Collectors.toList()));
+        flowPathRef.set(ATTR_OUTPUTS,  flowPath.getOutputs().stream().map(this::toReferenceable).collect(Collectors.toList()));
+        return flowPathRef;
+    }
+
+    /** Converts an object id into a Referenceable, keeping its GUID when one is present. */
+    private Referenceable toReferenceable(AtlasObjectId id) {
+        return StringUtils.isEmpty(id.getGuid())
+                ? new Referenceable(id.getTypeName(), id.getUniqueAttributes())
+                : new Referenceable(id.getGuid(), id.getTypeName(), id.getUniqueAttributes());
+    }
+
+    /** Adds an {@code EntityCreateRequest} for the given entities, issued as {@code NIFI_USER}, to the lineage context. */
+    protected void createEntity(Referenceable ... entities) {
+        final HookNotification.EntityCreateRequest msg = new HookNotification.EntityCreateRequest(NIFI_USER, entities);
+        lineageContext.addMessage(msg);
+    }
+
+    /**
+     * Appends the refs in {@code refsToAdd} that {@code nifiFlowPath} does not already reference (compared by
+     * typed qualified name) to its {@code targetAttribute} collection. Refs whose id is unassigned also get an
+     * {@code EntityCreateRequest}.
+     *
+     * @return {@code true} only if at least one ref was actually appended
+     */
+    @SuppressWarnings("unchecked")
+    private boolean addDataSetRefs(Set<Referenceable> refsToAdd, Referenceable nifiFlowPath, String targetAttribute) {
+        if (refsToAdd == null || refsToAdd.isEmpty()) {
+            return false;
+        }
+
+        final Function<Referenceable, String> typedQualifiedNameOf = ref -> toTypedQualifiedName(ref.getTypeName(), toStr(ref.get(ATTR_QUALIFIED_NAME)));
+        final Collection<Referenceable> refs = Optional.ofNullable((Collection<Referenceable>) nifiFlowPath.get(targetAttribute)).orElseGet(ArrayList::new);
+        // Names already referenced by the flow path. Names appended below are recorded too, so refs
+        // that share a typed qualified name are created and appended only once.
+        final Set<String> knownTypedQualifiedNames = refs.stream().map(typedQualifiedNameOf)
+                .collect(Collectors.toCollection(java.util.HashSet::new));
+
+        boolean added = false;
+        for (Referenceable ref : refsToAdd) {
+            final String typedQualifiedName = typedQualifiedNameOf.apply(ref);
+            if (!knownTypedQualifiedNames.add(typedQualifiedName)) {
+                // nifiFlowPath already has this dataSetRef, so it needs not to be created.
+                continue;
+            }
+            if (ref.getId().isUnassigned()) {
+                // Create new entity.
+                logger.debug("Found a new DataSet reference from {} to {}, sending an EntityCreateRequest",
+                        typedQualifiedNameOf.apply(nifiFlowPath), typedQualifiedName);
+                final HookNotification.EntityCreateRequest createDataSet = new HookNotification.EntityCreateRequest(NIFI_USER, ref);
+                lineageContext.addMessage(createDataSet);
+            }
+            refs.add(ref);
+            added = true;
+        }
+
+        // Decide from what was actually appended, not from size arithmetic that miscounts
+        // when the existing collection already holds duplicate names.
+        if (added) {
+            nifiFlowPath.set(targetAttribute, refs);
+        }
+        return added;
+    }
+
+    /**
+     * Merges the inputs and outputs of {@code dataSetRefs} into {@code flowPathRef}. If either changed, adds an
+     * {@code EntityPartialUpdateRequest} for the flow path, keyed by its qualified name, to the lineage context.
+     */
+    protected void addDataSetRefs(DataSetRefs dataSetRefs, Referenceable flowPathRef) {
+        final boolean inputsAdded = addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, ATTR_INPUTS);
+        final boolean outputsAdded = addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, ATTR_OUTPUTS);
+        if (inputsAdded || outputsAdded) {
+            lineageContext.addMessage(new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
+                    ATTR_QUALIFIED_NAME, (String) flowPathRef.get(ATTR_QUALIFIED_NAME), flowPathRef));
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
new file mode 100644
index 0000000..4437bfc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.util.Tuple;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.zip.CRC32;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+import static org.apache.nifi.provenance.ProvenanceEventType.DROP;
+
+/**
+ * Lineage strategy that reacts to DROP events. It queries the dropped FlowFile's provenance lineage,
+ * splits it into {@link LineagePath}s and reports one {@code nifi_flow_path} per path. Each path is
+ * identified by a CRC32 hash of its component ids, data set references and parent path hashes.
+ */
+public class CompleteFlowPathLineage extends AbstractLineageStrategy {
+
+    @Override
+    public ProvenanceEventType[] getTargetEventTypes() {
+        return new ProvenanceEventType[]{DROP};
+    }
+
+    @Override
+    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+        if (!ProvenanceEventType.DROP.equals(event.getEventType())) {
+            return;
+        }
+        final ComputeLineageResult lineage = analysisContext.queryLineage(event.getEventId());
+
+        // Construct a tree model to traverse backwards.
+        final Map<String, List<LineageNode>> lineageTree = new HashMap<>();
+        analyzeLineageTree(lineage, lineageTree);
+
+        final LineagePath lineagePath = new LineagePath();
+        extractLineagePaths(analysisContext, lineageTree, lineagePath, event);
+
+        analyzeLineagePath(analysisContext, lineagePath);
+
+        // Input and output data set are both required to report lineage.
+        if (!lineagePath.isComplete()) {
+            return;
+        }
+
+        final List<Tuple<NiFiFlowPath, DataSetRefs>> createdFlowPaths = new ArrayList<>();
+        createCompleteFlowPath(nifiFlow, lineagePath, createdFlowPaths);
+        for (Tuple<NiFiFlowPath, DataSetRefs> createdFlowPath : createdFlowPaths) {
+            final NiFiFlowPath flowPath = createdFlowPath.getKey();
+            createEntity(toReferenceable(flowPath, nifiFlow));
+            addDataSetRefs(nifiFlow, Collections.singleton(flowPath), createdFlowPath.getValue());
+        }
+    }
+
+    /**
+     * Returns the provenance-event parents of {@code event} in {@code lineageTree}. FlowFile nodes are replaced
+     * by their own parents.
+     *
+     * @return the parent nodes, possibly empty, or {@code null} if the event has no entry in the tree
+     */
+    private List<LineageNode> findParentEvents(Map<String, List<LineageNode>> lineageTree, ProvenanceEventRecord event) {
+        final List<LineageNode> parentNodes = lineageTree.get(String.valueOf(event.getEventId()));
+        if (parentNodes == null || parentNodes.isEmpty()) {
+            return null;
+        }
+        return parentNodes.stream()
+                // In case it's not a provenance event (i.e. FLOWFILE_NODE), get one level higher parents.
+                // A FlowFile node may have no parents of its own in the tree; contribute nothing rather than fail.
+                .flatMap(n -> LineageNodeType.PROVENANCE_EVENT_NODE.equals(n.getNodeType())
+                        ? Stream.of(n)
+                        : lineageTree.getOrDefault(n.getIdentifier(), Collections.<LineageNode>emptyList()).stream())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Walks the lineage upwards from {@code lastEvent}, appending each visited event to {@code lineagePath}.
+     * When {@link LineagePath#shouldCreateSeparatePath} is true for an event type, every parent event starts
+     * a new child LineagePath instead. Recursion stops at events that have no parents.
+     *
+     * @throws IllegalStateException if an event that should not split has more than one parent
+     */
+    private void extractLineagePaths(AnalysisContext context, Map<String, List<LineageNode>> lineageTree,
+                                     LineagePath lineagePath, ProvenanceEventRecord lastEvent) {
+
+        lineagePath.getEvents().add(lastEvent);
+        List<LineageNode> parentEvents = findParentEvents(lineageTree, lastEvent);
+
+        final boolean createSeparateParentPath = lineagePath.shouldCreateSeparatePath(lastEvent.getEventType());
+
+        if (createSeparateParentPath && (parentEvents == null || parentEvents.isEmpty())) {
+            // Try expanding the lineage.
+            // This is for the FlowFiles those are FORKed (or JOINed ... etc) other FlowFile(s).
+            // FlowFiles routed to 'original' may have these event types, too, however they have parents fetched together.
+
+            // For example, with these inputs: CREATE(F1), FORK (F1 -> F2, F3), DROP(F1), SEND (F2), SEND(F3), DROP(F2), DROP(F3)
+            // Then when DROP(F1) is queried, FORK(F1) and CREATE(F1) are returned.
+            // For DROP(F2), SEND(F2) and FORK(F2) are returned.
+            // For DROP(F3), SEND(F3) and FORK(F3) are returned.
+            // In this case, FORK(F2) and FORK(F3) have to query their parents again, to get CREATE(F1).
+            final ComputeLineageResult joinedParents = context.findParents(lastEvent.getEventId());
+            analyzeLineageTree(joinedParents, lineageTree);
+
+            parentEvents = findParentEvents(lineageTree, lastEvent);
+        }
+
+        if (parentEvents == null || parentEvents.isEmpty()) {
+            logger.debug("{} does not have any parent, stop extracting lineage path.", lastEvent);
+            return;
+        }
+
+        if (createSeparateParentPath) {
+            // Treat those as separated lineage_path
+            parentEvents.stream()
+                    .map(parentEvent -> context.getProvenanceEvent(Long.parseLong(parentEvent.getIdentifier())))
+                    .filter(Objects::nonNull)
+                    .forEach(parent -> {
+                        final LineagePath parentPath = new LineagePath();
+                        lineagePath.getParents().add(parentPath);
+                        extractLineagePaths(context, lineageTree, parentPath, parent);
+                    });
+        } else {
+            // Simply traverse upwards.
+            if (parentEvents.size() > 1) {
+                throw new IllegalStateException(String.format("Having more than 1 parents for event type %s" +
+                                " is not expected. Should ask NiFi developer for investigation. %s",
+                        lastEvent.getEventType(), lastEvent));
+            }
+            final ProvenanceEventRecord parentEvent = context.getProvenanceEvent(Long.parseLong(parentEvents.get(0).getIdentifier()));
+            if (parentEvent != null) {
+                extractLineagePaths(context, lineageTree, lineagePath, parentEvent);
+            }
+        }
+    }
+
+    /**
+     * Runs the analyzer for every event in {@code lineagePath}, stores the merged inputs and outputs as the
+     * path's refs, then does the same for each parent path.
+     */
+    private void analyzeLineagePath(AnalysisContext analysisContext, LineagePath lineagePath) {
+        final List<ProvenanceEventRecord> events = lineagePath.getEvents();
+
+        final DataSetRefs parentRefs = new DataSetRefs(events.get(0).getComponentId());
+        events.forEach(event -> {
+            final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+            if (refs == null || refs.isEmpty()) {
+                return;
+            }
+            refs.getInputs().forEach(parentRefs::addInput);
+            refs.getOutputs().forEach(parentRefs::addOutput);
+        });
+
+        lineagePath.setRefs(parentRefs);
+
+        // Analyse parents.
+        lineagePath.getParents().forEach(parent -> analyzeLineagePath(analysisContext, parent));
+    }
+
+    /** Records every edge of {@code lineage} in {@code lineageTree} as destination identifier -> source nodes. */
+    private void analyzeLineageTree(ComputeLineageResult lineage, Map<String, List<LineageNode>> lineageTree) {
+        lineage.getEdges().forEach(edge -> lineageTree
+                        .computeIfAbsent(edge.getDestination().getIdentifier(), k -> new ArrayList<>())
+                        .add(edge.getSource()));
+    }
+
+    /**
+     * Create a new FlowPath from a LineagePath. FlowPaths created by this method will have a hash in its qualified name.
+     *
+     * <p>This method processes parents first to generate a hash, because parent LineagePath hashes contribute
+     * to the child hash. That way FlowPaths are distinguished by the complete path of a given FlowFile.
+     * For example, even if two lineagePaths have identical componentIds/inputs/outputs,
+     * if those parents have different inputs, those should be treated as different paths.</p>
+     *
+     * @param nifiFlow A reference to current NiFiFlow
+     * @param lineagePath LineagePath from which NiFiFlowPath and DataSet refs are created and added to the {@code createdFlowPaths}.
+     *                    Its event list is not modified.
+     * @param createdFlowPaths A list to buffer created NiFiFlowPaths,
+     *                         in order to defer sending notification to Kafka until all parent FlowPath get analyzed.
+     */
+    private void createCompleteFlowPath(NiFiFlow nifiFlow, LineagePath lineagePath, List<Tuple<NiFiFlowPath, DataSetRefs>> createdFlowPaths) {
+
+        // Events were collected from the last event backwards. Work on a reversed copy so that the
+        // LineagePath's own list is not mutated as a side effect.
+        final List<ProvenanceEventRecord> events = new ArrayList<>(lineagePath.getEvents());
+        Collections.reverse(events);
+
+        final List<String> componentIds = events.stream().map(ProvenanceEventRecord::getComponentId).collect(Collectors.toList());
+        final String firstComponentId = events.get(0).getComponentId();
+        final DataSetRefs dataSetRefs = lineagePath.getRefs();
+
+        // Process parents first.
+        Referenceable queueBetweenParent = null;
+        if (!lineagePath.getParents().isEmpty()) {
+            // Add queue between this lineage path and parent.
+            queueBetweenParent = new Referenceable(TYPE_NIFI_QUEUE);
+            // The first event knows why this lineage has parents, e.g. FORK or JOIN.
+            final String firstEventType = events.get(0).getEventType().name();
+            queueBetweenParent.set(ATTR_NAME, firstEventType);
+            dataSetRefs.addInput(queueBetweenParent);
+
+            for (LineagePath parent : lineagePath.getParents()) {
+                parent.getRefs().addOutput(queueBetweenParent);
+                createCompleteFlowPath(nifiFlow, parent, createdFlowPaths);
+            }
+        }
+
+        // Create a variant path.
+        // Calculate a hash from component_ids and input and output resource ids.
+        final Stream<String> ioIds = Stream.concat(dataSetRefs.getInputs().stream(), dataSetRefs.getOutputs()
+                .stream()).map(ref -> toTypedQualifiedName(ref.getTypeName(), toStr(ref.get(ATTR_QUALIFIED_NAME))));
+
+        final Stream<String> parentHashes = lineagePath.getParents().stream().map(p -> String.valueOf(p.getLineagePathHash()));
+        final CRC32 crc32 = new CRC32();
+        crc32.update(Stream.of(componentIds.stream(), ioIds, parentHashes).reduce(Stream::concat).orElseGet(Stream::empty)
+                .sorted().distinct()
+                .collect(Collectors.joining(",")).getBytes(StandardCharsets.UTF_8));
+
+        final long hash = crc32.getValue();
+        lineagePath.setLineagePathHash(hash);
+        final NiFiFlowPath flowPath = new NiFiFlowPath(firstComponentId, hash);
+
+        // In order to differentiate a queue between parents and this flow_path, add the hash into the queue qname.
+        // E.g, FF1 and FF2 read from dirA were merged, vs FF3 and FF4 read from dirB were merged then passed here, these two should be different queue.
+        if (queueBetweenParent != null) {
+            queueBetweenParent.set(ATTR_QUALIFIED_NAME, toQualifiedName(nifiFlow.getClusterName(), firstComponentId + "::" + hash));
+        }
+
+        // If the same components emitted multiple provenance events consecutively, merge it to come up with a simpler name.
+        String previousComponentId = null;
+        final List<ProvenanceEventRecord> uniqueEventsForName = new ArrayList<>();
+        for (ProvenanceEventRecord event : events) {
+            if (!event.getComponentId().equals(previousComponentId)) {
+                uniqueEventsForName.add(event);
+            }
+            previousComponentId = event.getComponentId();
+        }
+
+        final String pathName = uniqueEventsForName.stream()
+                // Processor name can be configured by user and more meaningful if available.
+                // If the component is already removed, it may not be available here.
+                .map(event -> nifiFlow.getProcessComponentName(event.getComponentId(), event::getComponentType))
+                .collect(Collectors.joining(", "));
+
+        flowPath.setName(pathName);
+        final NiFiFlowPath staticFlowPath = nifiFlow.findPath(firstComponentId);
+        flowPath.setGroupId(staticFlowPath != null ? staticFlowPath.getGroupId() : nifiFlow.getRootProcessGroupId());
+
+        // To defer send notification until entire lineagePath analysis gets finished, just add the instance into a buffer.
+        createdFlowPaths.add(new Tuple<>(flowPath, dataSetRefs));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
new file mode 100644
index 0000000..060cfbe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.notification.hook.HookNotification;
+
+/**
+ * Destination for Atlas hook notification messages created while analyzing lineage.
+ * A {@link LineageStrategy} is given an instance through {@code setLineageContext}.
+ */
+public interface LineageContext {
+    /**
+     * Adds a hook notification message to this context.
+     *
+     * @param message the Atlas hook notification message to add
+     */
+    void addMessage(HookNotification.HookNotificationMessage message);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineagePath.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineagePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineagePath.java
new file mode 100644
index 0000000..d1033f3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineagePath.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A sequence of provenance events together with the parent paths it is linked to,
+ * the DataSetRefs found for it, and a hash value set by the caller.
+ */
+public class LineagePath {
+    // Never reassigned; callers mutate these lists through the getters.
+    private final List<ProvenanceEventRecord> events = new ArrayList<>();
+    private final List<LineagePath> parents = new ArrayList<>();
+    private DataSetRefs refs;
+    private long lineagePathHash;
+
+    /**
+     * Returns the live, mutable list of events of this path.
+     * NOTE: The list contains provenance events in reversed order, i.e. the last one first.
+     */
+    public List<ProvenanceEventRecord> getEvents() {
+        return events;
+    }
+
+    /**
+     * Returns the live, mutable list of parent paths.
+     * {@link #hasInput()} and {@link #hasOutput()} also consult these parents.
+     */
+    public List<LineagePath> getParents() {
+        return parents;
+    }
+
+    /**
+     * @return the DataSetRefs of this path, or null if none have been set
+     */
+    public DataSetRefs getRefs() {
+        return refs;
+    }
+
+    public void setRefs(DataSetRefs refs) {
+        this.refs = refs;
+    }
+
+    /**
+     * Returns true for the event types (CLONE, JOIN, FORK, REPLAY) that start a separate lineage path.
+     *
+     * @param eventType the event type to check; must not be null
+     */
+    public boolean shouldCreateSeparatePath(ProvenanceEventType eventType) {
+        switch (eventType) {
+            case CLONE:
+            case JOIN:
+            case FORK:
+            case REPLAY:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns true when this path has at least one input and at least one output DataSet, either directly
+     * or through its parents, unless the FlowFile was dropped immediately after being created.
+     */
+    public boolean isComplete() {
+        // If the FlowFile is DROPped right after a child FlowFile is created, the path is not worth reporting.
+        // Events are stored last-first: index 0 is the DROP, index 1 is the event that created the FlowFile.
+        // Enum identity comparison is used so that a null event type does not throw.
+        final boolean isDroppedImmediately = events.size() == 2
+                && events.get(0).getEventType() == ProvenanceEventType.DROP
+                && shouldCreateSeparatePath(events.get(1).getEventType());
+        return !isDroppedImmediately && hasInput() && hasOutput();
+    }
+
+    /**
+     * Returns true if this path, or any of its ancestors, has at least one input DataSet.
+     */
+    public boolean hasInput() {
+        return (refs != null && !refs.getInputs().isEmpty()) || parents.stream().anyMatch(LineagePath::hasInput);
+    }
+
+    /**
+     * Returns true if this path, or any of its ancestors, has at least one output DataSet.
+     */
+    public boolean hasOutput() {
+        return (refs != null && !refs.getOutputs().isEmpty()) || parents.stream().anyMatch(LineagePath::hasOutput);
+    }
+
+    public long getLineagePathHash() {
+        return lineagePathHash;
+    }
+
+    public void setLineagePathHash(long lineagePathHash) {
+        this.lineagePathHash = lineagePathHash;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageStrategy.java
new file mode 100644
index 0000000..bf1139b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+/**
+ * Strategy that turns analyzed provenance events into lineage notification messages.
+ */
+public interface LineageStrategy {
+
+    /**
+     * Provenance event types targeted by this strategy.
+     *
+     * @return the targeted event types; the default implementation returns an empty array
+     */
+    default ProvenanceEventType[] getTargetEventTypes() {
+        return new ProvenanceEventType[]{};
+    }
+
+    /**
+     * Supplies the context that notification messages produced by this strategy are added to.
+     *
+     * @param lineageContext the context receiving messages
+     */
+    void setLineageContext(LineageContext lineageContext);
+
+    /**
+     * Analyzes a single provenance event against the given NiFi flow.
+     *
+     * @param analysisContext context used for the analysis
+     * @param nifiFlow the NiFi flow the event belongs to
+     * @param event the provenance event to process
+     */
+    void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
new file mode 100644
index 0000000..7ecbed5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+/**
+ * Lineage strategy that registers the DataSets referenced by each analyzed provenance event, and
+ * creates flow_path entities for RemoteGroupPorts on demand, as those can not be created from the static flow.
+ */
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+    // Component types reported by provenance events emitted from RemoteGroupPorts.
+    private static final String REMOTE_INPUT_PORT = "Remote Input Port";
+    private static final String REMOTE_OUTPUT_PORT = "Remote Output Port";
+
+    /**
+     * Analyze the event and register the DataSets it references.
+     * RemoteGroupPort events are routed to {@link #processRemotePortEvent}; all other events
+     * are passed to {@code addDataSetRefs(nifiFlow, refs)}. Events without any DataSet are ignored.
+     */
+    @Override
+    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+        final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+        if (refs == null || refs.isEmpty()) {
+            return;
+        }
+
+        final String componentType = event.getComponentType();
+        if (REMOTE_INPUT_PORT.equals(componentType) || REMOTE_OUTPUT_PORT.equals(componentType)) {
+            processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+        } else {
+            addDataSetRefs(nifiFlow, refs);
+        }
+
+    }
+
+    /**
+     * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
+     * Such an entity can not be created in advance while analyzing the flow statically,
+     * because the ReportingTask can not determine whether a component id is a RemoteGroupPort:
+     * ConnectionStatus is the only information available in the ReportingContext,
+     * and it only knows the component id, not the component type.
+     * For example, there is no way to tell whether a connected component is a funnel or a RemoteGroupPort.
+     */
+    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+        final boolean isRemoteInputPort = REMOTE_INPUT_PORT.equals(event.getComponentType());
+
+        // A RemoteInputPort writes to the remote port DataSet (an output), a RemoteOutputPort reads from it (an input).
+        // If the analyzer did not report it, iterator().next() below would throw NoSuchElementException.
+        final boolean hasRemotePortDataSet = isRemoteInputPort
+                ? !analyzedRefs.getOutputs().isEmpty()
+                : !analyzedRefs.getInputs().isEmpty();
+        if (!hasRemotePortDataSet) {
+            logger.warn("Remote port DataSet was not found: {}", new Object[]{event});
+            return;
+        }
+
+        // event.getComponentId returns the UUID of the RemoteGroupPort acting as a S2S client,
+        // which is different from the remote port UUID (portDataSetId). See NIFI-4571 for detail.
+        final Referenceable remotePortDataSet = isRemoteInputPort
+                ? analyzedRefs.getOutputs().iterator().next()
+                : analyzedRefs.getInputs().iterator().next();
+        final String portProcessId = event.getComponentId();
+
+        // Create a RemoteGroupPort process.
+        final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+        remotePortProcess.setName(event.getComponentType());
+        remotePortProcess.addProcessor(portProcessId);
+
+        if (isRemoteInputPort) {
+            processRemoteInputPortEvent(analysisContext, nifiFlow, event, remotePortProcess, remotePortDataSet);
+        } else {
+            processRemoteOutputPortEvent(nifiFlow, event, remotePortProcess, remotePortDataSet);
+        }
+
+    }
+
+    /**
+     * Build lineage: static flow_path -> queue -> RemoteInputPort process -> RemoteInputPort DataSet.
+     * The previous component that passed this particular FlowFile to the port can only be found
+     * by calling the lineage API.
+     */
+    private void processRemoteInputPortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event,
+                                             NiFiFlowPath remotePortProcess, Referenceable remotePortDataSet) {
+        final String portProcessId = event.getComponentId();
+
+        final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event);
+        if (previousEvent == null) {
+            logger.warn("Previous event was not found: {}", new Object[]{event});
+            return;
+        }
+
+        // Set groupId from incoming connection if available.
+        final List<ConnectionStatus> incomingConnections = nifiFlow.getIncomingConnections(portProcessId);
+        if (incomingConnections == null || incomingConnections.isEmpty()) {
+            logger.warn("Incoming relationship was not found: {}", new Object[]{event});
+            return;
+        }
+
+        final ConnectionStatus connection = incomingConnections.get(0);
+        remotePortProcess.setGroupId(connection.getGroupId());
+
+        final Referenceable remotePortProcessRef = toReferenceable(remotePortProcess, nifiFlow);
+        createEntity(remotePortProcessRef);
+
+        // Create a queue.
+        final Referenceable queueFromStaticFlowPathToRemotePortProcess = new Referenceable(TYPE_NIFI_QUEUE);
+        queueFromStaticFlowPathToRemotePortProcess.set(ATTR_NAME, "queue");
+        queueFromStaticFlowPathToRemotePortProcess.set(ATTR_QUALIFIED_NAME, nifiFlow.toQualifiedName(portProcessId));
+
+        // Create lineage: Static flow_path -> queue
+        final DataSetRefs staticFlowPathRefs = new DataSetRefs(previousEvent.getComponentId());
+        staticFlowPathRefs.addOutput(queueFromStaticFlowPathToRemotePortProcess);
+        addDataSetRefs(nifiFlow, staticFlowPathRefs);
+
+        // Create lineage: Queue -> RemoteInputPort process -> RemoteInputPort dataSet
+        final DataSetRefs remotePortRefs = new DataSetRefs(portProcessId);
+        remotePortRefs.addInput(queueFromStaticFlowPathToRemotePortProcess);
+        remotePortRefs.addOutput(remotePortDataSet);
+        addDataSetRefs(remotePortRefs, remotePortProcessRef);
+    }
+
+    /**
+     * Build lineage: RemoteOutputPort DataSet -> RemoteOutputPort process -> queue -> static flow_path.
+     * Multiple components may be connected to the port, in which case the received FlowFile is cloned
+     * and passed to each connection, so one queue is created per connection.
+     */
+    private void processRemoteOutputPortEvent(NiFiFlow nifiFlow, ProvenanceEventRecord event,
+                                              NiFiFlowPath remotePortProcess, Referenceable remotePortDataSet) {
+        final String portProcessId = event.getComponentId();
+
+        final List<ConnectionStatus> connections = nifiFlow.getOutgoingConnections(portProcessId);
+        if (connections == null || connections.isEmpty()) {
+            logger.warn("Outgoing connection was not found: {}", new Object[]{event});
+            return;
+        }
+
+        // Set group id from outgoing connection if available.
+        remotePortProcess.setGroupId(connections.get(0).getGroupId());
+
+        final Referenceable remotePortProcessRef = toReferenceable(remotePortProcess, nifiFlow);
+        createEntity(remotePortProcessRef);
+
+        // Lineage: RemoteOutputPort dataSet -> RemoteOutputPort process -> Queue(s)
+        final DataSetRefs remotePortRefs = new DataSetRefs(portProcessId);
+        remotePortRefs.addInput(remotePortDataSet);
+
+        for (ConnectionStatus connection : connections) {
+            final String destinationId = connection.getDestinationId();
+            final NiFiFlowPath destFlowPath = nifiFlow.findPath(destinationId);
+            if (destFlowPath == null) {
+                // If the destination of a connection is a Remote Input Port,
+                // then its corresponding flow path may not be created yet.
+                // In such direct RemoteOutputPort to RemoteInputPort case, do not add a queue from this RemoteOutputPort
+                // as a queue will be created by the connected RemoteInputPort to connect this RemoteOutputPort.
+                continue;
+            }
+
+            // Create a queue.
+            final Referenceable queueFromRemotePortProcessToStaticFlowPath = new Referenceable(TYPE_NIFI_QUEUE);
+            queueFromRemotePortProcessToStaticFlowPath.set(ATTR_NAME, "queue");
+            queueFromRemotePortProcessToStaticFlowPath.set(ATTR_QUALIFIED_NAME, nifiFlow.toQualifiedName(destinationId));
+
+            // Create lineage: Queue -> Static flow_path
+            final DataSetRefs staticFlowPathRefs = new DataSetRefs(destinationId);
+            staticFlowPathRefs.addInput(queueFromRemotePortProcessToStaticFlowPath);
+            addDataSetRefs(nifiFlow, staticFlowPathRefs);
+
+            remotePortRefs.addOutput(queueFromRemotePortProcessToStaticFlowPath);
+        }
+
+        // remotePortRefs accumulates refs across iterations. Register it once, with the input and every
+        // collected queue, instead of re-sending all previously added refs on each loop iteration.
+        addDataSetRefs(remotePortRefs, remotePortProcessRef);
+
+        // Add RemoteOutputPort process, so that it can be found even if it is connected to RemoteInputPort directly without any processor in between.
+        nifiFlow.getFlowPaths().put(remotePortProcess.getId(), remotePortProcess);
+    }
+
+    /**
+     * Find the provenance event that precedes the given event in its lineage.
+     *
+     * @return the preceding event, or null if the lineage or a preceding event node could not be found
+     */
+    private ProvenanceEventRecord findPreviousProvenanceEvent(AnalysisContext context, ProvenanceEventRecord event) {
+        final ComputeLineageResult lineage = context.queryLineage(event.getEventId());
+        if (lineage == null) {
+            logger.warn("Lineage was not found: {}", new Object[]{event});
+            return null;
+        }
+
+        // If no previous provenance node found due to expired or other reasons, just log a warning msg and do nothing.
+        final LineageNode previousProvenanceNode = traverseLineage(lineage, String.valueOf(event.getEventId()));
+        if (previousProvenanceNode == null) {
+            logger.warn("Traverse lineage could not find any preceding provenance event node: {}", new Object[]{event});
+            return null;
+        }
+
+        final long previousEventId = Long.parseLong(previousProvenanceNode.getIdentifier());
+        return context.getProvenanceEvent(previousEventId);
+    }
+
+    /**
+     * Recursively follow lineage edges backwards from the node with the given identifier
+     * until a preceding provenance event node is found.
+     *
+     * @return the preceding provenance event node, or null if the chain ends before one is found
+     */
+    private LineageNode traverseLineage(ComputeLineageResult lineage, String eventId) {
+        final LineageNode previousNode = lineage.getEdges().stream()
+                .filter(edge -> edge.getDestination().getIdentifier().equals(eventId))
+                .findFirst().map(LineageEdge::getSource).orElse(null);
+        if (previousNode == null) {
+            return null;
+        }
+        if (previousNode.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE) {
+            return previousNode;
+        }
+        return traverseLineage(lineage, previousNode.getIdentifier());
+    }
+
+
+}


Mime
View raw message