nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject [nifi] branch main updated: NIFI-7943 - Add application properties to GetAzureEventHub
Date Mon, 26 Oct 2020 18:36:42 GMT
This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a11d23  NIFI-7943 - Add application properties to GetAzureEventHub
9a11d23 is described below

commit 9a11d23c833a941106e464b6d6d09bd0b3ffbba6
Author: Fabio Torchetti <Fabio.Torchetti@kumandgo.com>
AuthorDate: Fri Oct 23 11:52:11 2020 -0500

    NIFI-7943 - Add application properties to GetAzureEventHub
    
    Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
    
    This closes #4617.
---
 .../nifi-azure-bundle/nifi-azure-processors/pom.xml    |  4 ++--
 .../azure/eventhub/ConsumeAzureEventHub.java           |  8 ++++++--
 .../processors/azure/eventhub/GetAzureEventHub.java    |  6 +++++-
 .../azure/eventhub/utils/AzureEventHubUtils.java       | 16 ++++++++++++++++
 .../azure/eventhub/GetAzureEventHubTest.java           | 17 ++++++++++++++++-
 .../azure/eventhub/TestConsumeAzureEventHub.java       | 18 ++++++++++++++++++
 6 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 2837f3d..253a43d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -20,8 +20,8 @@
     <artifactId>nifi-azure-processors</artifactId>
     <packaging>jar</packaging>
     <properties>
-        <azure-eventhubs.version>3.1.1</azure-eventhubs.version>
-        <azure-eventhubs-eph.version>3.1.1</azure-eventhubs-eph.version>
+        <azure-eventhubs.version>3.2.1</azure-eventhubs.version>
+        <azure-eventhubs-eph.version>3.2.1</azure-eventhubs-eph.version>
     </properties>
     <dependencies>
         <dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 7873e4c..d01aa34 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -86,7 +86,8 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
         @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
         @WritesAttribute(attribute = "eventhub.sequence", description = "The sequence number associated with the message"),
         @WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
-        @WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled")
+        @WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled"),
+        @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")
 })
 public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
 
@@ -401,6 +402,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
                 attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
             }
 
+            final Map<String,String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData);
+            attributes.putAll(applicationProperties);
+
             attributes.put("eventhub.name", eventHubName);
             attributes.put("eventhub.partition", partitionId);
         }
@@ -414,8 +418,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
 
                 final Map<String, String> attributes = new HashMap<>();
                 putEventHubAttributes(attributes, eventHubName, partitionId, eventData);
-
                 flowFile = session.putAllAttributes(flowFile, attributes);
+
                 flowFile = session.write(flowFile, out -> {
                     out.write(eventData.getBytes());
                 });
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 78a3327..f40f668 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -77,7 +77,8 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
         @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
         @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure sequence number associated with the message"),
         @WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
-        @WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled")
+        @WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled"),
+        @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")
 })
 public class GetAzureEventHub extends AbstractProcessor {
     static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
@@ -375,6 +376,9 @@ public class GetAzureEventHub extends AbstractProcessor {
                         attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
                     }
 
+                    final Map<String,String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData);
+                    attributes.putAll(applicationProperties);
+
                     attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
                     attributes.put("eventhub.partition", partitionId);
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
index d95e27f..f71f6c8 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -18,8 +18,11 @@ package org.apache.nifi.processors.azure.eventhub.utils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 
 import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
+import com.microsoft.azure.eventhubs.EventData;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -87,4 +90,17 @@ public final class AzureEventHubUtils {
                     .setSasKeyName(sasName)
                     .setSasKey(sasKey).toString();
     }
+
+    public static Map<String, String> getApplicationProperties(EventData eventData) {
+        final Map<String, String> properties = new HashMap<>();
+
+        final Map<String,Object> applicationProperties = eventData.getProperties();
+        if (null != applicationProperties) {
+            for (Map.Entry<String, Object> property : applicationProperties.entrySet()) {
+                properties.put(String.format("eventhub.property.%s", property.getKey()), property.getValue().toString());
+            }
+        }
+
+        return properties;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 88bcfa8..b33bc6f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -145,6 +145,19 @@ public class GetAzureEventHubTest {
     }
 
     @Test
+    public void testNormalFlowWithApplicationProperties() throws Exception {
+        setUpStandardTestConfig();
+        testRunner.run(1, true);
+        testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
+
+        MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("eventhub.property.event-sender", "Apache NiFi");
+        flowFile.assertAttributeEquals("eventhub.property.application", "TestApp");
+
+        testRunner.clearTransferState();
+    }
+
+    @Test
     public void testNormalNotReceivedEventsFlow() throws Exception {
         setUpStandardTestConfig();
         processor.received = false;
@@ -193,6 +206,8 @@ public class GetAzureEventHubTest {
             final LinkedList<EventData> receivedEvents = new LinkedList<>();
             for(int i = 0; i < 10; i++){
                 EventData eventData = EventData.create(String.format("test event number: %d", i).getBytes());
+                eventData.getProperties().put("event-sender", "Apache NiFi");
+                eventData.getProperties().put("application", "TestApp");
                 if (received) {
                     HashMap<String, Object> properties = new HashMap<>();
                     properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, PARTITION_KEY_VALUE);
@@ -232,4 +247,4 @@ public class GetAzureEventHubTest {
         testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
         testRunner.assertValid();
     }
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 99541a5..17f8442 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -107,6 +107,7 @@ public class TestConsumeAzureEventHub {
         when(partitionContext.getPartitionId()).thenReturn("partition-id");
         when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group");
     }
+
     @Test
     public void testProcessorConfigValidityWithManagedIdentityFlag() throws InitializationException {
         TestRunner testRunner = TestRunners.newTestRunner(processor);
@@ -130,6 +131,23 @@ public class TestConsumeAzureEventHub {
         testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
         testRunner.assertValid();
     }
+
+    @Test
+    public void testReceivedApplicationProperties() throws Exception {
+        final EventData singleEvent = EventData.create("one".getBytes(StandardCharsets.UTF_8));
+        singleEvent.getProperties().put("event-sender", "Apache NiFi");
+        singleEvent.getProperties().put("application", "TestApp");
+        final Iterable<EventData> eventDataList = Arrays.asList(singleEvent);
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile msg1 = flowFiles.get(0);
+        msg1.assertAttributeEquals("eventhub.property.event-sender", "Apache NiFi");
+        msg1.assertAttributeEquals("eventhub.property.application", "TestApp");
+    }
+
     @Test
     public void testReceiveOne() throws Exception {
         final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));


Mime
View raw message