jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdue...@apache.org
Subject svn commit: r1374293 - in /jackrabbit/oak/trunk/oak-jcr/src: main/java/org/apache/jackrabbit/oak/jcr/observation/ test/java/org/apache/jackrabbit/oak/jcr/
Date Fri, 17 Aug 2012 14:40:49 GMT
Author: mduerig
Date: Fri Aug 17 14:40:48 2012
New Revision: 1374293

URL: http://svn.apache.org/viewvc?rev=1374293&view=rev
Log:
OAK-252  Stop sending observation events on shutdown

Modified:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1374293&r1=1374292&r2=1374293&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
(original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Fri Aug 17 14:40:48 2012
@@ -19,11 +19,17 @@ package org.apache.jackrabbit.oak.jcr.ob
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jcr.observation.Event;
 import javax.jcr.observation.EventListener;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
 import org.apache.jackrabbit.commons.iterator.EventIteratorAdapter;
 import org.apache.jackrabbit.oak.api.ChangeExtractor;
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -34,16 +40,16 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-
 class ChangeProcessor implements Runnable {
-
     private final ObservationManagerImpl observationManager;
     private final NamePathMapper namePathMapper;
     private final ChangeExtractor changeExtractor;
     private final EventListener listener;
     private final AtomicReference<ChangeFilter> filterRef;
+    private final CountDownLatch stopped = new CountDownLatch(1);
+    private volatile boolean running;
+    private volatile boolean stopping;
+    private ScheduledFuture<?> future;
 
     public ChangeProcessor(ObservationManagerImpl observationManager, EventListener listener,
ChangeFilter filter) {
         this.observationManager = observationManager;
@@ -57,12 +63,46 @@ class ChangeProcessor implements Runnabl
         filterRef.set(filter);
     }
 
+    /**
+     * Stop this change processor if running. After returning from this methods no further
+     * events will be delivered. This method has no effect if the change processor is not
running.
+     * A change processor is running while execution is inside its {@code run()} method.
+     */
+    public synchronized void stop() {
+        if (running) {
+            try {
+                stopping = true;
+                future.cancel(true);
+                stopped.await();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Start the change processor on the passed {@code executor}.
+     * @param executor
+     * @throws IllegalStateException if started already
+     */
+    public synchronized void start(ScheduledExecutorService executor) {
+        if (future != null) {
+            throw new IllegalStateException("Change processor started already");
+        }
+        future = executor.scheduleWithFixedDelay(this, 100, 1000, TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public void run() {
+        running = true;
         EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff();
         changeExtractor.getChanges(diff);
-        observationManager.setHasEvents();
-        diff.sendEvents();
+        if (!stopping) {
+            diff.sendEvents();
+        }
+        stopped.countDown();
+        running = false;
     }
 
     //------------------------------------------------------------< private >---
@@ -89,7 +129,13 @@ class ChangeProcessor implements Runnabl
         public void sendEvents() {
             Iterator<Event> eventIt = Iterators.concat(events.iterator());
             if (eventIt.hasNext()) {
-                listener.onEvent(new EventIteratorAdapter(eventIt));
+                observationManager.setHasEvents();
+                listener.onEvent(new EventIteratorAdapter(eventIt) {
+                    @Override
+                    public boolean hasNext() {
+                        return !stopping && super.hasNext();
+                    }
+                });
                 events = new ArrayList<Iterator<Event>>(PURGE_LIMIT);
             }
         }
@@ -100,7 +146,7 @@ class ChangeProcessor implements Runnabl
 
         @Override
         public void propertyAdded(PropertyState after) {
-            if (filterRef.get().include(Event.PROPERTY_ADDED, jcrPath(), associatedParentNode))
{
+            if (!stopping && filterRef.get().include(Event.PROPERTY_ADDED, jcrPath(),
associatedParentNode)) {
                 Event event = generatePropertyEvent(Event.PROPERTY_ADDED, path, after);
                 events.add(Iterators.singletonIterator(event));
             }
@@ -108,7 +154,7 @@ class ChangeProcessor implements Runnabl
 
         @Override
         public void propertyChanged(PropertyState before, PropertyState after) {
-            if (filterRef.get().include(Event.PROPERTY_CHANGED, jcrPath(), associatedParentNode))
{
+            if (!stopping && filterRef.get().include(Event.PROPERTY_CHANGED, jcrPath(),
associatedParentNode)) {
                 Event event = generatePropertyEvent(Event.PROPERTY_CHANGED, path, after);
                 events.add(Iterators.singletonIterator(event));
             }
@@ -116,7 +162,7 @@ class ChangeProcessor implements Runnabl
 
         @Override
         public void propertyDeleted(PropertyState before) {
-            if (filterRef.get().include(Event.PROPERTY_REMOVED, jcrPath(), associatedParentNode))
{
+            if (!stopping && filterRef.get().include(Event.PROPERTY_REMOVED, jcrPath(),
associatedParentNode)) {
                 Event event = generatePropertyEvent(Event.PROPERTY_REMOVED, path, before);
                 events.add(Iterators.singletonIterator(event));
             }
@@ -127,7 +173,7 @@ class ChangeProcessor implements Runnabl
             if (NodeStateUtils.isHidden(name)) {
                 return;
             }
-            if (filterRef.get().includeChildren(jcrPath())) {
+            if (!stopping && filterRef.get().includeChildren(jcrPath())) {
                 Iterator<Event> events = generateNodeEvents(Event.NODE_ADDED, path,
name, after);
                 this.events.add(events);
                 if (++childNodeCount > PURGE_LIMIT) {
@@ -141,7 +187,7 @@ class ChangeProcessor implements Runnabl
             if (NodeStateUtils.isHidden(name)) {
                 return;
             }
-            if (filterRef.get().includeChildren(jcrPath())) {
+            if (!stopping && filterRef.get().includeChildren(jcrPath())) {
                 Iterator<Event> events = generateNodeEvents(Event.NODE_REMOVED, path,
name, before);
                 this.events.add(events);
             }
@@ -152,7 +198,7 @@ class ChangeProcessor implements Runnabl
             if (NodeStateUtils.isHidden(name)) {
                 return;
             }
-            if (filterRef.get().includeChildren(jcrPath())) {
+            if (!stopping && filterRef.get().includeChildren(jcrPath())) {
                 EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(
                         PathUtils.concat(path, name), events, after);
                 after.compareAgainstBaseState(before, diff);

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1374293&r1=1374292&r2=1374293&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
(original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
Fri Aug 17 14:40:48 2012
@@ -19,8 +19,6 @@ package org.apache.jackrabbit.oak.jcr.ob
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jcr.RepositoryException;
@@ -41,31 +39,21 @@ public class ObservationManagerImpl impl
 
     private final ScheduledExecutorService executor;
 
-    private ScheduledFuture<?> future = null;
-
     private final Map<EventListener, ChangeProcessor> processors =
             new HashMap<EventListener, ChangeProcessor>();
 
     private final AtomicBoolean hasEvents = new AtomicBoolean(false);
 
-    public ObservationManagerImpl(
-            SessionDelegate sessionDelegate,
-            ScheduledExecutorService executor) {
+    public ObservationManagerImpl(SessionDelegate sessionDelegate, ScheduledExecutorService
executor) {
         this.sessionDelegate = sessionDelegate;
         this.executor = executor;
     }
 
-    private synchronized void sendEvents() {
-        for (ChangeProcessor processor : processors.values()) {
-            processor.run();
-        }
-    }
-
     public synchronized void dispose() {
-        if (future != null) {
-            future.cancel(false);
-            future = null;
+        for (ChangeProcessor processor : processors.values()) {
+            processor.stop();
         }
+        processors.clear();
     }
 
     /**
@@ -78,25 +66,14 @@ public class ObservationManagerImpl impl
     }
 
     @Override
-    public synchronized void addEventListener(
-            EventListener listener, int eventTypes, String absPath,
-            boolean isDeep, String[] uuid, String[] nodeTypeName,
-            boolean noLocal) throws RepositoryException {
-        if (future == null) {
-            future = executor.scheduleWithFixedDelay(new Runnable() {
-                @Override
-                public void run() {
-                    sendEvents();
-                }
-            }, 100, 1000, TimeUnit.MILLISECONDS);
-        }
-
-        ChangeFilter filter = new ChangeFilter(
-                eventTypes, absPath, isDeep, uuid, nodeTypeName, noLocal);
+    public synchronized void addEventListener(EventListener listener, int eventTypes, String
absPath,
+            boolean isDeep, String[] uuid, String[] nodeTypeName, boolean noLocal) throws
RepositoryException {
+        ChangeFilter filter = new ChangeFilter(eventTypes, absPath, isDeep, uuid, nodeTypeName,
noLocal);
         ChangeProcessor processor = processors.get(listener);
         if (processor == null) {
             processor = new ChangeProcessor(this, listener, filter);
             processors.put(listener, processor);
+            processor.start(executor);
         } else {
             processor.setFilter(filter);
         }
@@ -104,11 +81,10 @@ public class ObservationManagerImpl impl
 
     @Override
     public synchronized void removeEventListener(EventListener listener) {
-        processors.remove(listener);
+        ChangeProcessor processor = processors.remove(listener);
 
-        if (processors.isEmpty()) {
-            future.cancel(false);
-            future = null;
+        if (processor != null) {
+            processor.stop();
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java?rev=1374293&r1=1374292&r2=1374293&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java
(original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java
Fri Aug 17 14:40:48 2012
@@ -31,7 +31,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -62,13 +61,12 @@ import javax.jcr.observation.EventIterat
 import javax.jcr.observation.EventListener;
 import javax.jcr.observation.ObservationManager;
 
+import com.google.common.collect.Sets;
 import org.apache.jackrabbit.JcrConstants;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
-
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1596,35 +1594,61 @@ public class RepositoryTest extends Abst
     }
 
     @Test
-    public void observationDispose() throws RepositoryException, ExecutionException, TimeoutException,
-                InterruptedException {
+    public void observationDispose() throws RepositoryException, InterruptedException, ExecutionException,
+            TimeoutException {
 
-        final List<Event> failedEvents = new ArrayList<Event>();
-
-        final ObservationManager obsMgr = getAdminSession().getWorkspace().getObservationManager();
-        final EventListener listener = new EventListener() {
-            @Override
-            public void onEvent(EventIterator events) {
-                while (events.hasNext()) {
-                    failedEvents.add(events.nextEvent());
+        final AtomicReference<CountDownLatch> hasEvents = new AtomicReference<CountDownLatch>(new
CountDownLatch(1));
+        final AtomicReference<CountDownLatch> waitForRemove = new AtomicReference<CountDownLatch>(new
CountDownLatch(1));
+        final Session observingSession = createAdminSession();
+        try {
+            final ObservationManager obsMgr = observingSession.getWorkspace().getObservationManager();
+            final EventListener listener = new EventListener() {
+                @Override
+                public void onEvent(EventIterator events) {
+                    while (events.hasNext()) {
+                        events.next();
+                        hasEvents.get().countDown();
+                        try {
+                            // After receiving an event wait until event listener is removed
+                            waitForRemove.get().await();
+                        }
+                        catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
                 }
-            }
-        };
-        obsMgr.addEventListener(listener, Event.NODE_ADDED | Event.NODE_REMOVED | Event.NODE_MOVED
|
+            };
+
+            obsMgr.addEventListener(listener, Event.NODE_ADDED | Event.NODE_REMOVED | Event.NODE_MOVED
|
                 Event.PROPERTY_ADDED | Event.PROPERTY_REMOVED | Event.PROPERTY_CHANGED |
Event.PERSIST,
                 "/", true, null, null, false);
 
-        FutureTask<Void> disposer = new FutureTask<Void>(new Callable<Void>()
{
-            @Override
-            public Void call() throws Exception {
-                obsMgr.removeEventListener(listener);
-                return null;
-            }
-        });
+            // Generate two events
+            Node n = getNode(TEST_PATH);
+            n.setProperty("prop1", "val1");
+            n.setProperty("prop2", "val2");
+            n.getSession().save();
+
+            // Make sure we see the first event
+            assertTrue(hasEvents.get().await(2, TimeUnit.SECONDS));
 
-        Executors.newSingleThreadExecutor().execute(disposer);
-        disposer.get(2, TimeUnit.SECONDS);
-        assertTrue(failedEvents.isEmpty());
+            // Remove event listener before it receives the second event
+            Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    obsMgr.removeEventListener(listener);
+                    return null;
+                }
+            }).get(2, TimeUnit.SECONDS);
+            hasEvents.set(new CountDownLatch(1));
+            waitForRemove.get().countDown();
+
+            // Make sure we don't see the second event
+            assertFalse(hasEvents.get().await(2, TimeUnit.SECONDS));
+        }
+        finally {
+            observingSession.logout();
+        }
     }
 
     @Test



Mime
View raw message