nifi-commits mailing list archives

From marka...@apache.org
Subject [07/18] nifi git commit: NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas
Date Mon, 18 Dec 2017 17:25:01 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
new file mode 100644
index 0000000..9e1a92c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_INPUT_TABLES;
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.when;
+
+public class TestHive2JDBC {
+
+    /**
+     * If a provenance event does not have table name attributes,
+     * then a database lineage should be created.
+     */
+    @Test
+    public void testDatabaseLineage() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:10000/databaseA";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_db", ref.getTypeName());
+        assertEquals("databaseA", ref.get(ATTR_NAME));
+        assertEquals("databaseA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    /**
+     * If a provenance event has table name attributes,
+     * then table lineages can be created.
+     */
+    @Test
+    public void testTableLineage() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:10000/databaseA";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(2, refs.getInputs().size());
+        // QualifiedName : Name
+        final Map<String, String> expectedInputRefs = new HashMap<>();
+        expectedInputRefs.put("databaseA.tableA1@cluster1", "tableA1");
+        expectedInputRefs.put("databaseA.tableA2@cluster1", "tableA2");
+        for (Referenceable ref : refs.getInputs()) {
+            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            assertTrue(expectedInputRefs.containsKey(qName));
+            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
+        }
+
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableB1", ref.get(ATTR_NAME));
+        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    /**
+     * If a provenance event has table name attributes, then table lineages can be created.
+     * In this case, if its transit URI does not contain a database name, 'default' is used.
+     */
+    @Test
+    public void testTableLineageWithDefaultTableName() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:10000";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(2, refs.getInputs().size());
+        // QualifiedName : Name
+        final Map<String, String> expectedInputRefs = new HashMap<>();
+        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
+        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
+        for (Referenceable ref : refs.getInputs()) {
+            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            assertTrue(expectedInputRefs.containsKey(qName));
+            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
+        }
+
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableB1", ref.get(ATTR_NAME));
+        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+}
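
As a reading aid, the qualified-name convention that the assertions above encode can be sketched in a few lines. This is only an illustration of the expected values, not the analyzer code added by this commit; the helper name and the URI handling are assumptions made for the example.

    // Hypothetical helper mirroring the naming rules asserted in TestHive2JDBC:
    // the database comes from the jdbc:hive2 URI path (falling back to "default"),
    // an already qualified "db.table" keeps its own database, and the resolved
    // cluster name is appended after '@'. Not part of this commit.
    static String toTableQualifiedName(String transitUri, String tableName, String clusterName) {
        // Strip the "jdbc:" prefix so java.net.URI can parse the hive2 authority and path.
        final String path = java.net.URI.create(transitUri.substring("jdbc:".length())).getPath();
        final String connectedDatabase = (path == null || path.isEmpty() || "/".equals(path))
                ? "default" : path.substring(1);
        final String qualifiedTable = tableName.contains(".") ? tableName : connectedDatabase + "." + tableName;
        return qualifiedTable + "@" + clusterName;
    }

For example, toTableQualifiedName("jdbc:hive2://0.example.com:10000/databaseA", "tableA1", "cluster1") yields "databaseA.tableA1@cluster1", and dropping the database from the URI yields "default.tableA1@cluster1", matching the expectations in the tests above.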

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
new file mode 100644
index 0000000..5c0fd0e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.when;
+
+public class TestKafkaTopic {
+
+    @Test
+    public void testPublishKafka() {
+        final String processorName = "PublishKafka";
+        final String transitUri = "PLAINTEXT://0.example.com:6667/topicA";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("topicA", ref.get(ATTR_NAME));
+        assertEquals("topicA", ref.get("topic"));
+        assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testPublishKafkaMultipleBrokers() {
+        final String processorName = "PublishKafka";
+        final String transitUri = "PLAINTEXT://0.example.com:6667,1.example.com:6667/topicA";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("topicA", ref.get(ATTR_NAME));
+        assertEquals("topicA", ref.get("topic"));
+        assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testConsumeKafka() {
+        final String processorName = "ConsumeKafka";
+        final String transitUri = "PLAINTEXT://0.example.com:6667/topicA";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(1, refs.getInputs().size());
+        assertEquals(0, refs.getOutputs().size());
+        Referenceable ref = refs.getInputs().iterator().next();
+        assertEquals("kafka_topic", ref.getTypeName());
+        assertEquals("topicA", ref.get(ATTR_NAME));
+        assertEquals("topicA", ref.get("topic"));
+        assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testConsumeKafkaRecord_0_10() {
+        final String processorName = "ConsumeKafkaRecord_0_10";
+        final String transitUri = "PLAINTEXT://0.example.com:6667/topicA";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(1, refs.getInputs().size());
+        assertEquals(0, refs.getOutputs().size());
+        Referenceable ref = refs.getInputs().iterator().next();
+        assertEquals("kafka_topic", ref.getTypeName());
+        assertEquals("topicA", ref.get(ATTR_NAME));
+        assertEquals("topicA", ref.get("topic"));
+        assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+}
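
Similarly, the Kafka tests above all assert the same topic naming: the topic is the last path segment of the transit URI (regardless of how many brokers precede it), and the qualified name is the topic followed by '@' and the resolved cluster name. A minimal sketch of that convention, again only an illustration and not the committed analyzer code:

    // Hypothetical sketch of the convention asserted in TestKafkaTopic.
    static String toTopicQualifiedName(String transitUri, String clusterName) {
        final String topic = transitUri.substring(transitUri.lastIndexOf('/') + 1);
        return topic + "@" + clusterName;
    }

For "PLAINTEXT://0.example.com:6667,1.example.com:6667/topicA" and cluster "cluster1" this yields "topicA@cluster1", which is what both PublishKafka tests expect.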

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
new file mode 100644
index 0000000..3040d50
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RemotePorts.
+ * More complex and detailed tests are available in {@link ITReportLineageToAtlas}.
+ */
+public class TestNiFiRemotePort {
+
+    @Test
+    public void testRemoteInputPort() {
+        final String componentType = "Remote Input Port";
+        final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files";
+        final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
+        when(sendEvent.getEventId()).thenReturn(123L);
+        when(sendEvent.getComponentId()).thenReturn("port-guid");
+        when(sendEvent.getComponentType()).thenReturn(componentType);
+        when(sendEvent.getTransitUri()).thenReturn(transitUri);
+        when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        final ConnectionStatus connection = new ConnectionStatus();
+        connection.setDestinationId("port-guid");
+        connection.setDestinationName("inputPortA");
+        connections.add(connection);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, sendEvent);
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+        assertEquals(1, refs.getComponentIds().size());
+        // Should report connected componentId.
+        assertTrue(refs.getComponentIds().contains("port-guid"));
+
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
+        assertEquals("inputPortA", ref.get(ATTR_NAME));
+        assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testRemoteOutputPort() {
+        final String componentType = "Remote Output Port";
+        final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentId()).thenReturn("port-guid");
+        when(record.getComponentType()).thenReturn(componentType);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        final ConnectionStatus connection = new ConnectionStatus();
+        connection.setSourceId("port-guid");
+        connection.setSourceName("outputPortA");
+        connections.add(connection);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(1, refs.getInputs().size());
+        assertEquals(0, refs.getOutputs().size());
+        Referenceable ref = refs.getInputs().iterator().next();
+        assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
+        assertEquals("outputPortA", ref.get(ATTR_NAME));
+        assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
new file mode 100644
index 0000000..3398dfa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.when;
+
+public class TestPutHiveStreaming {
+
+    @Test
+    public void testTableLineage() {
+        final String processorName = "PutHiveStreaming";
+        final String transitUri = "thrift://0.example.com:9083";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseA.tableA");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableA", ref.get(ATTR_NAME));
+        assertEquals("databaseA.tableA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
new file mode 100644
index 0000000..f4cfe0d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.when;
+
+public class TestUnknownDataSet {
+
+    @Test
+    public void testGenerateFlowFile() {
+        final String processorName = "GenerateFlowFile";
+        final String processorId = "processor-1234";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getComponentId()).thenReturn(processorId);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.CREATE);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionTo(processorId)).thenReturn(connections);
+        when(context.getNiFiClusterName()).thenReturn("nifi-cluster");
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, null, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(1, refs.getInputs().size());
+        assertEquals(0, refs.getOutputs().size());
+        Referenceable ref = refs.getInputs().iterator().next();
+        assertEquals("nifi_data", ref.getTypeName());
+        assertEquals("GenerateFlowFile", ref.get(ATTR_NAME));
+        assertEquals("processor-1234@nifi-cluster", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testSomethingHavingIncomingConnection() {
+        final String processorName = "SomeProcessor";
+        final String processorId = "processor-1234";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getComponentId()).thenReturn(processorId);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.CREATE);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        // The content of the connection is not important; just create an empty status.
+        connections.add(new ConnectionStatus());
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionTo(processorId)).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, null, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertNull("If the processor has incoming connections, no refs should be created", refs);
+    }
+
+}

