nifi-commits mailing list archives

From: pvill...@apache.org
Subject: nifi git commit: NIFI-4547: Add ProvenanceEventConsumer utility class
Date: Mon, 30 Oct 2017 08:53:26 GMT
Repository: nifi
Updated Branches:
  refs/heads/master fb94e983b -> d914ad292


NIFI-4547: Add ProvenanceEventConsumer utility class

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2236.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d914ad29
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d914ad29
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d914ad29

Branch: refs/heads/master
Commit: d914ad2924c8b2811b8b89d37b8139d14a77b641
Parents: fb94e98
Author: Koji Kawamura <ijokarumawak@apache.org>
Authored: Mon Oct 30 10:55:37 2017 +0900
Committer: Pierre Villard <pierre.villard.fr@gmail.com>
Committed: Mon Oct 30 09:50:44 2017 +0100

----------------------------------------------------------------------
 .../nifi-reporting-utils/pom.xml                |  45 ++++
 .../provenance/ProvenanceEventConsumer.java     | 244 +++++++++++++++++++
 nifi-nar-bundles/nifi-extension-utils/pom.xml   |   1 +
 .../nifi-site-to-site-reporting-task/pom.xml    |   4 +
 .../SiteToSiteProvenanceReportingTask.java      | 226 ++++-------------
 .../TestSiteToSiteProvenanceReportingTask.java  |   1 +
 pom.xml                                         |   5 +
 7 files changed, 349 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
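For reference, a rough usage sketch of the new ProvenanceEventConsumer (illustrative only: the
handleBatch callback is a placeholder, the property descriptors are assumed to be registered by
the task, and "context" stands for the task's ReportingContext; the actual wiring is in the
SiteToSiteProvenanceReportingTask changes below):

    // Configure the consumer, typically when the task is scheduled.
    ProvenanceEventConsumer consumer = new ProvenanceEventConsumer();
    consumer.setStartPositionValue(context.getProperty(ProvenanceEventConsumer.PROVENANCE_START_POSITION).getValue());
    consumer.setBatchSize(context.getProperty(ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE).asInteger());
    consumer.setLogger(getLogger());
    consumer.addTargetEventType(ProvenanceEventType.SEND, ProvenanceEventType.RECEIVE);
    consumer.setScheduled(true);

    // On each trigger, pull filtered batches and hand each one to a callback. The consumer
    // stores the id of the last event it handed out in LOCAL state, so the next run resumes
    // right after it (or at the configured start position on the very first run).
    consumer.consumeEvents(context.getEventAccess(), context.getStateManager(),
            events -> handleBatch(events));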


http://git-wip-us.apache.org/repos/asf/nifi/blob/d914ad29/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
new file mode 100644
index 0000000..f534da9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-extension-utils</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-reporting-utils</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/d914ad29/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
new file mode 100644
index 0000000..1cbdbf1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.provenance;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.EventAccess;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+public class ProvenanceEventConsumer {
+
+    public static final String LAST_EVENT_ID_KEY = "last_event_id";
+
+    public static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream",
+            "Start reading provenance Events from the beginning of the stream (the oldest event first)");
+    public static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
+            "Start reading provenance Events from the end of the stream, ignoring old events");
+    public static final PropertyDescriptor PROVENANCE_START_POSITION = new PropertyDescriptor.Builder()
+            .name("provenance-start-position")
+            .displayName("Provenance Record Start Position")
+            .description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start")
+            .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
+            .defaultValue(BEGINNING_OF_STREAM.getValue())
+            .required(true)
+            .build();
+    public static final PropertyDescriptor PROVENANCE_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("provenance-batch-size")
+            .displayName("Provenance Record Batch Size")
+            .description("Specifies how many records to send in a single batch, at most.")
+            .required(true)
+            .defaultValue("1000")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+
+    private String startPositionValue = PROVENANCE_START_POSITION.getDefaultValue();
+    private Pattern componentTypeRegex;
+    private List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
+    private List<String> componentIds = new ArrayList<String>();
+    private int batchSize = Integer.parseInt(PROVENANCE_BATCH_SIZE.getDefaultValue());
+
+    private volatile long firstEventId = -1L;
+    private volatile boolean scheduled = false;
+
+    private ComponentLog logger;
+
+    public void setStartPositionValue(String startPositionValue) {
+        this.startPositionValue = startPositionValue;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public void setComponentTypeRegex(final String componentTypeRegex) {
+        if (!StringUtils.isBlank(componentTypeRegex)) {
+            this.componentTypeRegex = Pattern.compile(componentTypeRegex);
+        }
+    }
+
+    public void addTargetEventType(final ProvenanceEventType ... types) {
+        for (ProvenanceEventType type : types) {
+            eventTypes.add(type);
+        }
+    }
+
+    public void addTargetComponentId(final String ... ids) {
+        for (String id : ids) {
+            componentIds.add(id);
+        }
+    }
+
+    public void setScheduled(boolean scheduled) {
+        this.scheduled = scheduled;
+    }
+
+    public boolean isScheduled() {
+        return scheduled;
+    }
+
+    public void setLogger(ComponentLog logger) {
+        this.logger = logger;
+    }
+
+    public void consumeEvents(final EventAccess eventAccess, final StateManager stateManager,
+                              final Consumer<List<ProvenanceEventRecord>> consumer) throws ProcessException {
+
+        Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
+
+        if(currMaxId == null) {
+            logger.debug("No events to send because no events have been created yet.");
+            return;
+        }
+
+        if (firstEventId < 0) {
+            Map<String, String> state;
+            try {
+                state = stateManager.getState(Scope.LOCAL).toMap();
+            } catch (IOException e) {
+                logger.error("Failed to get state at start up due to:" + e.getMessage(), e);
+                return;
+            }
+
+            if (state.containsKey(LAST_EVENT_ID_KEY)) {
+                firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
+            } else {
+                if (END_OF_STREAM.getValue().equals(startPositionValue)) {
+                    firstEventId = currMaxId;
+                }
+            }
+
+            if (currMaxId < (firstEventId - 1)) {
+                if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) {
+                    logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
+                            "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
+                    firstEventId = -1;
+                } else {
+                    logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
+                            "ids. Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId});
+                    firstEventId = currMaxId;
+                }
+            }
+        }
+
+        if (currMaxId == (firstEventId - 1)) {
+            logger.debug("No events to send due to the current max id being equal to the last id that was queried.");
+            return;
+        }
+
+        List<ProvenanceEventRecord> rawEvents;
+        List<ProvenanceEventRecord> filteredEvents;
+        try {
+            rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
+            filteredEvents = filterEvents(rawEvents);
+        } catch (final IOException ioe) {
+            logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
+            return;
+        }
+
+        if (rawEvents == null || rawEvents.isEmpty()) {
+            logger.debug("No events to send due to 'events' being null or empty.");
+            return;
+        }
+
+        // Consume while there are more events and not stopped.
+        while (rawEvents != null && !rawEvents.isEmpty() && isScheduled()) {
+
+            if (!filteredEvents.isEmpty()) {
+                // Executes callback.
+                consumer.accept(filteredEvents);
+            }
+
+            firstEventId = updateLastEventId(rawEvents, stateManager);
+
+            // Retrieve the next batch
+            try {
+                rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
+                filteredEvents = filterEvents(rawEvents);
+            } catch (final IOException ioe) {
+                logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
+                return;
+            }
+        }
+
+    }
+
+    private long updateLastEventId(final List<ProvenanceEventRecord> events, final StateManager stateManager) {
+        if (events == null || events.isEmpty()) {
+            return firstEventId;
+        }
+
+        // Store the id of the last event so we know where we left off
+        final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
+        final String lastEventId = String.valueOf(lastEvent.getEventId());
+        try {
+            Map<String, String> newMapOfState = new HashMap<>();
+            newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
+            stateManager.setState(newMapOfState, Scope.LOCAL);
+        } catch (final IOException ioe) {
+            logger.error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}",
+                    new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
+        }
+
+        return lastEvent.getEventId() + 1;
+    }
+
+
+    private boolean isFilteringEnabled() {
+        return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
+    }
+
+    private List<ProvenanceEventRecord> filterEvents(List<ProvenanceEventRecord> provenanceEvents) {
+        if(isFilteringEnabled()) {
+            List<ProvenanceEventRecord> filteredEvents = new ArrayList<ProvenanceEventRecord>();
+
+            for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
+                if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
+                    continue;
+                }
+                if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
+                    continue;
+                }
+                if(componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {
+                    continue;
+                }
+                filteredEvents.add(provenanceEventRecord);
+            }
+
+            return filteredEvents;
+        } else {
+            return provenanceEvents;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d914ad29/nifi-nar-bundles/nifi-extension-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
index ac8a4ed..4a8e054 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -30,6 +30,7 @@
         <module>nifi-record-utils</module>
         <module>nifi-hadoop-utils</module>
         <module>nifi-processor-utils</module>
+        <module>nifi-reporting-utils</module>
     </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d914ad29/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
index 50f7d40..48efab7 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
@@ -36,6 +36,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-reporting-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d914ad29/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 8af9412..8b8048b 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -39,6 +38,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
 
 import javax.json.Json;
 import javax.json.JsonArray;
@@ -62,7 +62,6 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 
 @Tags({"provenance", "lineage", "tracking", "site", "site to site", "restricted"})
 @CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
@@ -125,52 +124,43 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         .required(true)
         .build();
 
-    private volatile long firstEventId = -1L;
-    private volatile boolean isFilteringEnabled = false;
-    private volatile Pattern componentTypeRegex;
-    private volatile List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
-    private volatile List<String> componentIds = new ArrayList<String>();
-    private volatile boolean scheduled = false;
+    private volatile ProvenanceEventConsumer consumer;
 
     @OnScheduled
     public void onScheduled(final ConfigurationContext context) throws IOException {
+        consumer = new ProvenanceEventConsumer();
+        consumer.setStartPositionValue(context.getProperty(START_POSITION).getValue());
+        consumer.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
+        consumer.setLogger(getLogger());
+
         // initialize component type filtering
-        componentTypeRegex = StringUtils.isBlank(context.getProperty(FILTER_COMPONENT_TYPE).getValue()) ? null : Pattern.compile(context.getProperty(FILTER_COMPONENT_TYPE).getValue());
+        consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).getValue());
 
-        final String[] eventList = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
-        if(eventList != null) {
-            for(String type : eventList) {
+        final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
+        if(targetEventTypes != null) {
+            for(String type : targetEventTypes) {
                 try {
-                    eventTypes.add(ProvenanceEventType.valueOf(type));
+                    consumer.addTargetEventType(ProvenanceEventType.valueOf(type));
                 } catch (Exception e) {
                     getLogger().warn(type + " is not a correct event type, removed from the filtering.");
                 }
             }
-        } else {
-            eventTypes.clear();
         }
 
         // initialize component ID filtering
-        final String[] componentIdList = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
-        if(componentIdList != null) {
-            componentIds.addAll(Arrays.asList(componentIdList));
-        } else {
-            componentIds.clear();
+        final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
+        if(targetComponentIds != null) {
+            consumer.addTargetComponentId(targetComponentIds);
         }
 
-        // set a boolean whether filtering will be applied or not
-        isFilteringEnabled = componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
-
-        scheduled = true;
+        consumer.setScheduled(true);
     }
 
     @OnUnscheduled
     public void onUnscheduled() {
-        scheduled = false;
-    }
-
-    public boolean isScheduled() {
-        return scheduled;
+        if (consumer != null) {
+            consumer.setScheduled(false);
+        }
     }
 
     @Override
@@ -228,65 +218,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
         final Map<String,String> componentMap = createComponentMap(procGroupStatus);
 
-        Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId();
-
-        if(currMaxId == null) {
-            getLogger().debug("No events to send because no events have been created yet.");
-            return;
-        }
-
-        if (firstEventId < 0) {
-            Map<String, String> state;
-            try {
-                state = context.getStateManager().getState(Scope.LOCAL).toMap();
-            } catch (IOException e) {
-                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
-                return;
-            }
-
-            final String startPositionValue = context.getProperty(START_POSITION).getValue();
-
-            if (state.containsKey(LAST_EVENT_ID_KEY)) {
-                firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
-            } else {
-                if (END_OF_STREAM.getValue().equals(startPositionValue)) {
-                    firstEventId = currMaxId;
-                }
-            }
-
-            if (currMaxId < (firstEventId - 1)) {
-                if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) {
-                    getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
-                        "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
-                    firstEventId = -1;
-                } else {
-                    getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
-                        "ids. Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId});
-                    firstEventId = currMaxId;
-                }
-            }
-        }
-
-        if (currMaxId == (firstEventId - 1)) {
-            getLogger().debug("No events to send due to the current max id being equal to the last id that was queried.");
-            return;
-        }
-
-        List<ProvenanceEventRecord> rawEvents;
-        List<ProvenanceEventRecord> filteredEvents;
-        try {
-            rawEvents = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
-            filteredEvents = filterEvents(rawEvents);
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
-            return;
-        }
-
-        if (rawEvents == null || rawEvents.isEmpty()) {
-            getLogger().debug("No events to send due to 'events' being null or empty.");
-            return;
-        }
-
         final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
         URL url;
         try {
@@ -306,104 +237,45 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
         df.setTimeZone(TimeZone.getTimeZone("Z"));
 
-        while (rawEvents != null && !rawEvents.isEmpty() && isScheduled()) {
+        consumer.consumeEvents(context.getEventAccess(), context.getStateManager(), events -> {
             final long start = System.nanoTime();
-
-            if (!filteredEvents.isEmpty()) {
-                // Create a JSON array of all the events in the current batch
-                final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
-                for (final ProvenanceEventRecord event : filteredEvents) {
-                    final String componentName = componentMap.get(event.getComponentId());
-                    arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId));
-                }
-                final JsonArray jsonArray = arrayBuilder.build();
-
-                // Send the JSON document for the current batch
-                try {
-                    final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
-                    if (transaction == null) {
-                        getLogger().debug("All destination nodes are penalized; will attempt to send data later");
-                        return;
-                    }
-
-                    final Map<String, String> attributes = new HashMap<>();
-                    final String transactionId = UUID.randomUUID().toString();
-                    attributes.put("reporting.task.transaction.id", transactionId);
-                    attributes.put("mime.type", "application/json");
-
-                    final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
-                    transaction.send(data, attributes);
-                    transaction.confirm();
-                    transaction.complete();
-
-                    final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                    getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
-                        new Object[] {filteredEvents.size(), transferMillis, transactionId, rawEvents.get(0).getEventId()});
-                } catch (final IOException e) {
-                    throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
-                }
+            // Create a JSON array of all the events in the current batch
+            final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
+            for (final ProvenanceEventRecord event : events) {
+                final String componentName = componentMap.get(event.getComponentId());
+                arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId));
             }
+            final JsonArray jsonArray = arrayBuilder.build();
 
-            firstEventId = updateLastEventId(rawEvents, context.getStateManager());
-
-            // Retrieve the next batch
+            // Send the JSON document for the current batch
             try {
-                rawEvents = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
-                filteredEvents = filterEvents(rawEvents);
-            } catch (final IOException ioe) {
-                getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
-                return;
-            }
-        }
-    }
-
-    private long updateLastEventId(final List<ProvenanceEventRecord> events, final StateManager stateManager) {
-        if (events == null || events.isEmpty()) {
-            return firstEventId;
-        }
-
-        // Store the id of the last event so we know where we left off
-        final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
-        final String lastEventId = String.valueOf(lastEvent.getEventId());
-        try {
-            Map<String, String> newMapOfState = new HashMap<>();
-            newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
-            stateManager.setState(newMapOfState, Scope.LOCAL);
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}",
-                new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
-        }
-
-        return lastEvent.getEventId() + 1;
-    }
-
-    private List<ProvenanceEventRecord> filterEvents(final List<ProvenanceEventRecord> provenanceEvents) {
-        if (provenanceEvents == null || provenanceEvents.isEmpty()) {
-            return Collections.emptyList();
-        }
-
-        if(isFilteringEnabled) {
-            List<ProvenanceEventRecord> filteredEvents = new ArrayList<ProvenanceEventRecord>();
-
-            for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
-                if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
-                    continue;
-                }
-                if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
-                    continue;
-                }
-                if(componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {
-                    continue;
+                final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
+                if (transaction == null) {
+                    getLogger().debug("All destination nodes are penalized; will attempt to send data later");
+                    return;
                 }
-                filteredEvents.add(provenanceEventRecord);
+
+                final Map<String, String> attributes = new HashMap<>();
+                final String transactionId = UUID.randomUUID().toString();
+                attributes.put("reporting.task.transaction.id", transactionId);
+                attributes.put("mime.type", "application/json");
+
+                final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
+                transaction.send(data, attributes);
+                transaction.confirm();
+                transaction.complete();
+
+                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
+                        new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()});
+            } catch (final IOException e) {
+                throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
             }
+        });
 
-            return filteredEvents;
-        } else {
-            return provenanceEvents;
-        }
     }
 
+
     static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
         final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform, final String nodeIdentifier) {
         addField(builder, "eventId", UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/nifi/blob/d914ad29/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index 86cbb74..ec2e301 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -336,6 +336,7 @@ public class TestSiteToSiteProvenanceReportingTask {
         task.initialize(initContext);
 
         // execute the reporting task and should not produce any data b/c max id same as previous id
+        task.onScheduled(confContext);
         task.onTrigger(context);
         assertEquals(0, task.dataSent.size());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d914ad29/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0104a45..37cbacc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1486,6 +1486,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-reporting-utils</artifactId>
+                <version>1.5.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-schema-utils</artifactId>
                 <version>1.5.0-SNAPSHOT</version>
             </dependency>

