Author: reschke
Date: Mon Feb 10 12:59:33 2020
New Revision: 1873852
URL: http://svn.apache.org/viewvc?rev=1873852&view=rev
Log:
JCR-4530: jackrabbit-core: avoid use of deprecated commons-collections Buffers
Modified:
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java?rev=1873852&r1=1873851&r2=1873852&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
Mon Feb 10 12:59:33 2020
@@ -16,19 +16,17 @@
*/
package org.apache.jackrabbit.core.observation;
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUtils;
-import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
-import org.apache.jackrabbit.core.state.ChangeLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.jackrabbit.core.state.ChangeLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Dispatcher for dispatching events to listeners within a single workspace.
*/
@@ -84,8 +82,7 @@ public final class ObservationDispatcher
/**
* Contains the pending events that will be delivered to event listeners
*/
- private Buffer eventQueue
- = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
+ private BlockingQueue<DispatchAction> eventQueue = new LinkedBlockingQueue<>();
private AtomicInteger eventQueueSize = new AtomicInteger();
@@ -116,7 +113,7 @@ public final class ObservationDispatcher
try {
notificationThread.join();
} catch (InterruptedException e) {
- // FIXME log exception ?
+ log.debug("while joining notificationThread", e);
}
log.info("Notification of EventListeners stopped.");
}
@@ -149,26 +146,29 @@ public final class ObservationDispatcher
* thread.
*/
public void run() {
- DispatchAction action;
- while ((action = (DispatchAction) eventQueue.remove()) != DISPOSE_MARKER) {
-
- eventQueueSize.getAndAdd(-action.getEventStates().size());
- log.debug("got EventStateCollection");
- log.debug("event delivery to " + action.getEventConsumers().size() + " consumers
started...");
- for (Iterator<EventConsumer> it = action.getEventConsumers().iterator();
it.hasNext();) {
- EventConsumer c = it.next();
- try {
- c.consumeEvents(action.getEventStates());
- } catch (Throwable t) {
- log.warn("EventConsumer " +
- c.getEventListener().getClass().getName() +
- " threw exception", t);
- // move on to the next consumer
+ boolean done = false;
+ do {
+ try {
+ DispatchAction action = eventQueue.take();
+ done = action == DISPOSE_MARKER;
+ if (!done) {
+ eventQueueSize.getAndAdd(-action.getEventStates().size());
+ log.debug("got EventStateCollection");
+ log.debug("event delivery to " + action.getEventConsumers().size() +
" consumers started...");
+ for (EventConsumer c : action.getEventConsumers()) {
+ try {
+ c.consumeEvents(action.getEventStates());
+ } catch (Throwable t) {
+ log.warn("EventConsumer " + c.getEventListener().getClass().getName()
+ " threw exception", t);
+ // move on to the next consumer
+ }
+ }
}
+ } catch (InterruptedException ex) {
+ log.debug("event delivery interrupted", ex);
}
- log.debug("event delivery finished.");
-
- }
+ } while (!done);
+ log.debug("event delivery finished.");
}
/**
@@ -310,5 +310,4 @@ public final class ObservationDispatcher
}
}
}
-
}
|