nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject nifi git commit: NIFI-1579 Performance improvements for ListenSyslog which include removing an unnecessary yield and exposing a configurable size for the internal queue used by the processor, changing ListenSyslog to use a 20ms poll and use a long poll w
Date Fri, 04 Mar 2016 14:23:46 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 6776060ac -> 19e53962c


NIFI-1579 Performance improvements for ListenSyslog which include removing an unnecessary
yield and exposing a configurable size for the internal queue used by the processor, changing
ListenSyslog to use a 20ms poll and use a long poll when batching, also including same improvements
for ListenRELP


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/19e53962
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/19e53962
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/19e53962

Branch: refs/heads/master
Commit: 19e53962ca45fd00a46efd671b674d388ff10053
Parents: 6776060
Author: Bryan Bende <bbende@apache.org>
Authored: Tue Mar 1 12:54:52 2016 -0500
Committer: Bryan Bende <bbende@apache.org>
Committed: Fri Mar 4 09:17:45 2016 -0500

----------------------------------------------------------------------
 .../listen/AbstractListenEventProcessor.java    | 25 ++++++++++----
 .../nifi/processors/standard/ListenRELP.java    |  6 ++--
 .../nifi/processors/standard/ListenSyslog.java  | 34 +++++++++++++++-----
 .../processors/standard/TestListenSyslog.java   |  5 +--
 4 files changed, 52 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 029f6db..d56255d 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -85,6 +85,15 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
             .defaultValue("1 MB")
             .required(true)
             .build();
+    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Message Queue")
+            .description("The maximum size of the internal queue used to buffer messages
being transferred from the underlying channel to the processor. " +
+                    "Setting this value higher allows more messages to be buffered in memory
during surges of incoming messages, but increases the total " +
+                    "memory used by the processor.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000")
+            .required(true)
+            .build();
 
     // Putting these properties here so sub-classes don't have to redefine them, but they
are
     // not added to the properties by default since not all processors may need them
@@ -119,7 +128,7 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
             .description("Messages received successfully will be sent out this relationship.")
             .build();
 
-    public static final int POLL_TIMEOUT_MS = 100;
+    public static final int POLL_TIMEOUT_MS = 20;
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
@@ -127,7 +136,7 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
     protected volatile int port;
     protected volatile Charset charset;
     protected volatile ChannelDispatcher dispatcher;
-    protected volatile BlockingQueue<E> events = new LinkedBlockingQueue<>(10);
+    protected volatile BlockingQueue<E> events;
     protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue<>();
 
     @Override
@@ -135,6 +144,7 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(PORT);
         descriptors.add(RECV_BUFFER_SIZE);
+        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(CHARSET);
         descriptors.addAll(getAdditionalProperties());
@@ -178,6 +188,7 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
     public void onScheduled(final ProcessContext context) throws IOException {
         charset = Charset.forName(context.getProperty(CHARSET).getValue());
         port = context.getProperty(PORT).asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
 
         final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
 
@@ -230,7 +241,7 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
      *
      * @return an event from one of the queues, or null if none are available
      */
-    protected E getMessage(final boolean longPoll, final boolean pollErrorQueue) {
+    protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession
session) {
         E event = null;
         if (pollErrorQueue) {
             event = errorEvents.poll();
@@ -249,6 +260,10 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
             }
         }
 
+        if (event != null) {
+            session.adjustCounter("Messages Received", 1L, false);
+        }
+
         return event;
     }
 
@@ -270,7 +285,7 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
 
         final Map<String,FlowFileEventBatch> batches = new HashMap<>();
         for (int i=0; i < totalBatchSize; i++) {
-            final E event = getMessage(true, true);
+            final E event = getMessage(true, true, session);
             if (event == null) {
                 break;
             }
@@ -311,8 +326,6 @@ public abstract class AbstractListenEventProcessor<E extends Event>
extends Abst
                 errorEvents.offer(event);
                 break;
             }
-
-            session.adjustCounter("Messages Received", 1L, false);
         }
 
         return batches;

http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
index 99e1830..c386bdc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
@@ -133,9 +133,10 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent>
{
         final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
         final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize,
messageDemarcatorBytes);
 
-        // if the size is 0 then there was nothing to process so yield and return
+        // if the size is 0 then there was nothing to process so return
+        // we don't need to yield here because inside getBatches() we are polling a queue
with a wait
+        // and yielding here could have a negative impact on performance
         if (batches.size() == 0) {
-            context.yield();
             return;
         }
 
