jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From thom...@apache.org
Subject svn commit: r884154 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation: EventStateCollection.java ObservationDispatcher.java
Date Wed, 25 Nov 2009 15:46:02 GMT
Author: thomasm
Date: Wed Nov 25 15:46:02 2009
New Revision: 884154

URL: http://svn.apache.org/viewvc?rev=884154&view=rev
Log:
JCR-2402 Observation: avoid running out of memory

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java?rev=884154&r1=884153&r2=884154&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java
(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java
Wed Nov 25 15:46:02 2009
@@ -500,6 +500,15 @@
     }
 
     /**
+     * Get the number of events.
+     *
+     * @return the size
+     */
+    public int size() {
+        return events.size();
+    }
+
+    /**
      * Return the session who is the origin of this events.
      * @return event source
      */

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java?rev=884154&r1=884153&r2=884154&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
Wed Nov 25 15:46:02 2009
@@ -27,6 +27,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Dispatcher for dispatching events to listeners within a single workspace.
@@ -46,6 +47,16 @@
     private static final DispatchAction DISPOSE_MARKER = new DispatchAction(null, null);
 
     /**
+     * The maximum number of queued asynchronous events. To avoid of of memory
+     * problems, the default value is 200'000. To change the default, set the
+     * system property jackrabbit.maxQueuedEvents to the required value. If more
+     * events are in the queue, the current thread waits, unless the current thread is
+     * the observation dispatcher itself (in which case only a warning is logged
+     * - usually observation listeners shouldn't cause new events).
+     */
+    private static final int MAX_QUEUED_EVENTS = Integer.parseInt(System.getProperty("jackrabbit.maxQueuedEvents",
"200000"));
+
+    /**
      * Currently active <code>EventConsumer</code>s for notification.
      */
     private Set<EventConsumer> activeConsumers = new HashSet<EventConsumer>();
@@ -76,11 +87,15 @@
     private Buffer eventQueue
             = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
 
+    private AtomicInteger eventQueueSize = new AtomicInteger();
+
     /**
      * The background notification thread
      */
     private Thread notificationThread;
 
+    private long lastError;
+
     /**
      * Creates a new <code>ObservationDispatcher</code> instance
      * and starts the notification thread daemon.
@@ -137,6 +152,7 @@
         DispatchAction action;
         while ((action = (DispatchAction) eventQueue.remove()) != DISPOSE_MARKER) {
 
+            eventQueueSize.getAndAdd(-action.getEventStates().size());
             log.debug("got EventStateCollection");
             log.debug("event delivery to " + action.getEventConsumers().size() + " consumers
started...");
             for (Iterator<EventConsumer> it = action.getEventConsumers().iterator();
it.hasNext();) {
@@ -202,6 +218,33 @@
             }
         }
         eventQueue.add(new DispatchAction(events, getAsynchronousConsumers()));
+        int size = eventQueueSize.addAndGet(events.size());
+        if (size > MAX_QUEUED_EVENTS) {
+            boolean logWarning = false;
+            long now = System.currentTimeMillis();
+            // log a warning at most every 5 seconds (to avoid filling the log file)
+            if (lastError == 0 || now > lastError + 5000) {
+                logWarning = true;
+                log.warn("More than " + MAX_QUEUED_EVENTS + " events in the queue", new Exception("Stack
Trace"));
+                lastError = now;
+            }
+            if (Thread.currentThread() == notificationThread) {
+                if (logWarning) {
+                    log.warn("Recursive notification?");
+                }
+            } else {
+                if (logWarning) {
+                    log.warn("Waiting");
+                }
+                while (eventQueueSize.get() > MAX_QUEUED_EVENTS) {
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+            }
+        }
     }
 
     /**



Mime
View raw message