http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
new file mode 100644
index 0000000..2fe7d07
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
@@ -0,0 +1,1233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import org.apache.nifi.atlas.emulator.AtlasAPIV2ServerEmulator;
+import org.apache.nifi.atlas.emulator.Lineage;
+import org.apache.nifi.atlas.emulator.Node;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.EdgeNode;
+import org.apache.nifi.provenance.lineage.EventNode;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockPropertyValue;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.Attributes;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.Locator;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+import java.util.function.BiConsumer;
+
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_NIFI_URL;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY_COMPLETE_PATH;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_LINEAGE_STRATEGY;
+import static org.apache.nifi.atlas.reporting.SimpleProvenanceRecord.pr;
+import static org.apache.nifi.provenance.ProvenanceEventType.ATTRIBUTES_MODIFIED;
+import static org.apache.nifi.provenance.ProvenanceEventType.CREATE;
+import static org.apache.nifi.provenance.ProvenanceEventType.DROP;
+import static org.apache.nifi.provenance.ProvenanceEventType.FORK;
+import static org.apache.nifi.provenance.ProvenanceEventType.JOIN;
+import static org.apache.nifi.provenance.ProvenanceEventType.RECEIVE;
+import static org.apache.nifi.provenance.ProvenanceEventType.REMOTE_INVOCATION;
+import static org.apache.nifi.provenance.ProvenanceEventType.SEND;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ITReportLineageToAtlas {
+
+ private static final Logger logger = LoggerFactory.getLogger(ITReportLineageToAtlas.class);
+
+ private ProcessGroupStatus loadTemplate(String name) {
+
+ final SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
+ final SAXParser saxParser;
+ try {
+ saxParser = saxParserFactory.newSAXParser();
+ } catch (ParserConfigurationException|SAXException e) {
+ throw new RuntimeException("Failed to create a SAX parser", e);
+ }
+
+ final XMLReader xmlReader;
+ try {
+ xmlReader = saxParser.getXMLReader();
+ } catch (SAXException e) {
+ throw new RuntimeException("Failed to create a XML reader", e);
+ }
+
+ final String template = ITReportLineageToAtlas.class.getResource("/flow-templates/" + name + ".xml").getPath();
+ final TemplateContentHander handler = new TemplateContentHander(name);
+ xmlReader.setContentHandler(handler);
+ try {
+ xmlReader.parse(template);
+ } catch (IOException|SAXException e) {
+ throw new RuntimeException("Failed to parse template", e);
+ }
+
+ return handler.getRootProcessGroupStatus();
+ }
+
+ private static class TemplateContentHander implements ContentHandler {
+
+ private static class Context {
+ private boolean isConnectionSource;
+ private boolean isConnectionDestination;
+ private boolean isRemoteProcessGroup;
+ private Stack<String> stack = new Stack<>();
+ }
+
+ private final Map<Class, BiConsumer<Object, String>> nameSetters = new HashMap<>();
+ private final Map<Class, BiConsumer<Object, String>> idSetters = new HashMap<>();
+ private final Map<String, Map<Class, BiConsumer<Object, String>>> setters = new HashMap<>();
+ private final Context context = new Context();
+
+ private BiConsumer<Object, String> s(String tag, BiConsumer<Object, String> setter) {
+ return (o, s) -> {
+ // Only apply the function when the element is the first level child.
+ // In order to avoid different 'name', 'id' or other common tags overwriting values.
+ if (tag.equals(context.stack.get(context.stack.size() - 2))) {
+ setter.accept(o, s);
+ }
+ };
+ }
+
+ public TemplateContentHander(String name) {
+ rootPgStatus = new ProcessGroupStatus();
+ rootPgStatus.setId(name);
+ rootPgStatus.setName(name);
+ pgStatus = rootPgStatus;
+ current = rootPgStatus;
+ pgStack.push(rootPgStatus);
+
+ setters.put("id", idSetters);
+ setters.put("name", nameSetters);
+
+ idSetters.put(ProcessGroupStatus.class, s("processGroups",
+ (o, id) -> ((ProcessGroupStatus) o).setId(id)));
+ idSetters.put(ProcessorStatus.class, s("processors",
+ (o, id) -> ((ProcessorStatus) o).setId(id)));
+
+ idSetters.put(PortStatus.class, (o, id) -> ((PortStatus) o).setId(id));
+
+ idSetters.put(ConnectionStatus.class, (o, id) -> {
+ if (context.isConnectionSource) {
+ ((ConnectionStatus) o).setSourceId(id);
+ } else if (context.isConnectionDestination) {
+ ((ConnectionStatus) o).setDestinationId(id);
+ } else {
+ ((ConnectionStatus) o).setId(id);
+ }
+ });
+
+ nameSetters.put(ProcessGroupStatus.class, s("processGroups",
+ (o, n) -> ((ProcessGroupStatus) o).setName(n)));
+
+ nameSetters.put(ProcessorStatus.class, s("processors",
+ (o, n) -> ((ProcessorStatus) o).setName(n)));
+
+ nameSetters.put(PortStatus.class, (o, n) -> ((PortStatus) o).setName(n));
+
+ nameSetters.put(ConnectionStatus.class, s("connections",
+ (o, n) -> ((ConnectionStatus) o).setName(n)));
+ }
+
+ private ProcessGroupStatus rootPgStatus;
+ private ProcessGroupStatus parentPgStatus;
+ private Stack<ProcessGroupStatus> pgStack = new Stack<>();
+ private ProcessGroupStatus pgStatus;
+ private ProcessorStatus processorStatus;
+ private PortStatus portStatus;
+ private ConnectionStatus connectionStatus;
+ private Object current;
+ private StringBuffer stringBuffer;
+ private Map<String, String> componentNames = new HashMap<>();
+
+ public ProcessGroupStatus getRootProcessGroupStatus() {
+ return rootPgStatus;
+ }
+
+ @Override
+ public void setDocumentLocator(Locator locator) {
+
+ }
+
+ @Override
+ public void startDocument() throws SAXException {
+ }
+
+ private void setConnectionNames(ProcessGroupStatus pg) {
+ pg.getConnectionStatus().forEach(c -> setConnectionName(c));
+ pg.getProcessGroupStatus().forEach(child -> setConnectionNames(child));
+ }
+
+ private void setConnectionName(ConnectionStatus c) {
+ if (c.getSourceName() == null || c.getSourceName().isEmpty()) {
+ c.setSourceName(componentNames.get(c.getSourceId()));
+ }
+ if (c.getDestinationName() == null || c.getDestinationName().isEmpty()) {
+ c.setDestinationName(componentNames.get(c.getDestinationId()));
+ }
+ }
+
+ @Override
+ public void endDocument() throws SAXException {
+ setConnectionNames(rootPgStatus);
+ System.out.println("rootPgStatus=" + rootPgStatus);
+ }
+
+ @Override
+ public void startPrefixMapping(String prefix, String uri) throws SAXException {
+
+ }
+
+ @Override
+ public void endPrefixMapping(String prefix) throws SAXException {
+
+ }
+
+ @Override
+ public void startElement(String uri, String localName, String qName, Attributes atts) throws SAXException {
+ // Clear flags.
+ stringBuffer = new StringBuffer();
+
+ switch (qName) {
+ case "processGroups":
+ if (pgStatus != null) {
+ pgStack.push(pgStatus);
+ }
+ parentPgStatus = pgStatus;
+ pgStatus = new ProcessGroupStatus();
+ current = pgStatus;
+ if (parentPgStatus != null) {
+ parentPgStatus.getProcessGroupStatus().add(pgStatus);
+ }
+ break;
+
+ case "processors":
+ processorStatus = new ProcessorStatus();
+ current = processorStatus;
+ pgStatus.getProcessorStatus().add(processorStatus);
+ break;
+
+ case "inputPorts":
+ case "outputPorts":
+ portStatus = new PortStatus();
+ current = portStatus;
+ if (!context.isRemoteProcessGroup) {
+ ("inputPorts".equals(qName)
+ ? pgStatus.getInputPortStatus()
+ : pgStatus.getOutputPortStatus())
+ .add(portStatus);
+ }
+ break;
+
+ case "connections":
+ connectionStatus = new ConnectionStatus();
+ current = connectionStatus;
+ pgStatus.getConnectionStatus().add(connectionStatus);
+ context.isConnectionSource = false;
+ context.isConnectionDestination = false;
+ break;
+
+ case "source":
+ if (current instanceof ConnectionStatus) {
+ context.isConnectionSource = true;
+ }
+ break;
+ case "destination":
+ if (current instanceof ConnectionStatus) {
+ context.isConnectionDestination = true;
+ }
+ break;
+
+ case "remoteProcessGroups":
+ context.isRemoteProcessGroup = true;
+ break;
+ }
+ context.stack.push(qName);
+
+ }
+
+ @Override
+ public void endElement(String uri, String localName, String qName) throws SAXException {
+ switch (qName) {
+ case "processGroups":
+ // At this point pgStatus has id assigned. Set group id of each component within this pg.
+ pgStatus.getProcessorStatus().forEach(s -> s.setGroupId(pgStatus.getId()));
+ pgStatus.getInputPortStatus().forEach(s -> s.setGroupId(pgStatus.getId()));
+ pgStatus.getOutputPortStatus().forEach(s -> s.setGroupId(pgStatus.getId()));
+ pgStatus.getConnectionStatus().forEach(s -> s.setGroupId(pgStatus.getId()));
+
+ // Put the previous ProcessGroup back to current.
+ pgStatus = pgStack.isEmpty() ? null : pgStack.pop();
+ current = pgStatus;
+ break;
+ case "processors":
+ case "connections":
+ current = pgStatus;
+ break;
+ case "inputPorts":
+ case "outputPorts":
+ current = pgStatus;
+ if (context.isRemoteProcessGroup) {
+ componentNames.put(portStatus.getId(), portStatus.getName());
+ }
+ break;
+ case "id":
+ case "name":
+ if (current != null) {
+ final BiConsumer<Object, String> setter = setters.get(qName).get(current.getClass());
+ if (setter == null) {
+ throw new RuntimeException(qName + " setter was not found: " + current.getClass());
+ }
+ setter.accept(current, stringBuffer.toString());
+ }
+ break;
+ case "remoteProcessGroups":
+ context.isRemoteProcessGroup = false;
+ break;
+ }
+ context.stack.pop();
+ }
+
+ @Override
+ public void characters(char[] ch, int start, int length) throws SAXException {
+ stringBuffer.append(ch, start, length);
+ }
+
+ @Override
+ public void ignorableWhitespace(char[] ch, int start, int length) throws SAXException {
+
+ }
+
+ @Override
+ public void processingInstruction(String target, String data) throws SAXException {
+
+ }
+
+ @Override
+ public void skippedEntity(String name) throws SAXException {
+
+ }
+ }
+
+ private static String TARGET_ATLAS_URL = "http://localhost:21000";
+
+ private Stack<Long> requestedLineageComputationIds = new Stack<>();
+ private Stack<Long> requestedExpandParentsIds = new Stack<>();
+
+ private class TestConfiguration {
+ private final ProcessGroupStatus rootPgStatus;
+ private final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ private final ProvenanceRecords provenanceRecords = new ProvenanceRecords();
+ private final Map<Long, ComputeLineageResult> lineageResults = new HashMap<>();
+ private final Map<Long, ComputeLineageResult> parentLineageResults = new HashMap<>();
+
+ private TestConfiguration(String templateName) {
+ this.rootPgStatus = loadTemplate(templateName);
+ }
+
+ private void addLineage(ComputeLineageResult lineage) {
+ lineage.getNodes().forEach(n -> lineageResults.put(Long.parseLong(n.getIdentifier()), lineage));
+ }
+ }
+
+ private void test(TestConfiguration tc) throws InitializationException, IOException {
+ final ReportLineageToAtlas reportingTask = new ReportLineageToAtlas();
+ final MockComponentLog logger = new MockComponentLog("reporting-task-id", reportingTask);
+
+ final ReportingInitializationContext initializationContext = mock(ReportingInitializationContext.class);
+ when(initializationContext.getLogger()).thenReturn(logger);
+ final ConfigurationContext configurationContext = new MockConfigurationContext(tc.properties, null);
+ final ValidationContext validationContext = mock(ValidationContext.class);
+ when(validationContext.getProperty(any())).then(invocation -> new MockPropertyValue(tc.properties.get(invocation.getArguments()[0])));
+ final ReportingContext reportingContext = mock(ReportingContext.class);
+ final MockStateManager stateManager = new MockStateManager(reportingTask);
+ final EventAccess eventAccess = mock(EventAccess.class);
+ when(reportingContext.getProperties()).thenReturn(tc.properties);
+ when(reportingContext.getProperty(any())).then(invocation -> new MockPropertyValue(tc.properties.get(invocation.getArguments()[0])));
+ when(reportingContext.getStateManager()).thenReturn(stateManager);
+ when(reportingContext.getEventAccess()).thenReturn(eventAccess);
+ when(eventAccess.getGroupStatus(eq("root"))).thenReturn(tc.rootPgStatus);
+
+ final ProvenanceRepository provenanceRepository = mock(ProvenanceRepository.class);
+ when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+ when(eventAccess.getProvenanceEvents(eq(-1L), anyInt())).thenReturn(tc.provenanceRecords);
+ when(provenanceRepository.getMaxEventId()).thenReturn((long) tc.provenanceRecords.size() - 1);
+ when(provenanceRepository.getEvent(anyLong())).then(invocation -> tc.provenanceRecords.get(((Long) invocation.getArguments()[0]).intValue()));
+
+ // To mock this async method invocations, keep the requested event ids in a stack.
+ final ComputeLineageSubmission lineageComputationSubmission = mock(ComputeLineageSubmission.class);
+ when(provenanceRepository.submitLineageComputation(anyLong(), any())).thenAnswer(invocation -> {
+ requestedLineageComputationIds.push((Long) invocation.getArguments()[0]);
+ return lineageComputationSubmission;
+ });
+ when(lineageComputationSubmission.getResult()).then(invocation -> tc.lineageResults.get(requestedLineageComputationIds.pop()));
+
+ final ComputeLineageSubmission expandParentsSubmission = mock(ComputeLineageSubmission.class);
+ when(provenanceRepository.submitExpandParents(anyLong(), any())).thenAnswer(invocation -> {
+ requestedExpandParentsIds.push(((Long) invocation.getArguments()[0]));
+ return expandParentsSubmission;
+ });
+ when(expandParentsSubmission.getResult()).then(invocation -> tc.parentLineageResults.get(requestedExpandParentsIds.pop()));
+
+ tc.properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
+ tc.properties.put(ATLAS_URLS, TARGET_ATLAS_URL);
+ tc.properties.put(ATLAS_USER, "admin");
+ tc.properties.put(ATLAS_PASSWORD, "admin");
+ tc.properties.put(new PropertyDescriptor.Builder().name("hostnamePattern.example").dynamic(true).build(), ".*");
+
+
+ reportingTask.initialize(initializationContext);
+ reportingTask.validate(validationContext);
+ reportingTask.setup(configurationContext);
+ reportingTask.onTrigger(reportingContext);
+ reportingTask.onUnscheduled();
+ }
+
+ private boolean useEmbeddedEmulator;
+ private AtlasAPIV2ServerEmulator atlasAPIServer;
+
+ public ITReportLineageToAtlas() {
+ useEmbeddedEmulator = Boolean.valueOf(System.getenv("useEmbeddedEmulator"));
+ if (useEmbeddedEmulator) {
+ atlasAPIServer = new AtlasAPIV2ServerEmulator();
+ }
+ }
+
+ @Before
+ public void startEmulator() throws Exception {
+ if (useEmbeddedEmulator) {
+ atlasAPIServer.start();
+ } else {
+ // Clear existing entities.
+ URL url = new URL(TARGET_ATLAS_URL + "/api/atlas/v2/entity/bulk/");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setDoOutput(true);
+ conn.setRequestMethod("DELETE");
+ conn.connect();
+ conn.getResponseCode();
+ conn.disconnect();
+ }
+ }
+
+ @After
+ public void stopEmulator() throws Exception {
+ if (useEmbeddedEmulator) {
+ atlasAPIServer.stop();
+ }
+ }
+
+ private static class ProvenanceRecords extends ArrayList<ProvenanceEventRecord> {
+ @Override
+ public boolean add(ProvenanceEventRecord record) {
+ ((SimpleProvenanceRecord) record).setEventId(size());
+ return super.add(record);
+ }
+ }
+
+
+ private Lineage getLineage() throws Exception {
+ final URL url = new URL("http://localhost:21000/api/atlas/v2/debug/lineage/");
+ try (InputStream in = url.openStream()) {
+ Lineage lineage = new ObjectMapper().reader().withType(Lineage.class).readValue(in);
+ return lineage;
+ }
+ }
+
+ private void waitNotificationsGetDelivered() throws InterruptedException {
+ Thread.sleep(3_000);
+ }
+
+ @Test
+ public void testSimplestPath() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("SimplestFlowPath");
+ test(tc);
+
+ final Lineage lineage = getLineage();
+ lineage.assertLink("nifi_flow", "SimplestFlowPath", "SimplestFlowPath@example",
+ "nifi_flow_path", "GenerateFlowFile, LogAttribute", "d270e6f0-c5e0-38b9");
+ }
+
+ @Test
+ public void testSingleFlowPath() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("SingleFlowPath");
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ prs.add(pr("2e9a2852-228f-379b", "ConsumeKafka_0_11", RECEIVE, "PLAINTEXT://0.kafka.example.com:6667/topic-a"));
+ prs.add(pr("5a56149a-d82a-3242", "PublishKafka_0_11", SEND, "PLAINTEXT://0.kafka.example.com:6667/topic-b"));
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+ final Node flow = lineage.findNode("nifi_flow", "SingleFlowPath", "SingleFlowPath@example");
+ final Node path = lineage.findNode("nifi_flow_path",
+ "ConsumeKafka_0_11, UpdateAttribute, ConvertJSONToSQL, PutSQL, PublishKafka_0_11",
+ "2e9a2852-228f-379b");
+ final Node topicA = lineage.findNode("kafka_topic", "topic-a@example");
+ final Node topicB = lineage.findNode("kafka_topic", "topic-b@example");
+ lineage.assertLink(flow, path);
+ lineage.assertLink(topicA, path);
+ lineage.assertLink(path, topicB);
+ }
+
+ @Test
+ public void testMultipleProcessGroups() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("MultipleProcessGroups");
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ prs.add(pr("989dabb7-54b9-3c78", "ConsumeKafka_0_11", RECEIVE, "PLAINTEXT://0.kafka.example.com:6667/nifi-test"));
+ prs.add(pr("767c7bd6-75e3-3f32", "PutHDFS", SEND, "hdfs://nn1.example.com:8020/user/nifi/5262553828219"));
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node flow = lineage.findNode("nifi_flow", "MultipleProcessGroups", "MultipleProcessGroups@example");
+ final Node path = lineage.findNode("nifi_flow_path",
+ "ConsumeKafka_0_11, UpdateAttribute, PutHDFS",
+ "989dabb7-54b9-3c78");
+ final Node kafkaTopic = lineage.findNode("kafka_topic", "nifi-test@example");
+ final Node hdfsPath = lineage.findNode("hdfs_path", "/user/nifi/5262553828219@example");
+ lineage.assertLink(flow, path);
+ lineage.assertLink(kafkaTopic, path);
+ lineage.assertLink(path, hdfsPath);
+
+ }
+
+ private EdgeNode createEdge(ProvenanceRecords prs, int srcIdx, int tgtIdx) {
+ // Generate C created a FlowFile
+ final ProvenanceEventRecord srcR = prs.get(srcIdx);
+ // Then Remote Input Port sent it
+ final ProvenanceEventRecord tgtR = prs.get(tgtIdx);
+ final EventNode src = new EventNode(srcR);
+ final EventNode tgt = new EventNode(tgtR);
+ final EdgeNode edge = new EdgeNode(srcR.getComponentType() + " to " + tgtR.getEventType(), src, tgt);
+ return edge;
+ }
+
+ private ComputeLineageResult createLineage(ProvenanceRecords prs, int ... indices) throws InterruptedException {
+ final ComputeLineageResult lineage = mock(ComputeLineageResult.class);
+ when(lineage.awaitCompletion(anyLong(), any())).thenReturn(true);
+ final List<LineageEdge> edges = new ArrayList<>();
+ final Set<LineageNode> nodes = new LinkedHashSet<>();
+ for (int i = 0; i < indices.length - 1; i++) {
+ final EdgeNode edge = createEdge(prs, indices[i], indices[i + 1]);
+ edges.add(edge);
+ nodes.add(edge.getSource());
+ nodes.add(edge.getDestination());
+ }
+ when(lineage.getEdges()).thenReturn(edges);
+ when(lineage.getNodes()).thenReturn(new ArrayList<>(nodes));
+ return lineage;
+ }
+
+ private ComputeLineageResult compositeLineages(ComputeLineageResult ... results) throws InterruptedException {
+ final ComputeLineageResult lineage = mock(ComputeLineageResult.class);
+ when(lineage.awaitCompletion(anyLong(), any())).thenReturn(true);
+ final List<LineageEdge> edges = new ArrayList<>();
+ final Set<LineageNode> nodes = new LinkedHashSet<>();
+ for (int i = 0; i < results.length; i++) {
+ edges.addAll(results[i].getEdges());
+ nodes.addAll(results[i].getNodes());
+ }
+ when(lineage.getEdges()).thenReturn(edges);
+ when(lineage.getNodes()).thenReturn(new ArrayList<>(nodes));
+ return lineage;
+ }
+
+ /**
+ * A client NiFi sends FlowFiles to a remote NiFi.
+ */
+ private void testS2SSend(TestConfiguration tc) throws Exception {
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ prs.add(pr("ca71e4d9-2a4f-3970", "Generate A", CREATE));
+ prs.add(pr("c439cdca-e989-3491", "Generate C", CREATE));
+ prs.add(pr("b775b657-5a5b-3708", "GetTwitter", CREATE));
+
+ // The remote port GUID is different than the Remote Input Ports.
+ prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
+ "http://nifi.example.com:8080/nifi-api/data-transfer/input-ports" +
+ "/77919f59-533e-35a3-0000-000000000000/transactions/tx-1/flow-files"));
+
+ prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
+ "http://nifi.example.com:8080/nifi-api/data-transfer/input-ports" +
+ "/77919f59-533e-35a3-0000-000000000000/transactions/tx-2/flow-files"));
+
+ prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // C
+ prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // Twitter
+
+ // Generate C created a FlowFile, then it's sent via S2S
+ tc.addLineage(createLineage(prs, 1, 3, 5));
+ // GetTwitter created a FlowFile, then it's sent via S2S
+ tc.addLineage(createLineage(prs, 2, 4, 6));
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node flow = lineage.findNode("nifi_flow", "S2SSend", "S2SSend@example");
+ final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
+ final Node pathB = lineage.findNode("nifi_flow_path", "Generate B", "333255b6-eb02-3056");
+ final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+ final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+ final Node pathI = lineage.findNode("nifi_flow_path", "InactiveProcessor", "7033f311-ac68-3cab");
+ // UpdateAttribute has multiple incoming paths, so it generates a queue to receive those.
+ final Node queueU = lineage.findNode("nifi_queue", "queue", "c5392447-e9f1-33ad");
+ final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "c5392447-e9f1-33ad");
+
+ // These are starting paths.
+ lineage.assertLink(flow, pathA);
+ lineage.assertLink(flow, pathB);
+ lineage.assertLink(flow, pathC);
+ lineage.assertLink(flow, pathT);
+ lineage.assertLink(flow, pathI);
+
+ // Multiple paths connected to the same path.
+ lineage.assertLink(pathB, queueU);
+ lineage.assertLink(pathC, queueU);
+ lineage.assertLink(queueU, pathU);
+
+ }
+
+ @Test
+ public void testS2SSendSimple() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("S2SSend");
+
+ testS2SSend(tc);
+
+ final Lineage lineage = getLineage();
+
+ // The FlowFile created by Generate A has not been finished (by DROP event, but SIMPLE_PATH strategy can report it.
+ final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
+ final Node genA = lineage.findNode("nifi_data", "Generate A", "ca71e4d9-2a4f-3970");
+ lineage.assertLink(genA, pathA);
+
+ final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+ final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+
+ // Generate C and GetTwitter have reported proper SEND lineage to the input port.
+ final Node remoteInputPortD = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+ final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
+ final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
+ lineage.assertLink(pathC, remoteInputPortQ);
+ lineage.assertLink(pathT, remoteInputPortQ);
+ lineage.assertLink(remoteInputPortQ, remoteInputPortP);
+ lineage.assertLink(remoteInputPortP, remoteInputPortD);
+
+ // nifi_data is created for each obscure input processor.
+ final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
+ final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
+ lineage.assertLink(genC, pathC);
+ lineage.assertLink(genT, pathT);
+ }
+
+ @Test
+ public void testS2SSendComplete() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("S2SSend");
+ tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+
+ testS2SSend(tc);
+
+ final Lineage lineage = getLineage();
+
+ // Complete path has hash.
+ final Node pathC = lineage.findNode("nifi_flow_path", "Generate C, Remote Input Port",
+ "c439cdca-e989-3491-0000-000000000000::1605753423@example");
+ final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter, Remote Input Port",
+ "b775b657-5a5b-3708-0000-000000000000::3843156947@example");
+
+ // Generate C and GetTwitter have reported proper SEND lineage to the input port.
+ final Node remoteInputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+ lineage.assertLink(pathC, remoteInputPort);
+ lineage.assertLink(pathT, remoteInputPort);
+
+ // nifi_data is created for each obscure input processor.
+ final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
+ final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
+ lineage.assertLink(genC, pathC);
+ lineage.assertLink(genT, pathT);
+ }
+
+ /**
+ * A client NiFi gets FlowFiles from a remote NiFi.
+ */
+ @Test
+ public void testS2SGet() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("S2SGet");
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ // The remote port GUID is different than the Remote Output Ports.
+ prs.add(pr("7375f8f6-4604-468d", "Remote Output Port", RECEIVE,
+ "http://nifi.example.com:8080/nifi-api/data-transfer/output-ports" +
+ "/392e7343-3950-329b-0000-000000000000/transactions/tx-1/flow-files"));
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node flow = lineage.findNode("nifi_flow", "S2SGet", "S2SGet@example");
+ final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
+ final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
+ final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
+
+ // These entities should be created by notification.
+ final Node remoteOutputPortDataSet = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+ final Node remoteOutputPortProcess = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
+ final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
+ final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
+ final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
+
+ lineage.assertLink(remoteOutputPortDataSet, remoteOutputPortProcess);
+
+ lineage.assertLink(flow, remoteOutputPortProcess);
+ lineage.assertLink(remoteOutputPortProcess, queueL);
+ lineage.assertLink(remoteOutputPortProcess, queueP);
+ lineage.assertLink(remoteOutputPortProcess, queueU);
+
+ lineage.assertLink(queueL, pathL);
+ lineage.assertLink(queueP, pathP);
+ lineage.assertLink(queueU, pathU);
+
+ }
+
+ /**
+ * A remote NiFi transfers FlowFiles to remote client NiFis.
+ * This NiFi instance owns RootProcessGroup output port.
+ */
+ @Test
+ public void testS2STransfer() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("S2STransfer");
+
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ prs.add(pr("392e7343-3950-329b", "Output Port", SEND,
+ "http://nifi.example.com:8080/nifi-api/data-transfer/output-ports" +
+ "/392e7343-3950-329b-0000-000000000000/transactions/tx-1/flow-files"));
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node flow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
+ final Node path = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
+ final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+
+ lineage.assertLink(flow, path);
+ lineage.assertLink(path, outputPort);
+ }
+
+ /**
+ * A remote NiFi receives FlowFiles from remote client NiFis.
+ * This NiFi instance owns RootProcessGroup input port.
+ */
+ @Test
+ public void testS2SReceive() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("S2SReceive");
+
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ prs.add(pr("77919f59-533e-35a3", "Input Port", RECEIVE,
+ "http://nifi.example.com:8080/nifi-api/data-transfer/output-ports" +
+ "/77919f59-533e-35a3-0000-000000000000/transactions/tx-1/flow-files"));
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node flow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
+ final Node path = lineage.findNode("nifi_flow_path", "input, UpdateAttribute", "77919f59-533e-35a3");
+ final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+
+ lineage.assertLink(flow, path);
+ lineage.assertLink(flow, inputPort);
+
+ lineage.assertLink(inputPort, path);
+ }
+
+ @Test
+ public void testS2SReceiveAndSendCombination() throws Exception {
+ testS2SReceive();
+ testS2SSendSimple();
+
+ final Lineage lineage = getLineage();
+
+ final Node remoteFlow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
+ final Node localFlow = lineage.findNode("nifi_flow", "S2SSend", "S2SSend@example");
+ final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
+ final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
+ final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+ final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+ final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+
+ // Remote flow owns the inputPort.
+ lineage.assertLink(remoteFlow, inputPort);
+
+ // These paths within local flow sends data to the remote flow through the remote input port.
+ lineage.assertLink(localFlow, pathC);
+ lineage.assertLink(localFlow, pathT);
+ lineage.assertLink(pathC, remoteInputPortQ);
+ lineage.assertLink(pathT, remoteInputPortQ);
+ lineage.assertLink(remoteInputPortQ, remoteInputPortP);
+ lineage.assertLink(remoteInputPortP, inputPort);
+
+ }
+
+ @Test
+ public void testS2STransferAndGetCombination() throws Exception {
+ testS2STransfer();
+ testS2SGet();
+
+ final Lineage lineage = getLineage();
+
+ final Node remoteFlow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
+ final Node localFlow = lineage.findNode("nifi_flow", "S2SGet", "S2SGet@example");
+ final Node remoteGen = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
+ final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+
+ final Node remoteOutputPortP = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
+ final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
+ final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
+ final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
+ final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
+ final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
+ final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
+
+ // Remote flow owns the outputPort and transfer data generated by GenerateFlowFile.
+ lineage.assertLink(remoteFlow, remoteGen);
+ lineage.assertLink(remoteGen, outputPort);
+
+ // The Remote Output Port path in local flow gets data from the remote.
+ lineage.assertLink(localFlow, remoteOutputPortP);
+ lineage.assertLink(outputPort, remoteOutputPortP);
+ lineage.assertLink(remoteOutputPortP, queueL);
+ lineage.assertLink(remoteOutputPortP, queueP);
+ lineage.assertLink(remoteOutputPortP, queueU);
+ lineage.assertLink(queueL, pathL);
+ lineage.assertLink(queueP, pathP);
+ lineage.assertLink(queueU, pathU);
+
+ }
+
+ /**
+ * A client NiFi gets FlowFiles from a remote output port and sends it to a remote input port without doing anything.
+ */
+ @Test
+ public void testS2SDirect() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("S2SDirect");
+ final ProvenanceRecords prs = tc.provenanceRecords;
+
+ prs.add(pr("d73d9115-b987-4ffc", "Remote Output Port", RECEIVE,
+ "http://nifi.example.com:8080/nifi-api/data-transfer/output-ports" +
+ "/015f1040-dcd7-17bd-5c1f-e31afe0a09a4/transactions/tx-1/flow-files"));
+
+ prs.add((pr("a4f14247-89aa-4e6c", "Remote Input Port", SEND,
+ "http://nifi.example.com:8080/nifi-api/data-transfer/input-ports" +
+ "/015f101e-dcd7-17bd-8899-1a723733521a/transactions/tx-2/flow-files")));
+
+ Map<Long, ComputeLineageResult> lineages = tc.lineageResults;
+ // Received from remote output port, then sent it via remote input port
+ lineages.put(1L, createLineage(prs, 0, 1));
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node flow = lineage.findNode("nifi_flow", "S2SDirect", "S2SDirect@example");
+ final Node remoteOutputPort = lineage.findNode("nifi_output_port", "output", "015f1040-dcd7-17bd-5c1f-e31afe0a09a4@example");
+ final Node remoteOutputPortP = lineage.findNode("nifi_flow_path", "Remote Output Port", "d73d9115-b987-4ffc");
+ final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "a4f14247-89aa-4e6c");
+ final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "a4f14247-89aa-4e6c");
+ final Node remoteInputPort = lineage.findNode("nifi_input_port", "input", "015f101e-dcd7-17bd-8899-1a723733521a@example");
+
+ // Even if there is no Processor, lineage can be reported using root flow_path.
+ lineage.assertLink(flow, remoteOutputPortP);
+ lineage.assertLink(remoteOutputPort, remoteOutputPortP);
+ lineage.assertLink(remoteOutputPortP, remoteInputPortQ);
+ lineage.assertLink(remoteInputPortQ, remoteInputPortP);
+ lineage.assertLink(remoteInputPortP, remoteInputPort);
+ }
+
+ @Test
+ public void testRemoteInvocation() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("RemoteInvocation");
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ prs.add(pr("2607ed95-c6ef-3636", "DeleteHDFS", REMOTE_INVOCATION, "hdfs://nn1.example.com:8020/test/2017-10-23"));
+ prs.add(pr("2607ed95-c6ef-3636", "DeleteHDFS", REMOTE_INVOCATION, "hdfs://nn1.example.com:8020/test/2017-10-24"));
+ prs.add(pr("2607ed95-c6ef-3636", "DeleteHDFS", REMOTE_INVOCATION, "hdfs://nn1.example.com:8020/test/2017-10-25"));
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node flow = lineage.findNode("nifi_flow", "RemoteInvocation", "RemoteInvocation@example");
+ final Node path = lineage.findNode("nifi_flow_path",
+ "DeleteHDFS",
+ "2607ed95-c6ef-3636");
+ final Node hdfsPath23 = lineage.findNode("hdfs_path", "/test/2017-10-23@example");
+ final Node hdfsPath24 = lineage.findNode("hdfs_path", "/test/2017-10-24@example");
+ final Node hdfsPath25 = lineage.findNode("hdfs_path", "/test/2017-10-25@example");
+ lineage.assertLink(flow, path);
+ lineage.assertLink(path, hdfsPath23);
+ lineage.assertLink(path, hdfsPath24);
+ lineage.assertLink(path, hdfsPath25);
+
+ }
+
+ @Test
+ public void testSimpleEventLevelSimplePath() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("SimpleEventLevel");
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ prs.add(pr("d9257f7e-b78c-349a", "Generate A", CREATE));
+ prs.add(pr("d84b9bdc-5e42-3b3b", "Generate B", CREATE));
+
+ prs.add((pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/nifi/a.txt")));
+ prs.add((pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/nifi/b.txt")));
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+ final Node genA = lineage.findNode("nifi_data", "Generate A", "d9257f7e-b78c-349a");
+ final Node genB = lineage.findNode("nifi_data", "Generate B", "d84b9bdc-5e42-3b3b");
+
+ final Node genAPath = lineage.findNode("nifi_flow_path", "Generate A", "d9257f7e-b78c-349a");
+ final Node genBPath = lineage.findNode("nifi_flow_path", "Generate B", "d84b9bdc-5e42-3b3b");
+
+ final Node queue = lineage.findNode("nifi_queue", "queue", "eaf013c1-aec5-39b0");
+ final Node putFile = lineage.findNode("nifi_flow_path", "PutFile, LogAttribute", "eaf013c1-aec5-39b0");
+
+ final Node outA = lineage.findNode("fs_path", "/tmp/nifi/a.txt@example");
+ final Node outB = lineage.findNode("fs_path", "/tmp/nifi/b.txt@example");
+
+ lineage.assertLink(genA, genAPath);
+ lineage.assertLink(genAPath, queue);
+
+ lineage.assertLink(genB, genBPath);
+ lineage.assertLink(genBPath, queue);
+
+ lineage.assertLink(queue, putFile);
+ lineage.assertLink(putFile, outA);
+ lineage.assertLink(putFile, outB);
+ }
+
+ @Test
+ public void testSimpleEventLevelCompletePath() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("SimpleEventLevel");
+ tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+ final ProvenanceRecords prs = tc.provenanceRecords;
+
+ String flowFileUUIDA = "A0000000-0000-0000";
+ String flowFileUUIDB = "B0000000-0000-0000";
+ prs.add(pr("d9257f7e-b78c-349a", "Generate A", CREATE, flowFileUUIDA));
+ prs.add(pr("d84b9bdc-5e42-3b3b", "Generate B", CREATE, flowFileUUIDB));
+
+ prs.add((pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/nifi/a.txt", flowFileUUIDA)));
+ prs.add((pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/nifi/b.txt", flowFileUUIDB)));
+
+ prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDA));
+ prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDB));
+
+ Map<Long, ComputeLineageResult> lineages = tc.lineageResults;
+ lineages.put(4L, createLineage(prs, 0, 2, 4));
+ lineages.put(5L, createLineage(prs, 1, 3, 5));
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node genA = lineage.findNode("nifi_data", "Generate A", "d9257f7e-b78c-349a");
+ final Node genB = lineage.findNode("nifi_data", "Generate B", "d84b9bdc-5e42-3b3b");
+
+ final Node genAPath = lineage.findNode("nifi_flow_path", "Generate A, PutFile, LogAttribute",
+ "d9257f7e-b78c-349a-0000-000000000000::980416504@example");
+ final Node genBPath = lineage.findNode("nifi_flow_path", "Generate B, PutFile, LogAttribute",
+ "d84b9bdc-5e42-3b3b-0000-000000000000::442259660@example");
+
+ final Node outA = lineage.findNode("fs_path", "/tmp/nifi/a.txt@example");
+ final Node outB = lineage.findNode("fs_path", "/tmp/nifi/b.txt@example");
+
+ lineage.assertLink(genA, genAPath);
+ lineage.assertLink(genB, genBPath);
+
+ lineage.assertLink(genAPath, outA);
+ lineage.assertLink(genBPath, outB);
+ }
+
+ @Test
+ public void testMergedEvents() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("MergedEvents");
+ tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ final String flowFileUUIDA = "A0000000-0000-0000";
+ final String flowFileUUIDB = "B0000000-0000-0000";
+ final String flowFileUUIDC = "C0000000-0000-0000";
+ final String flowFileUUIDD = "D0000000-0000-0000";
+ // Merged B and C.
+ final String flowFileUUIDBC = "BC000000-0000-0000";
+ prs.add(pr("f585d83b-2a03-37cf", "Generate A", CREATE, flowFileUUIDA)); // 0
+ prs.add(pr("59a7c1f9-9a73-3cc6", "Generate B", CREATE, flowFileUUIDB)); // 1
+ prs.add(pr("d6c3f282-e03d-316c", "Generate C", CREATE, flowFileUUIDC)); // 2
+ prs.add(pr("f9593a5a-f0d5-3e87", "Generate D", CREATE, flowFileUUIDD)); // 3
+ // Original files are dropped.
+ prs.add(pr("c77dd033-bb9e-39ea", "MergeContent", JOIN, flowFileUUIDBC)); // 4
+ prs.add(pr("c77dd033-bb9e-39ea", "MergeContent", DROP, flowFileUUIDB)); // 5
+ prs.add(pr("c77dd033-bb9e-39ea", "MergeContent", DROP, flowFileUUIDC)); // 6
+
+ prs.add((pr("93f8ad14-6ee6-34c1", "PutFile", SEND, "file:/tmp/nifi/a.txt", flowFileUUIDA))); // 7
+ prs.add((pr("93f8ad14-6ee6-34c1", "PutFile", SEND, "file:/tmp/nifi/bc.txt", flowFileUUIDBC))); // 8
+ prs.add((pr("93f8ad14-6ee6-34c1", "PutFile", SEND, "file:/tmp/nifi/d.txt", flowFileUUIDD))); // 9
+
+ prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDA)); // 10
+ prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDBC)); // 11
+ prs.add(pr("bfc30bc3-48cf-332a", "LogAttribute", DROP, flowFileUUIDD)); // 12
+
+ Map<Long, ComputeLineageResult> lineages = tc.lineageResults;
+ final ComputeLineageResult lineageB = createLineage(prs, 1, 4, 5);
+ final ComputeLineageResult lineageC = createLineage(prs, 2, 4, 6);
+ lineages.put(5L, lineageB); // B
+ lineages.put(6L, lineageC); // C
+
+ lineages.put(10L, createLineage(prs, 0, 7, 10)); // A
+ lineages.put(11L, createLineage(prs, 4, 8, 11)); // BC
+ lineages.put(12L, createLineage(prs, 3, 9, 12)); // D
+
+ Map<Long, ComputeLineageResult> parents = tc.parentLineageResults;
+ parents.put(4L, compositeLineages(lineageB, lineageC));
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node genA = lineage.findNode("nifi_data", "Generate A", "f585d83b-2a03-37cf");
+ final Node genB = lineage.findNode("nifi_data", "Generate B", "59a7c1f9-9a73-3cc6");
+ final Node genC = lineage.findNode("nifi_data", "Generate C", "d6c3f282-e03d-316c");
+ final Node genD = lineage.findNode("nifi_data", "Generate D", "f9593a5a-f0d5-3e87");
+
+ final Node genAPath = lineage.findNode("nifi_flow_path", "Generate A, PutFile, LogAttribute",
+ "f585d83b-2a03-37cf-0000-000000000000::1003499964@example");
+ final Node genBPath = lineage.findNode("nifi_flow_path", "Generate B",
+ "59a7c1f9-9a73-3cc6-0000-000000000000::45412830@example");
+ final Node genCPath = lineage.findNode("nifi_flow_path", "Generate C",
+ "d6c3f282-e03d-316c-0000-000000000000::1968410985@example");
+ final Node genDPath = lineage.findNode("nifi_flow_path", "Generate D, PutFile, LogAttribute",
+ "f9593a5a-f0d5-3e87-0000-000000000000::4257576567@example");
+
+ lineage.assertLink(genA, genAPath);
+ lineage.assertLink(genB, genBPath);
+ lineage.assertLink(genC, genCPath);
+ lineage.assertLink(genD, genDPath);
+
+ // B and C were merged together, while A and D were processed individually.
+ final Node joinBC = lineage.findNode("nifi_queue", "JOIN", "c77dd033-bb9e-39ea-0000-000000000000::2370367315@example");
+ final Node bcPath = lineage.findNode("nifi_flow_path", "MergeContent, PutFile, LogAttribute",
+ "c77dd033-bb9e-39ea-0000-000000000000::2370367315@example");
+ lineage.assertLink(genBPath, joinBC);
+ lineage.assertLink(genCPath, joinBC);
+ lineage.assertLink(joinBC, bcPath);
+
+ final Node outA = lineage.findNode("fs_path", "/tmp/nifi/a.txt@example");
+ final Node outBC = lineage.findNode("fs_path", "/tmp/nifi/bc.txt@example");
+ final Node outD = lineage.findNode("fs_path", "/tmp/nifi/d.txt@example");
+ lineage.assertLink(genAPath, outA);
+ lineage.assertLink(bcPath, outBC);
+ lineage.assertLink(genDPath, outD);
+
+ }
+
+ @Test
+ public void testRecordAndDataSetLevel() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("RecordAndDataSetLevel");
+ tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+ final ProvenanceRecords prs = tc.provenanceRecords;
+
+ // Publish part
+ final String ffIdA1 = "A1000000";
+ final String ffIdB1 = "B1000000";
+ prs.add(pr("22be62d9-c4a1-3056", "GetFile", RECEIVE, "file:/tmp/input/A1.csv", ffIdA1)); // 0
+ prs.add(pr("22be62d9-c4a1-3056", "GetFile", RECEIVE, "file:/tmp/input/B1.csv", ffIdB1)); // 1
+
+ prs.add(pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/output/A1.csv", ffIdA1)); // 2
+ prs.add(pr("eaf013c1-aec5-39b0", "PutFile", SEND, "file:/tmp/output/B1.csv", ffIdB1)); // 3
+
+ prs.add(pr("97641de3-fb76-3d95", "PublishKafkaRecord_0_10", SEND, "PLAINTEXT://localhost:9092/nifi-test", ffIdA1)); // 4
+ prs.add(pr("97641de3-fb76-3d95", "PublishKafkaRecord_0_10", SEND, "PLAINTEXT://localhost:9092/nifi-test", ffIdB1)); // 5
+
+ prs.add(pr("97641de3-fb76-3d95", "PublishKafkaRecord_0_10", DROP, ffIdA1)); // 6
+ prs.add(pr("97641de3-fb76-3d95", "PublishKafkaRecord_0_10", DROP, ffIdB1)); // 7
+
+ // Consume part
+ final String ffIdK1 = "K1000000";
+ final String ffIdA2 = "A2000000"; // Forked children
+ final String ffIdB2 = "B2000000"; // Forked children
+ prs.add(pr("529e6722-9b49-3b66", "ConsumeKafkaRecord_0_10", RECEIVE, "PLAINTEXT://localhost:9092/nifi-test", ffIdK1)); // 8
+ prs.add(pr("3f6d405e-6e3d-38c9", "PartitionRecord", FORK, ffIdK1)); // 9
+ prs.add(pr("db8bb12c-5cd3-3011", "UpdateAttribute", ATTRIBUTES_MODIFIED, ffIdA2)); // 10
+ prs.add(pr("db8bb12c-5cd3-3011", "UpdateAttribute", ATTRIBUTES_MODIFIED, ffIdB2)); // 11
+ prs.add(pr("062caf95-da40-3a57", "PutFile", SEND, "file:/tmp/consumed/A_20171101_100701.csv", ffIdA2)); // 12
+ prs.add(pr("062caf95-da40-3a57", "PutFile", SEND, "file:/tmp/consumed/B_20171101_100701.csv", ffIdB2)); // 13
+ prs.add(pr("062caf95-da40-3a57", "PutFile", DROP, ffIdA2)); // 14
+ prs.add(pr("062caf95-da40-3a57", "PutFile", DROP, ffIdB2)); // 15
+
+
+ Map<Long, ComputeLineageResult> lineages = tc.lineageResults;
+ Map<Long, ComputeLineageResult> parents = tc.parentLineageResults;
+ lineages.put(6L, createLineage(prs, 0, 2, 4, 6)); // Publish A1
+ lineages.put(7L, createLineage(prs, 1, 3, 5, 7)); // Publish B1
+ parents.put(9L, createLineage(prs, 8, 9)); // Consumed and Forked K1
+ lineages.put(14L, createLineage(prs, 9, 10, 12, 14)); // Processed A2
+ lineages.put(15L, createLineage(prs, 9, 11, 13, 15)); // Processed B2
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ // Publish part
+ final Node inputFileA1 = lineage.findNode("fs_path", "/tmp/input/A1.csv@example");
+ final Node inputFileB1 = lineage.findNode("fs_path", "/tmp/input/B1.csv@example");
+ // These two flow paths are derived from the same set of Processors, but with different input files, and resulted different hashes.
+ final Node getFileToPublishKafkaA = lineage.findNode("nifi_flow_path", "GetFile, PutFile, PublishKafkaRecord_0_10",
+ "22be62d9-c4a1-3056-0000-000000000000::2823953997@example");
+ final Node getFileToPublishKafkaB = lineage.findNode("nifi_flow_path", "GetFile, PutFile, PublishKafkaRecord_0_10",
+ "22be62d9-c4a1-3056-0000-000000000000::568010061@example");
+
+ lineage.assertLink(inputFileA1, getFileToPublishKafkaA);
+ lineage.assertLink(inputFileB1, getFileToPublishKafkaB);
+
+ final Node nifiTestTopic = lineage.findNode("kafka_topic", "nifi-test@example");
+ final Node outputFileA = lineage.findNode("fs_path", "/tmp/output/A1.csv@example");
+ final Node outputFileB = lineage.findNode("fs_path", "/tmp/output/B1.csv@example");
+ lineage.assertLink(getFileToPublishKafkaA, nifiTestTopic);
+ lineage.assertLink(getFileToPublishKafkaB, nifiTestTopic);
+ lineage.assertLink(getFileToPublishKafkaA, outputFileA);
+ lineage.assertLink(getFileToPublishKafkaB, outputFileB);
+
+ // Consume part
+ final Node consumeNifiTestTopic = lineage.findNode("nifi_flow_path", "ConsumeKafkaRecord_0_10",
+ "529e6722-9b49-3b66-0000-000000000000::3649132843@example");
+ final Node forkedA = lineage.findNode("nifi_queue", "FORK",
+ "3f6d405e-6e3d-38c9-0000-000000000000::234149075@example");
+ final Node forkedB = lineage.findNode("nifi_queue", "FORK",
+ "3f6d405e-6e3d-38c9-0000-000000000000::2377021542@example");
+ lineage.assertLink(consumeNifiTestTopic, forkedA);
+ lineage.assertLink(consumeNifiTestTopic, forkedB);
+
+ final Node partitionToPutA = lineage.findNode("nifi_flow_path", "PartitionRecord, UpdateAttribute, PutFile",
+ "3f6d405e-6e3d-38c9-0000-000000000000::234149075@example");
+ final Node partitionToPutB = lineage.findNode("nifi_flow_path", "PartitionRecord, UpdateAttribute, PutFile",
+ "3f6d405e-6e3d-38c9-0000-000000000000::2377021542@example");
+ final Node consumedFileA = lineage.findNode("fs_path", "/tmp/consumed/A_20171101_100701.csv@example");
+ final Node consumedFileB = lineage.findNode("fs_path", "/tmp/consumed/B_20171101_100701.csv@example");
+ lineage.assertLink(forkedA, partitionToPutA);
+ lineage.assertLink(forkedB, partitionToPutB);
+ lineage.assertLink(partitionToPutA, consumedFileA);
+ lineage.assertLink(partitionToPutB, consumedFileB);
+ }
+
+ @Test
+ public void testMultiInAndOuts() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("MultiInAndOuts");
+ final ProvenanceRecords prs = tc.provenanceRecords;
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node gen1 = lineage.findNode("nifi_flow_path", "Gen1", "a4bfe4ec-570b-3126");
+ final Node gen2 = lineage.findNode("nifi_flow_path", "Gen2", "894218d5-dfe9-3ee5");
+ final Node ua1 = lineage.findNode("nifi_flow_path", "UA1", "5609cb4f-8a95-3b7a");
+ final Node ua2 = lineage.findNode("nifi_flow_path", "UA2", "6f88b3d9-5723-356a");
+ final Node ua3 = lineage.findNode("nifi_flow_path", "UA3, UA4, LogAttribute", "3250aeb6-4026-3969");
+ final Node ua1Q = lineage.findNode("nifi_queue", "queue", "5609cb4f-8a95-3b7a");
+ final Node ua2Q = lineage.findNode("nifi_queue", "queue", "6f88b3d9-5723-356a");
+ final Node ua3Q = lineage.findNode("nifi_queue", "queue", "3250aeb6-4026-3969");
+
+ lineage.assertLink(gen1, ua1Q);
+ lineage.assertLink(gen1, ua2Q);
+
+ lineage.assertLink(gen2, ua2Q);
+
+ lineage.assertLink(ua1Q, ua1);
+ lineage.assertLink(ua2Q, ua2);
+
+ lineage.assertLink(ua1, ua3Q);
+ lineage.assertLink(ua2, ua3Q);
+ lineage.assertLink(ua3Q, ua3);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/SimpleProvenanceRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/SimpleProvenanceRecord.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/SimpleProvenanceRecord.java
new file mode 100644
index 0000000..f02cc88
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/SimpleProvenanceRecord.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SimpleProvenanceRecord implements ProvenanceEventRecord {
+ private long eventId;
+ private String componentId;
+ private String componentType;
+ private String transitUri;
+ private String flowFileUUID;
+ private ProvenanceEventType eventType;
+ private Map<String, String> attributes = new HashMap<>();
+
+ public static SimpleProvenanceRecord pr(String componentId, String componentType, ProvenanceEventType eventType) {
+ return pr(componentId, componentType, eventType, null, null);
+ }
+ public static SimpleProvenanceRecord pr(String componentId, String componentType, ProvenanceEventType eventType, String transitUri) {
+ return pr(componentId, componentType, eventType, transitUri, null);
+ }
+ public static SimpleProvenanceRecord pr(String componentId, String componentType, ProvenanceEventType eventType, String transitUri, String flowFileUUID) {
+ final SimpleProvenanceRecord pr = new SimpleProvenanceRecord();
+ pr.componentId = componentId.length() == 18 ? componentId + "-0000-000000000000" : componentId;
+ pr.componentType = componentType;
+ pr.transitUri = transitUri;
+ pr.eventType = eventType;
+ pr.flowFileUUID = flowFileUUID;
+ return pr;
+ }
+
+ public void setEventId(long eventId) {
+ this.eventId = eventId;
+ }
+
+ @Override
+ public String getComponentId() {
+ return componentId;
+ }
+
+ @Override
+ public String getComponentType() {
+ return componentType;
+ }
+
+ @Override
+ public String getTransitUri() {
+ return transitUri;
+ }
+
+ @Override
+ public ProvenanceEventType getEventType() {
+ return eventType;
+ }
+
+ @Override
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+
+ @Override
+ public long getEventId() {
+ return eventId;
+ }
+
+ @Override
+ public long getEventTime() {
+ return 0;
+ }
+
+ @Override
+ public long getFlowFileEntryDate() {
+ return 0;
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return 0;
+ }
+
+ @Override
+ public long getFileSize() {
+ return 0;
+ }
+
+ @Override
+ public Long getPreviousFileSize() {
+ return null;
+ }
+
+ @Override
+ public long getEventDuration() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, String> getPreviousAttributes() {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> getUpdatedAttributes() {
+ return null;
+ }
+
+ @Override
+ public String getSourceSystemFlowFileIdentifier() {
+ return null;
+ }
+
+ @Override
+ public String getFlowFileUuid() {
+ return null;
+ }
+
+ @Override
+ public List<String> getParentUuids() {
+ return null;
+ }
+
+ @Override
+ public List<String> getChildUuids() {
+ return null;
+ }
+
+ @Override
+ public String getAlternateIdentifierUri() {
+ return null;
+ }
+
+ @Override
+ public String getDetails() {
+ return null;
+ }
+
+ @Override
+ public String getRelationship() {
+ return null;
+ }
+
+ @Override
+ public String getSourceQueueIdentifier() {
+ return null;
+ }
+
+ @Override
+ public String getContentClaimSection() {
+ return null;
+ }
+
+ @Override
+ public String getPreviousContentClaimSection() {
+ return null;
+ }
+
+ @Override
+ public String getContentClaimContainer() {
+ return null;
+ }
+
+ @Override
+ public String getPreviousContentClaimContainer() {
+ return null;
+ }
+
+ @Override
+ public String getContentClaimIdentifier() {
+ return null;
+ }
+
+ @Override
+ public String getPreviousContentClaimIdentifier() {
+ return null;
+ }
+
+ @Override
+ public Long getContentClaimOffset() {
+ return null;
+ }
+
+ @Override
+ public Long getPreviousContentClaimOffset() {
+ return null;
+ }
+
+ @Override
+ public String getBestEventIdentifier() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
new file mode 100644
index 0000000..ae1d63d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockValidationContext;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_NIFI_URL;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
+import static org.junit.Assert.assertTrue;
+
+public class TestReportLineageToAtlas {
+
+ private final Logger logger = LoggerFactory.getLogger(TestReportLineageToAtlas.class);
+
+ @Test
+ public void validateAtlasUrls() throws Exception {
+ final ReportLineageToAtlas reportingTask = new ReportLineageToAtlas();
+ final MockProcessContext processContext = new MockProcessContext(reportingTask);
+ final MockValidationContext validationContext = new MockValidationContext(processContext);
+
+ processContext.setProperty(ATLAS_NIFI_URL, "http://nifi.example.com:8080/nifi");
+ processContext.setProperty(ATLAS_USER, "admin");
+ processContext.setProperty(ATLAS_PASSWORD, "admin");
+
+ BiConsumer<Collection<ValidationResult>, Consumer<ValidationResult>> assertResults = (rs, a) -> {
+ assertTrue(rs.iterator().hasNext());
+ for (ValidationResult r : rs) {
+ logger.info("{}", r);
+ final String subject = r.getSubject();
+ if (ATLAS_URLS.getDisplayName().equals(subject)) {
+ a.accept(r);
+ }
+ }
+ };
+
+ // Default setting.
+ assertResults.accept(reportingTask.validate(validationContext),
+ r -> assertTrue("Atlas URLs is required", !r.isValid()));
+
+
+ // Invalid URL.
+ processContext.setProperty(ATLAS_URLS, "invalid");
+ assertResults.accept(reportingTask.validate(validationContext),
+ r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
+
+ // Valid URL
+ processContext.setProperty(ATLAS_URLS, "http://atlas.example.com:21000");
+ assertTrue(processContext.isValid());
+
+ // Valid URL with Expression
+ processContext.setProperty(ATLAS_URLS, "http://atlas.example.com:${literal(21000)}");
+ assertTrue(processContext.isValid());
+
+ // Valid URLs
+ processContext.setProperty(ATLAS_URLS, "http://atlas1.example.com:21000, http://atlas2.example.com:21000");
+ assertTrue(processContext.isValid());
+
+ // Invalid and Valid URLs
+ processContext.setProperty(ATLAS_URLS, "invalid, http://atlas2.example.com:21000");
+ assertResults.accept(reportingTask.validate(validationContext),
+ r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java
new file mode 100644
index 0000000..bccd8c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.resolver;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+public class TestRegexClusterResolver {
+
+ private PropertyContext context;
+ private ValidationContext validationContext;
+
+ public void setupMock(Map<String, String> properties) {
+ context = Mockito.mock(PropertyContext.class);
+ validationContext = Mockito.mock(ValidationContext.class);
+ when(validationContext.getAllProperties()).thenReturn(properties);
+ when(context.getAllProperties()).thenReturn(properties);
+ }
+
+ @Test
+ public void testEmptySettings() {
+ setupMock(Collections.EMPTY_MAP);
+ final RegexClusterResolver resolver = new RegexClusterResolver();
+
+ // It should be valid
+ final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+ Assert.assertEquals(0, validationResults.size());
+ resolver.configure(context);
+
+ Assert.assertNull(resolver.fromHostNames("example.com"));
+ }
+
+ @Test
+ public void testInvalidClusterName() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put(RegexClusterResolver.PATTERN_PROPERTY_PREFIX, ".*\\.example.com");
+ setupMock(properties);
+ final RegexClusterResolver resolver = new RegexClusterResolver();
+
+ final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+ Assert.assertEquals(1, validationResults.size());
+ final ValidationResult validationResult = validationResults.iterator().next();
+ Assert.assertEquals(RegexClusterResolver.PATTERN_PROPERTY_PREFIX, validationResult.getSubject());
+
+ try {
+ resolver.configure(context);
+ Assert.fail("Configure method should fail, too");
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ @Test
+ public void testEmptyPattern() {
+ final Map<String, String> properties = new HashMap<>();
+ final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
+ properties.put(propertyName, "");
+ setupMock(properties);
+ final RegexClusterResolver resolver = new RegexClusterResolver();
+
+ final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+ Assert.assertEquals(1, validationResults.size());
+ final ValidationResult validationResult = validationResults.iterator().next();
+ Assert.assertEquals(propertyName, validationResult.getSubject());
+
+ try {
+ resolver.configure(context);
+ Assert.fail("Configure method should fail, too");
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ @Test
+ public void testSinglePattern() {
+ final Map<String, String> properties = new HashMap<>();
+ final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
+ properties.put(propertyName, "^.*\\.example.com$");
+ setupMock(properties);
+ final RegexClusterResolver resolver = new RegexClusterResolver();
+
+ final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+ Assert.assertEquals(0, validationResults.size());
+
+ resolver.configure(context);
+
+ Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.example.com"));
+ }
+
+ @Test
+ public void testMultiplePatterns() {
+ final Map<String, String> properties = new HashMap<>();
+ final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
+ // Hostname or local ip address, delimited with a whitespace
+ properties.put(propertyName, "^.*\\.example.com$\n^192.168.1.[\\d]+$");
+ setupMock(properties);
+ final RegexClusterResolver resolver = new RegexClusterResolver();
+
+ final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+ Assert.assertEquals(0, validationResults.size());
+
+ resolver.configure(context);
+
+ Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.example.com"));
+ Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.10"));
+ Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.22"));
+ Assert.assertNull(resolver.fromHostNames("192.168.2.30"));
+ }
+
+ @Test
+ public void testMultipleClusters() {
+ final Map<String, String> properties = new HashMap<>();
+ final String c1PropertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
+ final String c2PropertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster2";
+ // Hostname or local ip address
+ properties.put(c1PropertyName, "^.*\\.c1\\.example.com$ ^192.168.1.[\\d]+$");
+ properties.put(c2PropertyName, "^.*\\.c2\\.example.com$ ^192.168.2.[\\d]+$");
+ setupMock(properties);
+ final RegexClusterResolver resolver = new RegexClusterResolver();
+
+ final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
+ Assert.assertEquals(0, validationResults.size());
+
+ resolver.configure(context);
+
+ Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.c1.example.com"));
+ Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.10"));
+ Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.22"));
+ Assert.assertEquals("Cluster2", resolver.fromHostNames("host2.c2.example.com"));
+ Assert.assertEquals("Cluster2", resolver.fromHostNames("192.168.2.10"));
+ Assert.assertEquals("Cluster2", resolver.fromHostNames("192.168.2.22"));
+ Assert.assertNull(resolver.fromHostNames("192.168.3.30"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
new file mode 100644
index 0000000..927347d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+atlas.cluster.name=AtlasCluster
+
+# atlas.kafka.bootstrap.servers=atlas.example.com:6667
+atlas.kafka.bootstrap.servers=localhost:9092
|