@@ -170,6 +171,7 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent>
{
 
             getLogger().debug("Transferring {} to success", new Object[] {flowFile});
             session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
 
             // create a provenance receive event
             final String senderHost = sender.startsWith("/") && sender.length() >
1 ? sender.substring(1) : sender;

http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 8ca7eb1..1ec406d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -98,6 +98,15 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso({PutSyslog.class, ParseSyslog.class})
 public class ListenSyslog extends AbstractSyslogProcessor {
 
+    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Message Queue")
+            .description("The maximum size of the internal queue used to buffer messages
being transferred from the underlying channel to the processor. " +
+                    "Setting this value higher allows more messages to be buffered in memory
during surges of incoming messages, but increases the total " +
+                    "memory used by the processor.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000")
+            .required(true)
+            .build();
     public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
             .name("Receive Buffer Size")
             .description("The size of each buffer used to receive Syslog messages. Adjust
this value appropriately based on the expected size of the " +
@@ -171,7 +180,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     private volatile ChannelDispatcher channelDispatcher;
     private volatile SyslogParser parser;
     private volatile BlockingQueue<ByteBuffer> bufferPool;
-    private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new LinkedBlockingQueue<>(10);
+    private volatile BlockingQueue<RawSyslogEvent> syslogEvents;
     private volatile BlockingQueue<RawSyslogEvent> errorEvents = new LinkedBlockingQueue<>();
     private volatile byte[] messageDemarcatorBytes; //it is only the array reference that
is volatile - not the contents.
 
@@ -182,6 +191,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(PORT);
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(RECV_BUFFER_SIZE);
+        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(MAX_CONNECTIONS);
         descriptors.add(MAX_BATCH_SIZE);
@@ -245,6 +255,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         final int port = context.getProperty(PORT).asInteger();
         final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final int maxMessageQueueSize = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
         final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n",
"\n").replace("\\r", "\r").replace("\\t", "\t");
@@ -263,6 +274,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         }
 
         parser = new SyslogParser(Charset.forName(charSet));
+        syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
 
         // create either a UDP or TCP reader and call open() to bind to the given port
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
@@ -313,7 +325,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         }
     }
 
-    protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue)
{
+    protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue,
final ProcessSession session) {
         RawSyslogEvent rawSyslogEvent = null;
         if (pollErrorQueue) {
             rawSyslogEvent = errorEvents.poll();
@@ -322,7 +334,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         if (rawSyslogEvent == null) {
             try {
                 if (longPoll) {
-                    rawSyslogEvent = syslogEvents.poll(100, TimeUnit.MILLISECONDS);
+                    rawSyslogEvent = syslogEvents.poll(20, TimeUnit.MILLISECONDS);
                 } else {
                     rawSyslogEvent = syslogEvents.poll();
                 }
@@ -332,6 +344,10 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             }
         }
 
+        if (rawSyslogEvent != null) {
+            session.adjustCounter("Messages Received", 1L, false);
+        }
+
         return rawSyslogEvent;
     }
 
@@ -342,11 +358,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws
ProcessException {
         // poll the queue with a small timeout to avoid unnecessarily yielding below
-        RawSyslogEvent rawSyslogEvent = getMessage(true, true);
+        RawSyslogEvent rawSyslogEvent = getMessage(true, true, session);
 
-        // if nothing in the queue then yield and return
+        // if nothing in the queue just return, we don't want to yield here because yielding
could adversely
+        // impact performance, and we already have a long poll in getMessage so there will
be some built in
+        // throttling even when no data is available
         if (rawSyslogEvent == null) {
-            context.yield();
             return;
         }
 
@@ -372,7 +389,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
             // If this is our first iteration, we have already polled our queues. Otherwise,
poll on each iteration.
             if (i > 0) {
-                rawSyslogEvent = getMessage(false, false);
+                rawSyslogEvent = getMessage(true, false, session);
 
                 if (rawSyslogEvent == null) {
                     break;
@@ -461,7 +478,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 break;
             }
 
-            session.adjustCounter("Messages Received", 1L, false);
             flowFilePerSender.put(sender, flowFile);
         }
 
@@ -483,6 +499,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
             getLogger().debug("Transferring {} to success", new Object[] {flowFile});
             session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
+
             final String senderHost = sender.startsWith("/") && sender.length() >
1 ? sender.substring(1) : sender;
             final String transitUri = new StringBuilder().append(protocol.toLowerCase()).append("://").append(senderHost).append(":").append(port).toString();
             session.getProvenanceReporter().receive(flowFile, transitUri);

http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 360dfe7..cd8621c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -602,11 +603,11 @@ public class TestListenSyslog {
         }
 
         @Override
-        protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue)
{
+        protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue,
final ProcessSession session) {
             if (eventItr.hasNext()) {
                 return eventItr.next();
             }
-            return super.getMessage(longPoll, pollErrorQueue);
+            return super.getMessage(longPoll, pollErrorQueue, session);
         }
     }
 }


Mime
View raw message