nifi-commits mailing list archives

From marka...@apache.org
Subject [06/18] nifi git commit: NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas
Date Mon, 18 Dec 2017 17:25:00 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
new file mode 100644
index 0000000..2fe7d07
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
@@ -0,0 +1,1233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import org.apache.nifi.atlas.emulator.AtlasAPIV2ServerEmulator;
+import org.apache.nifi.atlas.emulator.Lineage;
+import org.apache.nifi.atlas.emulator.Node;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.EdgeNode;
+import org.apache.nifi.provenance.lineage.EventNode;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockPropertyValue;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.Attributes;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.Locator;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+import java.util.function.BiConsumer;
+
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_NIFI_URL;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY_COMPLETE_PATH;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_LINEAGE_STRATEGY;
+import static org.apache.nifi.atlas.reporting.SimpleProvenanceRecord.pr;
+import static org.apache.nifi.provenance.ProvenanceEventType.ATTRIBUTES_MODIFIED;
+import static org.apache.nifi.provenance.ProvenanceEventType.CREATE;
+import static org.apache.nifi.provenance.ProvenanceEventType.DROP;
+import static org.apache.nifi.provenance.ProvenanceEventType.FORK;
+import static org.apache.nifi.provenance.ProvenanceEventType.JOIN;
+import static org.apache.nifi.provenance.ProvenanceEventType.RECEIVE;
+import static org.apache.nifi.provenance.ProvenanceEventType.REMOTE_INVOCATION;
+import static org.apache.nifi.provenance.ProvenanceEventType.SEND;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ITReportLineageToAtlas {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITReportLineageToAtlas.class);
+
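+    /**
+     * Parses a flow template XML from test resources into a {@link ProcessGroupStatus} tree,
+     * which the tests then serve through the mocked EventAccess.getGroupStatus("root").
+     */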
+    private ProcessGroupStatus loadTemplate(String name) {
+
+        final SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
+        final SAXParser saxParser;
+        try {
+            saxParser = saxParserFactory.newSAXParser();
+        } catch (ParserConfigurationException | SAXException e) {
+            throw new RuntimeException("Failed to create a SAX parser", e);
+        }
+
+        final XMLReader xmlReader;
+        try {
+            xmlReader = saxParser.getXMLReader();
+        } catch (SAXException e) {
+            throw new RuntimeException("Failed to create a XML reader", e);
+        }
+
+        final String template = ITReportLineageToAtlas.class.getResource("/flow-templates/" + name + ".xml").getPath();
+        final TemplateContentHandler handler = new TemplateContentHandler(name);
+        xmlReader.setContentHandler(handler);
+        try {
+            xmlReader.parse(template);
+        } catch (IOException | SAXException e) {
+            throw new RuntimeException("Failed to parse template", e);
+        }
+
+        return handler.getRootProcessGroupStatus();
+    }
+
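+    /**
+     * SAX handler that walks a flow template XML and builds the matching status objects.
+     * Setters are looked up by element name ('id' or 'name') and by the class of the
+     * component currently being populated; the element stack tracks nesting context.
+     */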
+    private static class TemplateContentHandler implements ContentHandler {
+
+        private static class Context {
+            private boolean isConnectionSource;
+            private boolean isConnectionDestination;
+            private boolean isRemoteProcessGroup;
+            private Stack<String> stack = new Stack<>();
+        }
+
+        private final Map<Class, BiConsumer<Object, String>> nameSetters = new HashMap<>();
+        private final Map<Class, BiConsumer<Object, String>> idSetters = new HashMap<>();
+        private final Map<String, Map<Class, BiConsumer<Object, String>>> setters = new HashMap<>();
+        private final Context context = new Context();
+
+        private BiConsumer<Object, String> s(String tag, BiConsumer<Object, String> setter) {
+            return (o, s) -> {
+                // Only apply the setter when the element is a first-level child of the given tag,
+                // so that common nested tags such as 'name' and 'id' do not overwrite values.
+                if (tag.equals(context.stack.get(context.stack.size() - 2))) {
+                    setter.accept(o, s);
+                }
+            };
+        }
+
+        public TemplateContentHandler(String name) {
+            rootPgStatus = new ProcessGroupStatus();
+            rootPgStatus.setId(name);
+            rootPgStatus.setName(name);
+            pgStatus = rootPgStatus;
+            current = rootPgStatus;
+            pgStack.push(rootPgStatus);
+
+            setters.put("id", idSetters);
+            setters.put("name", nameSetters);
+
+            idSetters.put(ProcessGroupStatus.class, s("processGroups",
+                    (o, id) -> ((ProcessGroupStatus) o).setId(id)));
+            idSetters.put(ProcessorStatus.class, s("processors",
+                    (o, id) -> ((ProcessorStatus) o).setId(id)));
+
+            idSetters.put(PortStatus.class, (o, id) -> ((PortStatus) o).setId(id));
+
+            idSetters.put(ConnectionStatus.class, (o, id) -> {
+                if (context.isConnectionSource) {
+                    ((ConnectionStatus) o).setSourceId(id);
+                } else if (context.isConnectionDestination) {
+                    ((ConnectionStatus) o).setDestinationId(id);
+                } else {
+                    ((ConnectionStatus) o).setId(id);
+                }
+            });
+
+            nameSetters.put(ProcessGroupStatus.class, s("processGroups",
+                    (o, n) -> ((ProcessGroupStatus) o).setName(n)));
+
+            nameSetters.put(ProcessorStatus.class, s("processors",
+                    (o, n) -> ((ProcessorStatus) o).setName(n)));
+
+            nameSetters.put(PortStatus.class, (o, n) -> ((PortStatus) o).setName(n));
+
+            nameSetters.put(ConnectionStatus.class, s("connections",
+                    (o, n) -> ((ConnectionStatus) o).setName(n)));
+        }
+
+        private ProcessGroupStatus rootPgStatus;
+        private ProcessGroupStatus parentPgStatus;
+        private Stack<ProcessGroupStatus> pgStack = new Stack<>();
+        private ProcessGroupStatus pgStatus;
+        private ProcessorStatus processorStatus;
+        private PortStatus portStatus;
+        private ConnectionStatus connectionStatus;
+        private Object current;
+        private StringBuffer stringBuffer;
+        private Map<String, String> componentNames = new HashMap<>();
+
+        public ProcessGroupStatus getRootProcessGroupStatus() {
+            return rootPgStatus;
+        }
+
+        @Override
+        public void setDocumentLocator(Locator locator) {
+
+        }
+
+        @Override
+        public void startDocument() throws SAXException {
+        }
+
+        private void setConnectionNames(ProcessGroupStatus pg) {
+            pg.getConnectionStatus().forEach(this::setConnectionName);
+            pg.getProcessGroupStatus().forEach(this::setConnectionNames);
+        }
+
+        private void setConnectionName(ConnectionStatus c) {
+            if (c.getSourceName() == null || c.getSourceName().isEmpty()) {
+                c.setSourceName(componentNames.get(c.getSourceId()));
+            }
+            if (c.getDestinationName() == null || c.getDestinationName().isEmpty()) {
+                c.setDestinationName(componentNames.get(c.getDestinationId()));
+            }
+        }
+
+        @Override
+        public void endDocument() throws SAXException {
+            setConnectionNames(rootPgStatus);
+            logger.info("rootPgStatus={}", rootPgStatus);
+        }
+
+        @Override
+        public void startPrefixMapping(String prefix, String uri) throws SAXException {
+
+        }
+
+        @Override
+        public void endPrefixMapping(String prefix) throws SAXException {
+
+        }
+
+        @Override
+        public void startElement(String uri, String localName, String qName, Attributes atts) throws SAXException {
+            // Reset the buffer that accumulates character data for the current element.
+            stringBuffer = new StringBuffer();
+
+            switch (qName) {
+                case "processGroups":
+                    if (pgStatus != null) {
+                        pgStack.push(pgStatus);
+                    }
+                    parentPgStatus = pgStatus;
+                    pgStatus = new ProcessGroupStatus();
+                    current = pgStatus;
+                    if (parentPgStatus != null) {
+                        parentPgStatus.getProcessGroupStatus().add(pgStatus);
+                    }
+                    break;
+
+                case "processors":
+                    processorStatus = new ProcessorStatus();
+                    current = processorStatus;
+                    pgStatus.getProcessorStatus().add(processorStatus);
+                    break;
+
+                case "inputPorts":
+                case "outputPorts":
+                    portStatus = new PortStatus();
+                    current = portStatus;
+                    if (!context.isRemoteProcessGroup) {
+                        ("inputPorts".equals(qName)
+                                ? pgStatus.getInputPortStatus()
+                                : pgStatus.getOutputPortStatus())
+                                .add(portStatus);
+                    }
+                    break;
+
+                case "connections":
+                    connectionStatus = new ConnectionStatus();
+                    current = connectionStatus;
+                    pgStatus.getConnectionStatus().add(connectionStatus);
+                    context.isConnectionSource = false;
+                    context.isConnectionDestination = false;
+                    break;
+
+                case "source":
+                    if (current instanceof ConnectionStatus) {
+                        context.isConnectionSource = true;
+                    }
+                    break;
+                case "destination":
+                    if (current instanceof ConnectionStatus) {
+                        context.isConnectionDestination = true;
+                    }
+                    break;
+
+                case "remoteProcessGroups":
+                    context.isRemoteProcessGroup = true;
+                    break;
+            }
+            context.stack.push(qName);
+
+        }
+
+        @Override
+        public void endElement(String uri, String localName, String qName) throws SAXException {
+            switch (qName) {
+                case "processGroups":
+                    // At this point pgStatus has id assigned. Set group id of each component within this pg.
+                    pgStatus.getProcessorStatus().forEach(s -> s.setGroupId(pgStatus.getId()));
+                    pgStatus.getInputPortStatus().forEach(s -> s.setGroupId(pgStatus.getId()));
+                    pgStatus.getOutputPortStatus().forEach(s -> s.setGroupId(pgStatus.getId()));
+                    pgStatus.getConnectionStatus().forEach(s -> s.setGroupId(pgStatus.getId()));
+
+                    // Put the previous ProcessGroup back to current.
+                    pgStatus = pgStack.isEmpty() ? null : pgStack.pop();
+                    current = pgStatus;
+                    break;
+                case "processors":
+                case "connections":
+                    current = pgStatus;
+                    break;
+                case "inputPorts":
+                case "outputPorts":
+                    current = pgStatus;
+                    if (context.isRemoteProcessGroup) {
+                        componentNames.put(portStatus.getId(), portStatus.getName());
+                    }
+                    break;
+                case "id":
+                case "name":
+                    if (current != null) {
+                        final BiConsumer<Object, String> setter = setters.get(qName).get(current.getClass());
+                        if (setter == null) {
+                            throw new RuntimeException(qName + " setter was not found: " + current.getClass());
+                        }
+                        setter.accept(current, stringBuffer.toString());
+                    }
+                    break;
+                case "remoteProcessGroups":
+                    context.isRemoteProcessGroup = false;
+                    break;
+            }
+            context.stack.pop();
+        }
+
+        @Override
+        public void characters(char[] ch, int start, int length) throws SAXException {
+            stringBuffer.append(ch, start, length);
+        }
+
+        @Override
+        public void ignorableWhitespace(char[] ch, int start, int length) throws SAXException {
+
+        }
+
+        @Override
+        public void processingInstruction(String target, String data) throws SAXException {
+
+        }
+
+        @Override
+        public void skippedEntity(String name) throws SAXException {
+
+        }
+    }
+
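+    // Lineage is reported either to the embedded Atlas API emulator or to a real Atlas
+    // instance at this URL, depending on the 'useEmbeddedEmulator' environment variable below.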
+    private static final String TARGET_ATLAS_URL = "http://localhost:21000";
+
+    private Stack<Long> requestedLineageComputationIds = new Stack<>();
+    private Stack<Long> requestedExpandParentsIds = new Stack<>();
+
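+    /**
+     * Everything a test case needs: the flow structure loaded from a template, reporting
+     * task properties, canned provenance records, and pre-computed lineage results keyed
+     * by provenance event id.
+     */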
+    private class TestConfiguration {
+        private final ProcessGroupStatus rootPgStatus;
+        private final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        private final ProvenanceRecords provenanceRecords = new ProvenanceRecords();
+        private final Map<Long, ComputeLineageResult> lineageResults = new HashMap<>();
+        private final Map<Long, ComputeLineageResult> parentLineageResults = new HashMap<>();
+
+        private TestConfiguration(String templateName) {
+            this.rootPgStatus = loadTemplate(templateName);
+        }
+
+        private void addLineage(ComputeLineageResult lineage) {
+            lineage.getNodes().forEach(n -> lineageResults.put(Long.parseLong(n.getIdentifier()), lineage));
+        }
+    }
+
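+    /**
+     * Wires the reporting task to mocked NiFi facilities (ReportingContext, EventAccess and
+     * ProvenanceRepository, all backed by the given TestConfiguration), then runs a full
+     * initialize/validate/setup/onTrigger/onUnscheduled cycle.
+     */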
+    private void test(TestConfiguration tc) throws InitializationException, IOException {
+        final ReportLineageToAtlas reportingTask = new ReportLineageToAtlas();
+        final MockComponentLog logger = new MockComponentLog("reporting-task-id", reportingTask);
+
+        final ReportingInitializationContext initializationContext = mock(ReportingInitializationContext.class);
+        when(initializationContext.getLogger()).thenReturn(logger);
+        final ConfigurationContext configurationContext = new MockConfigurationContext(tc.properties, null);
+        final ValidationContext validationContext = mock(ValidationContext.class);
+        when(validationContext.getProperty(any())).then(invocation -> new MockPropertyValue(tc.properties.get(invocation.getArguments()[0])));
+        final ReportingContext reportingContext = mock(ReportingContext.class);
+        final MockStateManager stateManager = new MockStateManager(reportingTask);
+        final EventAccess eventAccess = mock(EventAccess.class);
+        when(reportingContext.getProperties()).thenReturn(tc.properties);
+        when(reportingContext.getProperty(any())).then(invocation -> new MockPropertyValue(tc.properties.get(invocation.getArguments()[0])));
+        when(reportingContext.getStateManager()).thenReturn(stateManager);
+        when(reportingContext.getEventAccess()).thenReturn(eventAccess);
+        when(eventAccess.getGroupStatus(eq("root"))).thenReturn(tc.rootPgStatus);
+
+        final ProvenanceRepository provenanceRepository = mock(ProvenanceRepository.class);
+        when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+        when(eventAccess.getProvenanceEvents(eq(-1L), anyInt())).thenReturn(tc.provenanceRecords);
+        when(provenanceRepository.getMaxEventId()).thenReturn((long) tc.provenanceRecords.size() - 1);
+        when(provenanceRepository.getEvent(anyLong())).then(invocation -> tc.provenanceRecords.get(((Long) invocation.getArguments()[0]).intValue()));
+
+        // To mock these async method invocations, keep the requested event ids in a stack.
+        final ComputeLineageSubmission lineageComputationSubmission = mock(ComputeLineageSubmission.class);
+        when(provenanceRepository.submitLineageComputation(anyLong(), any())).thenAnswer(invocation -> {
+            requestedLineageComputationIds.push((Long) invocation.getArguments()[0]);
+            return lineageComputationSubmission;
+        });
+        when(lineageComputationSubmission.getResult()).then(invocation -> tc.lineageResults.get(requestedLineageComputationIds.pop()));
+
+        final ComputeLineageSubmission expandParentsSubmission = mock(ComputeLineageSubmission.class);
+        when(provenanceRepository.submitExpandParents(anyLong(), any())).thenAnswer(invocation -> {
+            requestedExpandParentsIds.push(((Long) invocation.getArguments()[0]));
+            return expandParentsSubmission;
+        });
+        when(expandParentsSubmission.getResult()).then(invocation -> tc.parentLineageResults.get(requestedExpandParentsIds.pop()));
+
+        tc.properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
+        tc.properties.put(ATLAS_URLS, TARGET_ATLAS_URL);
+        tc.properties.put(ATLAS_USER, "admin");
+        tc.properties.put(ATLAS_PASSWORD, "admin");
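+        // The dynamic property 'hostnamePattern.example' maps matching hostnames to the
+        // cluster name 'example', which is why qualified names asserted below end with '@example'.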
+        tc.properties.put(new PropertyDescriptor.Builder().name("hostnamePattern.example").dynamic(true).build(), ".*");
+
+
+        reportingTask.initialize(initializationContext);
+        reportingTask.validate(validationContext);
+        reportingTask.setup(configurationContext);
+        reportingTask.onTrigger(reportingContext);
+        reportingTask.onUnscheduled();
+    }
+
+    private boolean useEmbeddedEmulator;
+    private AtlasAPIV2ServerEmulator atlasAPIServer;
+
+    public ITReportLineageToAtlas() {
+        useEmbeddedEmulator = Boolean.parseBoolean(System.getenv("useEmbeddedEmulator"));
+        if (useEmbeddedEmulator) {
+            atlasAPIServer = new AtlasAPIV2ServerEmulator();
+        }
+    }
+
+    @Before
+    public void startEmulator() throws Exception {
+        if (useEmbeddedEmulator) {
+            atlasAPIServer.start();
+        } else {
+            // Clear existing entities.
+            URL url = new URL(TARGET_ATLAS_URL + "/api/atlas/v2/entity/bulk/");
+            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+            conn.setDoOutput(true);
+            conn.setRequestMethod("DELETE");
+            conn.connect();
+            conn.getResponseCode();
+            conn.disconnect();
+        }
+    }
+
+    @After
+    public void stopEmulator() throws Exception {
+        if (useEmbeddedEmulator) {
+            atlasAPIServer.stop();
+        }
+    }
+
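+    // List that assigns each added record an event id equal to its index,
+    // mimicking the sequential event ids of a real provenance repository.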
+    private static class ProvenanceRecords extends ArrayList<ProvenanceEventRecord> {
+        @Override
+        public boolean add(ProvenanceEventRecord record) {
+            ((SimpleProvenanceRecord) record).setEventId(size());
+            return super.add(record);
+        }
+    }
+
+
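+    // Fetches the lineage graph captured by the Atlas API emulator via its debug endpoint.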
+    private Lineage getLineage() throws Exception {
+        final URL url = new URL("http://localhost:21000/api/atlas/v2/debug/lineage/");
+        try (InputStream in = url.openStream()) {
+            Lineage lineage = new ObjectMapper().reader().withType(Lineage.class).readValue(in);
+            return lineage;
+        }
+    }
+
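+    // Entities reported via notification messages arrive asynchronously,
+    // so give the server time to process them before asserting lineage.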
+    private void waitNotificationsGetDelivered() throws InterruptedException {
+        Thread.sleep(3_000);
+    }
+
+    @Test
+    public void testSimplestPath() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("SimplestFlowPath");
+        test(tc);
+
+        final Lineage lineage = getLineage();
+        lineage.assertLink("nifi_flow", "SimplestFlowPath", "SimplestFlowPath@example",
+                "nifi_flow_path", "GenerateFlowFile, LogAttribute", "d270e6f0-c5e0-38b9");
+    }
+
+    @Test
+    public void testSingleFlowPath() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("SingleFlowPath");
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("2e9a2852-228f-379b", "ConsumeKafka_0_11", RECEIVE, "PLAINTEXT://0.kafka.example.com:6667/topic-a"));
+        prs.add(pr("5a56149a-d82a-3242", "PublishKafka_0_11", SEND, "PLAINTEXT://0.kafka.example.com:6667/topic-b"));
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+        final Node flow = lineage.findNode("nifi_flow", "SingleFlowPath", "SingleFlowPath@example");
+        final Node path = lineage.findNode("nifi_flow_path",
+                "ConsumeKafka_0_11, UpdateAttribute, ConvertJSONToSQL, PutSQL, PublishKafka_0_11",
+                "2e9a2852-228f-379b");
+        final Node topicA = lineage.findNode("kafka_topic", "topic-a@example");
+        final Node topicB = lineage.findNode("kafka_topic", "topic-b@example");
+        lineage.assertLink(flow, path);
+        lineage.assertLink(topicA, path);
+        lineage.assertLink(path, topicB);
+    }
+
+    @Test
+    public void testMultipleProcessGroups() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("MultipleProcessGroups");
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("989dabb7-54b9-3c78", "ConsumeKafka_0_11", RECEIVE, "PLAINTEXT://0.kafka.example.com:6667/nifi-test"));
+        prs.add(pr("767c7bd6-75e3-3f32", "PutHDFS", SEND, "hdfs://nn1.example.com:8020/user/nifi/5262553828219"));
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "MultipleProcessGroups", "MultipleProcessGroups@example");
+        final Node path = lineage.findNode("nifi_flow_path",
+                "ConsumeKafka_0_11, UpdateAttribute, PutHDFS",
+                "989dabb7-54b9-3c78");
+        final Node kafkaTopic = lineage.findNode("kafka_topic", "nifi-test@example");
+        final Node hdfsPath = lineage.findNode("hdfs_path", "/user/nifi/5262553828219@example");
+        lineage.assertLink(flow, path);
+        lineage.assertLink(kafkaTopic, path);
+        lineage.assertLink(path, hdfsPath);
+
+    }
+
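+    /**
+     * Creates a lineage edge from the provenance record at srcIdx to the one at tgtIdx.
+     */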
+    private EdgeNode createEdge(ProvenanceRecords prs, int srcIdx, int tgtIdx) {
+        // The event that produced the FlowFile.
+        final ProvenanceEventRecord srcR = prs.get(srcIdx);
+        // The event that received or forwarded it.
+        final ProvenanceEventRecord tgtR = prs.get(tgtIdx);
+        final EventNode src = new EventNode(srcR);
+        final EventNode tgt = new EventNode(tgtR);
+        final EdgeNode edge = new EdgeNode(srcR.getComponentType() + " to " + tgtR.getEventType(), src, tgt);
+        return edge;
+    }
+
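+    // Builds a mocked linear lineage chaining the records at the given indices,
+    // e.g. createLineage(prs, 1, 3, 5) produces edges 1 -> 3 and 3 -> 5.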
+    private ComputeLineageResult createLineage(ProvenanceRecords prs, int... indices) throws InterruptedException {
+        final ComputeLineageResult lineage = mock(ComputeLineageResult.class);
+        when(lineage.awaitCompletion(anyLong(), any())).thenReturn(true);
+        final List<LineageEdge> edges = new ArrayList<>();
+        final Set<LineageNode> nodes = new LinkedHashSet<>();
+        for (int i = 0; i < indices.length - 1; i++) {
+            final EdgeNode edge = createEdge(prs, indices[i], indices[i + 1]);
+            edges.add(edge);
+            nodes.add(edge.getSource());
+            nodes.add(edge.getDestination());
+        }
+        when(lineage.getEdges()).thenReturn(edges);
+        when(lineage.getNodes()).thenReturn(new ArrayList<>(nodes));
+        return lineage;
+    }
+
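+    // Merges multiple lineage results into one, used when a JOIN event has more
+    // than one parent lineage (see testMergedEvents).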
+    private ComputeLineageResult compositeLineages(ComputeLineageResult... results) throws InterruptedException {
+        final ComputeLineageResult lineage = mock(ComputeLineageResult.class);
+        when(lineage.awaitCompletion(anyLong(), any())).thenReturn(true);
+        final List<LineageEdge> edges = new ArrayList<>();
+        final Set<LineageNode> nodes = new LinkedHashSet<>();
+        for (ComputeLineageResult result : results) {
+            edges.addAll(result.getEdges());
+            nodes.addAll(result.getNodes());
+        }
+        when(lineage.getEdges()).thenReturn(edges);
+        when(lineage.getNodes()).thenReturn(new ArrayList<>(nodes));
+        return lineage;
+    }
+
+    /**
+     * A client NiFi sends FlowFiles to a remote NiFi.
+     */
+    private void testS2SSend(TestConfiguration tc) throws Exception {
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("ca71e4d9-2a4f-3970", "Generate A", CREATE));
+        prs.add(pr("c439cdca-e989-3491", "Generate C", CREATE));
+        prs.add(pr("b775b657-5a5b-3708", "GetTwitter", CREATE));
+
+        // The remote port GUID differs from that of the Remote Input Port component.
+        prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
+                "http://nifi.example.com:8080/nifi-api/data-transfer/input-ports" +
+                        "/77919f59-533e-35a3-0000-000000000000/transactions/tx-1/flow-files"));
+
+        prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
+                "http://nifi.example.com:8080/nifi-api/data-transfer/input-ports" +
+                        "/77919f59-533e-35a3-0000-000000000000/transactions/tx-2/flow-files"));
+
+        prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // C
+        prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // Twitter
+
+        // Generate C created a FlowFile, then it's sent via S2S
+        tc.addLineage(createLineage(prs, 1, 3, 5));
+        // GetTwitter created a FlowFile, then it's sent via S2S
+        tc.addLineage(createLineage(prs, 2, 4, 6));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2SSend", "S2SSend@example");
+        final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
+        final Node pathB = lineage.findNode("nifi_flow_path", "Generate B", "333255b6-eb02-3056");
+        final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+        final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+        final Node pathI = lineage.findNode("nifi_flow_path", "InactiveProcessor", "7033f311-ac68-3cab");
+        // UpdateAttribute has multiple incoming paths, so a queue is created to receive them.
+        final Node queueU = lineage.findNode("nifi_queue", "queue", "c5392447-e9f1-33ad");
+        final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "c5392447-e9f1-33ad");
+
+        // These are starting paths.
+        lineage.assertLink(flow, pathA);
+        lineage.assertLink(flow, pathB);
+        lineage.assertLink(flow, pathC);
+        lineage.assertLink(flow, pathT);
+        lineage.assertLink(flow, pathI);
+
+        // Multiple paths connected to the same path.
+        lineage.assertLink(pathB, queueU);
+        lineage.assertLink(pathC, queueU);
+        lineage.assertLink(queueU, pathU);
+
+    }
+
+    @Test
+    public void testS2SSendSimple() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SSend");
+
+        testS2SSend(tc);
+
+        final Lineage lineage = getLineage();
+
+        // The FlowFile created by Generate A has not been finished (no DROP event yet),
+        // but the SIMPLE_PATH strategy can still report it.
+        final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
+        final Node genA = lineage.findNode("nifi_data", "Generate A", "ca71e4d9-2a4f-3970");
+        lineage.assertLink(genA, pathA);
+
+        final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+        final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+
+        // Generate C and GetTwitter have reported proper SEND lineage to the input port.
+        final Node remoteInputPortD = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+        final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
+        final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
+        lineage.assertLink(pathC, remoteInputPortQ);
+        lineage.assertLink(pathT, remoteInputPortQ);
+        lineage.assertLink(remoteInputPortQ, remoteInputPortP);
+        lineage.assertLink(remoteInputPortP, remoteInputPortD);
+
+        // A nifi_data entity is created for each processor whose input data source is unknown.
+        final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
+        final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
+        lineage.assertLink(genC, pathC);
+        lineage.assertLink(genT, pathT);
+    }
+
+    @Test
+    public void testS2SSendComplete() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SSend");
+        tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+
+        testS2SSend(tc);
+
+        final Lineage lineage = getLineage();
+
+        // With the complete path strategy, each flow path qualified name includes a lineage hash.
+        final Node pathC = lineage.findNode("nifi_flow_path", "Generate C, Remote Input Port",
+                "c439cdca-e989-3491-0000-000000000000::1605753423@example");
+        final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter, Remote Input Port",
+                "b775b657-5a5b-3708-0000-000000000000::3843156947@example");
+
+        // Generate C and GetTwitter have reported proper SEND lineage to the input port.
+        final Node remoteInputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+        lineage.assertLink(pathC, remoteInputPort);
+        lineage.assertLink(pathT, remoteInputPort);
+
+        // A nifi_data entity is created for each processor whose input data source is unknown.
+        final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
+        final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
+        lineage.assertLink(genC, pathC);
+        lineage.assertLink(genT, pathT);
+    }
+
+    /**
+     * A client NiFi gets FlowFiles from a remote NiFi.
+     */
+    @Test
+    public void testS2SGet() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SGet");
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        // The remote port GUID differs from that of the Remote Output Port component.
+        prs.add(pr("7375f8f6-4604-468d", "Remote Output Port", RECEIVE,
+                "http://nifi.example.com:8080/nifi-api/data-transfer/output-ports" +
+                        "/392e7343-3950-329b-0000-000000000000/transactions/tx-1/flow-files"));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2SGet", "S2SGet@example");
+        final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
+        final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
+        final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
+
+        // These entities should be created by notification.
+        final Node remoteOutputPortDataSet = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+        final Node remoteOutputPortProcess = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
+        final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
+        final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
+        final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
+
+        lineage.assertLink(remoteOutputPortDataSet, remoteOutputPortProcess);
+
+        lineage.assertLink(flow, remoteOutputPortProcess);
+        lineage.assertLink(remoteOutputPortProcess, queueL);
+        lineage.assertLink(remoteOutputPortProcess, queueP);
+        lineage.assertLink(remoteOutputPortProcess, queueU);
+
+        lineage.assertLink(queueL, pathL);
+        lineage.assertLink(queueP, pathP);
+        lineage.assertLink(queueU, pathU);
+
+    }
+
+    /**
+     * A remote NiFi transfers FlowFiles to remote client NiFis.
+     * This NiFi instance owns the root process group Output Port.
+     */
+    @Test
+    public void testS2STransfer() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2STransfer");
+
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("392e7343-3950-329b", "Output Port", SEND,
+                "http://nifi.example.com:8080/nifi-api/data-transfer/output-ports" +
+                        "/392e7343-3950-329b-0000-000000000000/transactions/tx-1/flow-files"));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
+        final Node path = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
+        final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+
+        lineage.assertLink(flow, path);
+        lineage.assertLink(path, outputPort);
+    }
+
+    /**
+     * A remote NiFi receives FlowFiles from remote client NiFis.
+     * This NiFi instance owns the root process group Input Port.
+     */
+    @Test
+    public void testS2SReceive() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SReceive");
+
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("77919f59-533e-35a3", "Input Port", RECEIVE,
+                "http://nifi.example.com:8080/nifi-api/data-transfer/output-ports" +
+                        "/77919f59-533e-35a3-0000-000000000000/transactions/tx-1/flow-files"));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
+        final Node path = lineage.findNode("nifi_flow_path", "input, UpdateAttribute", "77919f59-533e-35a3");
+        final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+
+        lineage.assertLink(flow, path);
+        lineage.assertLink(flow, inputPort);
+
+        lineage.assertLink(inputPort, path);
+    }
+
+    @Test
+    public void testS2SReceiveAndSendCombination() throws Exception {
+        testS2SReceive();
+        testS2SSendSimple();
+
+        final Lineage lineage = getLineage();
+
+        final Node remoteFlow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
+        final Node localFlow = lineage.findNode("nifi_flow", "S2SSend", "S2SSend@example");
+        final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
+        final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
+        final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+        final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+        final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+
+        // Remote flow owns the inputPort.
+        lineage.assertLink(remoteFlow, inputPort);
+
+        // These paths within the local flow send data to the remote flow through the remote input port.
+        lineage.assertLink(localFlow, pathC);
+        lineage.assertLink(localFlow, pathT);
+        lineage.assertLink(pathC, remoteInputPortQ);
+        lineage.assertLink(pathT, remoteInputPortQ);
+        lineage.assertLink(remoteInputPortQ, remoteInputPortP);
+        lineage.assertLink(remoteInputPortP, inputPort);
+
+    }
+
+    @Test
+    public void testS2STransferAndGetCombination() throws Exception {
+        testS2STransfer();
+        testS2SGet();
+
+        final Lineage lineage = getLineage();
+
+        final Node remoteFlow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
+        final Node localFlow = lineage.findNode("nifi_flow", "S2SGet", "S2SGet@example");
+        final Node remoteGen = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
+        final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+
+        final Node remoteOutputPortP = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
+        final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
+        final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
+        final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
+        final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
+        final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
+        final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
+
+        // The remote flow owns the outputPort and transfers data generated by GenerateFlowFile.
+        lineage.assertLink(remoteFlow, remoteGen);
+        lineage.assertLink(remoteGen, outputPort);
+
+        // The Remote Output Port path in local flow gets data from the remote.
+        lineage.assertLink(localFlow, remoteOutputPortP);
+        lineage.assertLink(outputPort, remoteOutputPortP);
+        lineage.assertLink(remoteOutputPortP, queueL);
+        lineage.assertLink(remoteOutputPortP, queueP);
+        lineage.assertLink(remoteOutputPortP, queueU);
+        lineage.assertLink(queueL, pathL);
+        lineage.assertLink(queueP, pathP);
+        lineage.assertLink(queueU, pathU);
+
+    }
+
+    /**
+     * A client NiFi gets FlowFiles from a remote output port and sends them to a remote input port without any processing in between.
+     */
+    @Test
+    public void testS2SDirect() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SDirect");
+        final ProvenanceRecords prs = tc.provenanceRecords;
+
+        prs.add(pr("d73d9115-b987-4ffc", "Remote Output Port", RECEIVE,
+                "http://nifi.example.com:8080/nifi-api/data-transfer/output-ports" +
+                        "/015f1040-dcd7-17bd-5c1f-e31afe0a09a4/transactions/tx-1/flow-files"));
+
+        prs.add((pr("a4f14247-89aa-4e6c", "Remote Input Port", SEND,
+                "http://nifi.example.com:8080/nifi-api/data-transfer/input-ports" +
+                        "/015f101e-dcd7-17bd-8899-1a723733521a/transactions/tx-2/flow-files")));
+
+        Map<Long, ComputeLineageResult> lineages = tc.lineageResults;
+        // Received from the remote output port, then sent via the remote input port.
+        lineages.put(1L, createLineage(prs, 0, 1));
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2SDirect", "S2SDirect@example");
+        final Node remoteOutputPort = lineage.findNode("nifi_output_port", "output", "015f1040-dcd7-17bd-5c1f-e31afe0a09a4@example");
+        final Node remoteOutputPortP = lineage.findNode("nifi_flow_path", "Remote Output Port", "d73d9115-b987-4ffc");
+        final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "a4f14247-89aa-4e6c");
+        final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "a4f14247-89aa-4e6c");
+        final Node remoteInputPort = lineage.findNode("nifi_input_port", "input", "015f101e-dcd7-17bd-8899-1a723733521a@example");
+
+        // Even if there is no Processor, lineage can be reported using root flow_path.
+        lineage.assertLink(flow, remoteOutputPortP);
+        lineage.assertLink(remoteOutputPort, remoteOutputPortP);
+        lineage.assertLink(remoteOutputPortP, remoteInputPortQ);
+        lineage.assertLink(remoteInputPortQ, remoteInputPortP);
+        lineage.assertLink(remoteInputPortP, remoteInputPort);
+    }
+
+    @Test
+    public void testRemoteInvocation() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("RemoteInvocation");
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("2607ed95-c6ef-3636", "DeleteHDFS", REMOTE_INVOCATION, "hdfs://nn1.example.com:8020/test/2017-10-23"));
+        prs.add(pr("2607ed95-c6ef-3636", "DeleteHDFS", REMOTE_INVOCATION, "hdfs://nn1.example.com:8020/test/2017-10-24"));
+        prs.add(pr("2607ed95-c6ef-3636", "DeleteHDFS", REMOTE_INVOCATION, "hdfs://nn1.example.com:8020/test/2017-10-25"));
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "RemoteInvocation", "RemoteInvocation@example");
+        final Node path = lineage.findNode("nifi_flow_path",
+                "DeleteHDFS",
+                "2607ed95-c6ef-3636");
+        final Node hdfsPath23 = lineage.findNode("hdfs_path", "/test/2017-10-23@example");
+        final Node hdfsPath24 = lineage.findNode("hdfs_path", "/test/2017-10-24@example");
+        final Node hdfsPath25 = lineage.findNode("hdfs_path", "/test/2017-10-25@example");
+        lineage.assertLink(flow, path);
+        lineage.assertLink(path, hdfsPath23);
+        lineage.assertLink(path, hdfsPath24);
+        lineage.assertLink(path, hdfsPath25);
+
+    }
+
+    @Test
+    public void testSimpleEventLevelSimplePath() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("SimpleEventLevel");
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("d9257f7e-b78c-349a", "Generate A", CREATE));
+        prs.add(pr("d84b9bdc-5e42-3b3b", "Generate B", CREATE));
+
+        prs.add((pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/nifi/a.txt")));
+        prs.add((pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/nifi/b.txt")));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+        final Node genA = lineage.findNode("nifi_data", "Generate A", "d9257f7e-b78c-349a");
+        final Node genB = lineage.findNode("nifi_data", "Generate B", "d84b9bdc-5e42-3b3b");
+
+        final Node genAPath = lineage.findNode("nifi_flow_path", "Generate A", "d9257f7e-b78c-349a");
+        final Node genBPath = lineage.findNode("nifi_flow_path", "Generate B", "d84b9bdc-5e42-3b3b");
+
+        final Node queue = lineage.findNode("nifi_queue", "queue", "eaf013c1-aec5-39b0");
+        final Node putFile = lineage.findNode("nifi_flow_path", "PutFile, LogAttribute", "eaf013c1-aec5-39b0");
+
+        final Node outA = lineage.findNode("fs_path", "/tmp/nifi/a.txt@example");
+        final Node outB = lineage.findNode("fs_path", "/tmp/nifi/b.txt@example");
+
+        lineage.assertLink(genA, genAPath);
+        lineage.assertLink(genAPath, queue);
+
+        lineage.assertLink(genB, genBPath);
+        lineage.assertLink(genBPath, queue);
+
+        lineage.assertLink(queue, putFile);
+        lineage.assertLink(putFile, outA);
+        lineage.assertLink(putFile, outB);
+    }
+
+    @Test
+    public void testSimpleEventLevelCompletePath() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("SimpleEventLevel");
+        tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+        final ProvenanceRecords prs = tc.provenanceRecords;
+
+        String flowFileUUIDA = "A0000000-0000-0000";
+        String flowFileUUIDB = "B0000000-0000-0000";
+        prs.add(pr("d9257f7e-b78c-349a", "Generate A", CREATE, flowFileUUIDA));
+        prs.add(pr("d84b9bdc-5e42-3b3b", "Generate B", CREATE, flowFileUUIDB));
+
+        prs.add((pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/nifi/a.txt", flowFileUUIDA)));
+        prs.add((pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/nifi/b.txt", flowFileUUIDB)));
+
+        prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDA));
+        prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDB));
+
+        Map<Long, ComputeLineageResult> lineages = tc.lineageResults;
+        lineages.put(4L, createLineage(prs, 0, 2, 4));
+        lineages.put(5L, createLineage(prs, 1, 3, 5));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node genA = lineage.findNode("nifi_data", "Generate A", "d9257f7e-b78c-349a");
+        final Node genB = lineage.findNode("nifi_data", "Generate B", "d84b9bdc-5e42-3b3b");
+
+        final Node genAPath = lineage.findNode("nifi_flow_path", "Generate A, PutFile, LogAttribute",
+                "d9257f7e-b78c-349a-0000-000000000000::980416504@example");
+        final Node genBPath = lineage.findNode("nifi_flow_path", "Generate B, PutFile, LogAttribute",
+                "d84b9bdc-5e42-3b3b-0000-000000000000::442259660@example");
+
+        final Node outA = lineage.findNode("fs_path", "/tmp/nifi/a.txt@example");
+        final Node outB = lineage.findNode("fs_path", "/tmp/nifi/b.txt@example");
+
+        lineage.assertLink(genA, genAPath);
+        lineage.assertLink(genB, genBPath);
+
+        lineage.assertLink(genAPath, outA);
+        lineage.assertLink(genBPath, outB);
+    }
+
+    @Test
+    public void testMergedEvents() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("MergedEvents");
+        tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        final String flowFileUUIDA = "A0000000-0000-0000";
+        final String flowFileUUIDB = "B0000000-0000-0000";
+        final String flowFileUUIDC = "C0000000-0000-0000";
+        final String flowFileUUIDD = "D0000000-0000-0000";
+        // Merged B and C.
+        final String flowFileUUIDBC = "BC000000-0000-0000";
+        prs.add(pr("f585d83b-2a03-37cf", "Generate A", CREATE, flowFileUUIDA)); // 0
+        prs.add(pr("59a7c1f9-9a73-3cc6", "Generate B", CREATE, flowFileUUIDB)); // 1
+        prs.add(pr("d6c3f282-e03d-316c", "Generate C", CREATE, flowFileUUIDC)); // 2
+        prs.add(pr("f9593a5a-f0d5-3e87", "Generate D", CREATE, flowFileUUIDD)); // 3
+        // Original files are dropped.
+        prs.add(pr("c77dd033-bb9e-39ea", "MergeContent", JOIN, flowFileUUIDBC)); // 4
+        prs.add(pr("c77dd033-bb9e-39ea", "MergeContent", DROP, flowFileUUIDB)); // 5
+        prs.add(pr("c77dd033-bb9e-39ea", "MergeContent", DROP, flowFileUUIDC)); // 6
+
+        prs.add((pr("93f8ad14-6ee6-34c1", "PutFile", SEND, "file:/tmp/nifi/a.txt", flowFileUUIDA))); // 7
+        prs.add((pr("93f8ad14-6ee6-34c1", "PutFile", SEND, "file:/tmp/nifi/bc.txt", flowFileUUIDBC))); // 8
+        prs.add((pr("93f8ad14-6ee6-34c1", "PutFile", SEND, "file:/tmp/nifi/d.txt", flowFileUUIDD))); // 9
+
+        prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDA)); // 10
+        prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDBC)); // 11
+        prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDD)); // 12
+
+        Map<Long, ComputeLineageResult> lineages = tc.lineageResults;
+        final ComputeLineageResult lineageB = createLineage(prs, 1, 4, 5);
+        final ComputeLineageResult lineageC = createLineage(prs, 2, 4, 6);
+        lineages.put(5L, lineageB); // B
+        lineages.put(6L, lineageC); // C
+
+        lineages.put(10L, createLineage(prs, 0, 7, 10)); // A
+        lineages.put(11L, createLineage(prs, 4, 8, 11)); // BC
+        lineages.put(12L, createLineage(prs, 3, 9, 12)); // D
+
+        Map<Long, ComputeLineageResult> parents = tc.parentLineageResults;
+        parents.put(4L, compositeLineages(lineageB, lineageC));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node genA = lineage.findNode("nifi_data", "Generate A", "f585d83b-2a03-37cf");
+        final Node genB = lineage.findNode("nifi_data", "Generate B", "59a7c1f9-9a73-3cc6");
+        final Node genC = lineage.findNode("nifi_data", "Generate C", "d6c3f282-e03d-316c");
+        final Node genD = lineage.findNode("nifi_data", "Generate D", "f9593a5a-f0d5-3e87");
+
+        final Node genAPath = lineage.findNode("nifi_flow_path", "Generate A, PutFile, LogAttribute",
+                "f585d83b-2a03-37cf-0000-000000000000::1003499964@example");
+        final Node genBPath = lineage.findNode("nifi_flow_path", "Generate B",
+                "59a7c1f9-9a73-3cc6-0000-000000000000::45412830@example");
+        final Node genCPath = lineage.findNode("nifi_flow_path", "Generate C",
+                "d6c3f282-e03d-316c-0000-000000000000::1968410985@example");
+        final Node genDPath = lineage.findNode("nifi_flow_path", "Generate D, PutFile, LogAttribute",
+                "f9593a5a-f0d5-3e87-0000-000000000000::4257576567@example");
+
+        lineage.assertLink(genA, genAPath);
+        lineage.assertLink(genB, genBPath);
+        lineage.assertLink(genC, genCPath);
+        lineage.assertLink(genD, genDPath);
+
+        // B and C were merged together, while A and D were processed individually.
+        final Node joinBC = lineage.findNode("nifi_queue", "JOIN", "c77dd033-bb9e-39ea-0000-000000000000::2370367315@example");
+        final Node bcPath = lineage.findNode("nifi_flow_path", "MergeContent, PutFile, LogAttribute",
+                "c77dd033-bb9e-39ea-0000-000000000000::2370367315@example");
+        lineage.assertLink(genBPath, joinBC);
+        lineage.assertLink(genCPath, joinBC);
+        lineage.assertLink(joinBC, bcPath);
+
+        final Node outA = lineage.findNode("fs_path", "/tmp/nifi/a.txt@example");
+        final Node outBC = lineage.findNode("fs_path", "/tmp/nifi/bc.txt@example");
+        final Node outD = lineage.findNode("fs_path", "/tmp/nifi/d.txt@example");
+        lineage.assertLink(genAPath, outA);
+        lineage.assertLink(bcPath, outBC);
+        lineage.assertLink(genDPath, outD);
+
+    }
+
+    @Test
+    public void testRecordAndDataSetLevel() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("RecordAndDataSetLevel");
+        tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+        final ProvenanceRecords prs = tc.provenanceRecords;
+
+        // Publish part
+        final String ffIdA1 = "A1000000";
+        final String ffIdB1 = "B1000000";
+        prs.add(pr("22be62d9-c4a1-3056", "GetFile", RECEIVE, "file:/tmp/input/A1.csv", ffIdA1)); // 0
+        prs.add(pr("22be62d9-c4a1-3056", "GetFile", RECEIVE, "file:/tmp/input/B1.csv", ffIdB1)); // 1
+
+        prs.add(pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/output/A1.csv", ffIdA1)); // 2
+        prs.add(pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/output/B1.csv", ffIdB1)); // 3
+
+        prs.add(pr("97641de3-fb76-3d95", "PublishKafkaRecord_0_10", SEND, "PLAINTEXT://localhost:9092/nifi-test", ffIdA1)); // 4
+        prs.add(pr("97641de3-fb76-3d95", "PublishKafkaRecord_0_10", SEND, "PLAINTEXT://localhost:9092/nifi-test", ffIdB1)); // 5
+
+        prs.add(pr("97641de3-fb76-3d95", "PublishKafkaRecord_0_10", DROP, ffIdA1)); // 6
+        prs.add(pr("97641de3-fb76-3d95", "PublishKafkaRecord_0_10", DROP, ffIdB1)); // 7
+
+        // Consume part
+        final String ffIdK1 = "K1000000";
+        final String ffIdA2 = "A2000000"; // Forked children
+        final String ffIdB2 = "B2000000"; // Forked children
+        prs.add(pr("529e6722-9b49-3b66", "ConsumeKafkaRecord_0_10", RECEIVE, "PLAINTEXT://localhost:9092/nifi-test", ffIdK1)); // 8
+        prs.add(pr("3f6d405e-6e3d-38c9", "PartitionRecord", FORK, ffIdK1)); // 9
+        prs.add(pr("db8bb12c-5cd3-3011", "UpdateAttribute", ATTRIBUTES_MODIFIED, ffIdA2)); // 10
+        prs.add(pr("db8bb12c-5cd3-3011", "UpdateAttribute", ATTRIBUTES_MODIFIED, ffIdB2)); // 11
+        prs.add(pr("062caf95-da40-3a57", "PutFile", SEND, "file:/tmp/consumed/A_20171101_100701.csv", ffIdA2)); // 12
+        prs.add(pr("062caf95-da40-3a57", "PutFile", SEND, "file:/tmp/consumed/B_20171101_100701.csv", ffIdB2)); // 13
+        prs.add(pr("062caf95-da40-3a57", "PutFile", DROP, ffIdA2)); // 14
+        prs.add(pr("062caf95-da40-3a57", "PutFile", DROP, ffIdB2)); // 15
+
+
+        Map<Long, ComputeLineageResult> lineages = tc.lineageResults;
+        Map<Long, ComputeLineageResult> parents = tc.parentLineageResults;
+        lineages.put(6L, createLineage(prs, 0, 2, 4, 6)); // Publish A1
+        lineages.put(7L, createLineage(prs, 1, 3, 5, 7)); // Publish B1
+        parents.put(9L, createLineage(prs, 8, 9)); // Consumed and Forked K1
+        lineages.put(14L, createLineage(prs, 9, 10, 12, 14)); // Processed A2
+        lineages.put(15L, createLineage(prs, 9, 11, 13, 15)); // Processed B2
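+        // The map keys appear to be provenance event ids, matching the
+        // numbered indices above: complete lineage is keyed by the DROP
+        // events (6, 7, 14, 15), parent lineage by the FORK event (9).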
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        // Publish part
+        final Node inputFileA1 = lineage.findNode("fs_path", "/tmp/input/A1.csv@example");
+        final Node inputFileB1 = lineage.findNode("fs_path", "/tmp/input/B1.csv@example");
+        // These two flow paths are derived from the same set of Processors, but with different input files, resulting in different hashes.
+        final Node getFileToPublishKafkaA = lineage.findNode("nifi_flow_path", "GetFile, PutFile, PublishKafkaRecord_0_10",
+                "22be62d9-c4a1-3056-0000-000000000000::2823953997@example");
+        final Node getFileToPublishKafkaB = lineage.findNode("nifi_flow_path", "GetFile, PutFile, PublishKafkaRecord_0_10",
+                "22be62d9-c4a1-3056-0000-000000000000::568010061@example");
+
+        lineage.assertLink(inputFileA1, getFileToPublishKafkaA);
+        lineage.assertLink(inputFileB1, getFileToPublishKafkaB);
+
+        final Node nifiTestTopic = lineage.findNode("kafka_topic", "nifi-test@example");
+        final Node outputFileA = lineage.findNode("fs_path", "/tmp/output/A1.csv@example");
+        final Node outputFileB = lineage.findNode("fs_path", "/tmp/output/B1.csv@example");
+        lineage.assertLink(getFileToPublishKafkaA, nifiTestTopic);
+        lineage.assertLink(getFileToPublishKafkaB, nifiTestTopic);
+        lineage.assertLink(getFileToPublishKafkaA, outputFileA);
+        lineage.assertLink(getFileToPublishKafkaB, outputFileB);
+
+        // Consume part
+        final Node consumeNifiTestTopic = lineage.findNode("nifi_flow_path", "ConsumeKafkaRecord_0_10",
+                "529e6722-9b49-3b66-0000-000000000000::3649132843@example");
+        final Node forkedA = lineage.findNode("nifi_queue", "FORK",
+                "3f6d405e-6e3d-38c9-0000-000000000000::234149075@example");
+        final Node forkedB = lineage.findNode("nifi_queue", "FORK",
+                "3f6d405e-6e3d-38c9-0000-000000000000::2377021542@example");
+        lineage.assertLink(consumeNifiTestTopic, forkedA);
+        lineage.assertLink(consumeNifiTestTopic, forkedB);
+
+        final Node partitionToPutA = lineage.findNode("nifi_flow_path", "PartitionRecord, UpdateAttribute, PutFile",
+                "3f6d405e-6e3d-38c9-0000-000000000000::234149075@example");
+        final Node partitionToPutB = lineage.findNode("nifi_flow_path", "PartitionRecord, UpdateAttribute, PutFile",
+                "3f6d405e-6e3d-38c9-0000-000000000000::2377021542@example");
+        final Node consumedFileA = lineage.findNode("fs_path", "/tmp/consumed/A_20171101_100701.csv@example");
+        final Node consumedFileB = lineage.findNode("fs_path", "/tmp/consumed/B_20171101_100701.csv@example");
+        lineage.assertLink(forkedA, partitionToPutA);
+        lineage.assertLink(forkedB, partitionToPutB);
+        lineage.assertLink(partitionToPutA, consumedFileA);
+        lineage.assertLink(partitionToPutB, consumedFileB);
+    }
+
+    @Test
+    public void testMultiInAndOuts() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("MultiInAndOuts");
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node gen1 = lineage.findNode("nifi_flow_path", "Gen1", "a4bfe4ec-570b-3126");
+        final Node gen2 = lineage.findNode("nifi_flow_path", "Gen2", "894218d5-dfe9-3ee5");
+        final Node ua1 = lineage.findNode("nifi_flow_path", "UA1", "5609cb4f-8a95-3b7a");
+        final Node ua2 = lineage.findNode("nifi_flow_path", "UA2", "6f88b3d9-5723-356a");
+        final Node ua3 = lineage.findNode("nifi_flow_path", "UA3, UA4, LogAttribute", "3250aeb6-4026-3969");
+        final Node ua1Q = lineage.findNode("nifi_queue", "queue", "5609cb4f-8a95-3b7a");
+        final Node ua2Q = lineage.findNode("nifi_queue", "queue", "6f88b3d9-5723-356a");
+        final Node ua3Q = lineage.findNode("nifi_queue", "queue", "3250aeb6-4026-3969");
+
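+        // Expected graph, per the assertions below:
+        //   Gen1 --> ua1Q --> UA1 --\
+        //   Gen1 --> ua2Q           +--> ua3Q --> "UA3, UA4, LogAttribute"
+        //   Gen2 --> ua2Q --> UA2 --/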
+        lineage.assertLink(gen1, ua1Q);
+        lineage.assertLink(gen1, ua2Q);
+
+        lineage.assertLink(gen2, ua2Q);
+
+        lineage.assertLink(ua1Q, ua1);
+        lineage.assertLink(ua2Q, ua2);
+
+        lineage.assertLink(ua1, ua3Q);
+        lineage.assertLink(ua2, ua3Q);
+        lineage.assertLink(ua3Q, ua3);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/SimpleProvenanceRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/SimpleProvenanceRecord.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/SimpleProvenanceRecord.java
new file mode 100644
index 0000000..f02cc88
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/SimpleProvenanceRecord.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SimpleProvenanceRecord implements ProvenanceEventRecord {
+    private long eventId;
+    private String componentId;
+    private String componentType;
+    private String transitUri;
+    private String flowFileUUID;
+    private ProvenanceEventType eventType;
+    private Map<String, String> attributes = new HashMap<>();
+
+    public static SimpleProvenanceRecord pr(String componentId, String componentType, ProvenanceEventType eventType) {
+        return pr(componentId, componentType, eventType, null, null);
+    }
+
+    public static SimpleProvenanceRecord pr(String componentId, String componentType, ProvenanceEventType eventType, String transitUri) {
+        return pr(componentId, componentType, eventType, transitUri, null);
+    }
+    public static SimpleProvenanceRecord pr(String componentId, String componentType, ProvenanceEventType eventType, String transitUri, String flowFileUUID) {
+        final SimpleProvenanceRecord pr = new SimpleProvenanceRecord();
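+        // Tests pass only the first 18 characters of a component UUID for
+        // brevity; pad such ids to the full 36-character UUID form.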
+        pr.componentId = componentId.length() == 18 ? componentId + "-0000-000000000000" : componentId;
+        pr.componentType = componentType;
+        pr.transitUri = transitUri;
+        pr.eventType = eventType;
+        pr.flowFileUUID = flowFileUUID;
+        return pr;
+    }
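+
+    // Example usage, mirroring the reporting task tests in this bundle:
+    //   pr("22be62d9-c4a1-3056", "GetFile", ProvenanceEventType.RECEIVE,
+    //      "file:/tmp/input/A1.csv", "A1000000");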
+
+    public void setEventId(long eventId) {
+        this.eventId = eventId;
+    }
+
+    @Override
+    public String getComponentId() {
+        return componentId;
+    }
+
+    @Override
+    public String getComponentType() {
+        return componentType;
+    }
+
+    @Override
+    public String getTransitUri() {
+        return transitUri;
+    }
+
+    @Override
+    public ProvenanceEventType getEventType() {
+        return eventType;
+    }
+
+    @Override
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    @Override
+    public long getEventId() {
+        return eventId;
+    }
+
+    @Override
+    public long getEventTime() {
+        return 0;
+    }
+
+    @Override
+    public long getFlowFileEntryDate() {
+        return 0;
+    }
+
+    @Override
+    public long getLineageStartDate() {
+        return 0;
+    }
+
+    @Override
+    public long getFileSize() {
+        return 0;
+    }
+
+    @Override
+    public Long getPreviousFileSize() {
+        return null;
+    }
+
+    @Override
+    public long getEventDuration() {
+        return 0;
+    }
+
+    @Override
+    public Map<String, String> getPreviousAttributes() {
+        return null;
+    }
+
+    @Override
+    public Map<String, String> getUpdatedAttributes() {
+        return null;
+    }
+
+    @Override
+    public String getSourceSystemFlowFileIdentifier() {
+        return null;
+    }
+
+    @Override
+    public String getFlowFileUuid() {
+        return flowFileUUID;
+    }
+
+    @Override
+    public List<String> getParentUuids() {
+        return null;
+    }
+
+    @Override
+    public List<String> getChildUuids() {
+        return null;
+    }
+
+    @Override
+    public String getAlternateIdentifierUri() {
+        return null;
+    }
+
+    @Override
+    public String getDetails() {
+        return null;
+    }
+
+    @Override
+    public String getRelationship() {
+        return null;
+    }
+
+    @Override
+    public String getSourceQueueIdentifier() {
+        return null;
+    }
+
+    @Override
+    public String getContentClaimSection() {
+        return null;
+    }
+
+    @Override
+    public String getPreviousContentClaimSection() {
+        return null;
+    }
+
+    @Override
+    public String getContentClaimContainer() {
+        return null;
+    }
+
+    @Override
+    public String getPreviousContentClaimContainer() {
+        return null;
+    }
+
+    @Override
+    public String getContentClaimIdentifier() {
+        return null;
+    }
+
+    @Override
+    public String getPreviousContentClaimIdentifier() {
+        return null;
+    }
+
+    @Override
+    public Long getContentClaimOffset() {
+        return null;
+    }
+
+    @Override
+    public Long getPreviousContentClaimOffset() {
+        return null;
+    }
+
+    @Override
+    public String getBestEventIdentifier() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
new file mode 100644
index 0000000..ae1d63d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockValidationContext;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_NIFI_URL;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestReportLineageToAtlas {
+
+    private final Logger logger = LoggerFactory.getLogger(TestReportLineageToAtlas.class);
+
+    @Test
+    public void validateAtlasUrls() throws Exception {
+        final ReportLineageToAtlas reportingTask = new ReportLineageToAtlas();
+        final MockProcessContext processContext = new MockProcessContext(reportingTask);
+        final MockValidationContext validationContext = new MockValidationContext(processContext);
+
+        processContext.setProperty(ATLAS_NIFI_URL, "http://nifi.example.com:8080/nifi");
+        processContext.setProperty(ATLAS_USER, "admin");
+        processContext.setProperty(ATLAS_PASSWORD, "admin");
+
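+        // Helper that applies the given assertion to the validation result
+        // whose subject is the Atlas URLs property.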
+        BiConsumer<Collection<ValidationResult>, Consumer<ValidationResult>> assertResults = (rs, a) -> {
+            assertTrue(rs.iterator().hasNext());
+            for (ValidationResult r : rs) {
+                logger.info("{}", r);
+                final String subject = r.getSubject();
+                if (ATLAS_URLS.getDisplayName().equals(subject)) {
+                    a.accept(r);
+                }
+            }
+        };
+
+        // Default setting.
+        assertResults.accept(reportingTask.validate(validationContext),
+                r -> assertFalse("Atlas URLs is required", r.isValid()));
+
+        // Invalid URL.
+        processContext.setProperty(ATLAS_URLS, "invalid");
+        assertResults.accept(reportingTask.validate(validationContext),
+                r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
+
+        // Valid URL
+        processContext.setProperty(ATLAS_URLS, "http://atlas.example.com:21000");
+        assertTrue(processContext.isValid());
+
+        // Valid URL with Expression
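+        // (${literal(21000)} should evaluate to 21000 via Expression Language.)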
+        processContext.setProperty(ATLAS_URLS, "http://atlas.example.com:${literal(21000)}");
+        assertTrue(processContext.isValid());
+
+        // Valid URLs
+        processContext.setProperty(ATLAS_URLS, "http://atlas1.example.com:21000, http://atlas2.example.com:21000");
+        assertTrue(processContext.isValid());
+
+        // Invalid and Valid URLs
+        processContext.setProperty(ATLAS_URLS, "invalid, http://atlas2.example.com:21000");
+        assertResults.accept(reportingTask.validate(validationContext),
+                r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java
new file mode 100644
index 0000000..bccd8c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.resolver;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+public class TestRegexClusterResolver {
+
+    private PropertyContext context;
+    private ValidationContext validationContext;
+
+    public void setupMock(Map<String, String> properties) {
+        context = Mockito.mock(PropertyContext.class);
+        validationContext = Mockito.mock(ValidationContext.class);
+        when(validationContext.getAllProperties()).thenReturn(properties);
+        when(context.getAllProperties()).thenReturn(properties);
+    }
+
+    @Test
+    public void testEmptySettings() {
+        setupMock(Collections.emptyMap());
+        final RegexClusterResolver resolver = new RegexClusterResolver();
+
+        // It should be valid
+        final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+        Assert.assertEquals(0, validationResults.size());
+        resolver.configure(context);
+
+        Assert.assertNull(resolver.fromHostNames("example.com"));
+    }
+
+    @Test
+    public void testInvalidClusterName() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(RegexClusterResolver.PATTERN_PROPERTY_PREFIX, ".*\\.example.com");
+        setupMock(properties);
+        final RegexClusterResolver resolver = new RegexClusterResolver();
+
+        final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+        Assert.assertEquals(1, validationResults.size());
+        final ValidationResult validationResult = validationResults.iterator().next();
+        Assert.assertEquals(RegexClusterResolver.PATTERN_PROPERTY_PREFIX, validationResult.getSubject());
+
+        try {
+            resolver.configure(context);
+            Assert.fail("Configure method should fail, too");
+        } catch (IllegalArgumentException e) {
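+            // Expected: the invalid cluster name should make configure() fail, too.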
+        }
+    }
+
+    @Test
+    public void testEmptyPattern() {
+        final Map<String, String> properties = new HashMap<>();
+        final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
+        properties.put(propertyName, "");
+        setupMock(properties);
+        final RegexClusterResolver resolver = new RegexClusterResolver();
+
+        final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+        Assert.assertEquals(1, validationResults.size());
+        final ValidationResult validationResult = validationResults.iterator().next();
+        Assert.assertEquals(propertyName, validationResult.getSubject());
+
+        try {
+            resolver.configure(context);
+            Assert.fail("Configure method should fail, too");
+        } catch (IllegalArgumentException e) {
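+            // Expected: the empty pattern should make configure() fail, too.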
+        }
+    }
+
+    @Test
+    public void testSinglePattern() {
+        final Map<String, String> properties = new HashMap<>();
+        final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
+        properties.put(propertyName, "^.*\\.example.com$");
+        setupMock(properties);
+        final RegexClusterResolver resolver = new RegexClusterResolver();
+
+        final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+        Assert.assertEquals(0, validationResults.size());
+
+        resolver.configure(context);
+
+        Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.example.com"));
+    }
+
+    @Test
+    public void testMultiplePatterns() {
+        final Map<String, String> properties = new HashMap<>();
+        final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
+        // Hostname or local IP address patterns, delimited by whitespace (a newline here)
+        properties.put(propertyName, "^.*\\.example.com$\n^192.168.1.[\\d]+$");
+        setupMock(properties);
+        final RegexClusterResolver resolver = new RegexClusterResolver();
+
+        final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+        Assert.assertEquals(0, validationResults.size());
+
+        resolver.configure(context);
+
+        Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.example.com"));
+        Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.10"));
+        Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.22"));
+        Assert.assertNull(resolver.fromHostNames("192.168.2.30"));
+    }
+
+    @Test
+    public void testMultipleClusters() {
+        final Map<String, String> properties = new HashMap<>();
+        final String c1PropertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
+        final String c2PropertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster2";
+        // Hostname or local IP address patterns, delimited by a space
+        properties.put(c1PropertyName, "^.*\\.c1\\.example.com$ ^192.168.1.[\\d]+$");
+        properties.put(c2PropertyName, "^.*\\.c2\\.example.com$ ^192.168.2.[\\d]+$");
+        setupMock(properties);
+        final RegexClusterResolver resolver = new RegexClusterResolver();
+
+        final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+        Assert.assertEquals(0, validationResults.size());
+
+        resolver.configure(context);
+
+        Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.c1.example.com"));
+        Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.10"));
+        Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.22"));
+        Assert.assertEquals("Cluster2", resolver.fromHostNames("host2.c2.example.com"));
+        Assert.assertEquals("Cluster2", resolver.fromHostNames("192.168.2.10"));
+        Assert.assertEquals("Cluster2", resolver.fromHostNames("192.168.2.22"));
+        Assert.assertNull(resolver.fromHostNames("192.168.3.30"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
new file mode 100644
index 0000000..927347d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+atlas.cluster.name=AtlasCluster
+
+# atlas.kafka.bootstrap.servers=atlas.example.com:6667
+atlas.kafka.bootstrap.servers=localhost:9092
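+
+# localhost:9092 is presumably served by the test emulator; a real deployment
+# would list the Kafka brokers Atlas uses for notifications, as in the
+# commented-out example above.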

