jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From resc...@apache.org
Subject svn commit: r1873852 - /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
Date Mon, 10 Feb 2020 12:59:33 GMT
Author: reschke
Date: Mon Feb 10 12:59:33 2020
New Revision: 1873852

URL: http://svn.apache.org/viewvc?rev=1873852&view=rev
Log:
JCR-4530: jackrabbit-core: avoid use of deprecated commons-collections Buffers

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java?rev=1873852&r1=1873851&r2=1873852&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java Mon Feb 10 12:59:33 2020
@@ -16,19 +16,17 @@
  */
 package org.apache.jackrabbit.core.observation;
 
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUtils;
-import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
-import org.apache.jackrabbit.core.state.ChangeLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.jackrabbit.core.state.ChangeLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Dispatcher for dispatching events to listeners within a single workspace.
  */
@@ -84,8 +82,7 @@ public final class ObservationDispatcher
     /**
      * Contains the pending events that will be delivered to event listeners
      */
-    private Buffer eventQueue
-            = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
+    private BlockingQueue<DispatchAction> eventQueue = new LinkedBlockingQueue<>();
 
     private AtomicInteger eventQueueSize = new AtomicInteger();
 
@@ -116,7 +113,7 @@ public final class ObservationDispatcher
         try {
             notificationThread.join();
         } catch (InterruptedException e) {
-            // FIXME log exception ?
+            log.debug("while joining notificationThread", e);
         }
         log.info("Notification of EventListeners stopped.");
     }
@@ -149,26 +146,29 @@ public final class ObservationDispatcher
      * thread.
      */
     public void run() {
-        DispatchAction action;
-        while ((action = (DispatchAction) eventQueue.remove()) != DISPOSE_MARKER) {
-
-            eventQueueSize.getAndAdd(-action.getEventStates().size());
-            log.debug("got EventStateCollection");
-            log.debug("event delivery to " + action.getEventConsumers().size() + " consumers started...");
-            for (Iterator<EventConsumer> it = action.getEventConsumers().iterator(); it.hasNext();) {
-                EventConsumer c = it.next();
-                try {
-                    c.consumeEvents(action.getEventStates());
-                } catch (Throwable t) {
-                    log.warn("EventConsumer " +
-                            c.getEventListener().getClass().getName() +
-                            " threw exception", t);
-                    // move on to the next consumer
+        boolean done = false;
+        do {
+            try {
+                DispatchAction action = eventQueue.take();
+                done = action == DISPOSE_MARKER;
+                if (!done) {
+                    eventQueueSize.getAndAdd(-action.getEventStates().size());
+                    log.debug("got EventStateCollection");
+                    log.debug("event delivery to " + action.getEventConsumers().size() + " consumers started...");
+                    for (EventConsumer c : action.getEventConsumers()) {
+                        try {
+                            c.consumeEvents(action.getEventStates());
+                        } catch (Throwable t) {
+                            log.warn("EventConsumer " + c.getEventListener().getClass().getName() + " threw exception", t);
+                            // move on to the next consumer
+                        }
+                    }
                 }
+            } catch (InterruptedException ex) {
+                log.debug("event delivery interrupted", ex);
             }
-            log.debug("event delivery finished.");
-
-        }
+        } while (!done);
+        log.debug("event delivery finished.");
     }
 
     /**
@@ -310,5 +310,4 @@ public final class ObservationDispatcher
             }
         }
     }
-
 }



Mime
View raw message