jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r592356 - in /jackrabbit/trunk: jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/ jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/hierarchy/ jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/observa...
Date Tue, 06 Nov 2007 10:16:15 GMT
Author: mreutegg
Date: Tue Nov  6 02:16:14 2007
New Revision: 592356

URL: http://svn.apache.org/viewvc?rev=592356&view=rev
Log:
JCR-1204: Redesign SPI observation

Added:
    jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/Subscription.java
  (with props)
Modified:
    jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
    jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/hierarchy/HierarchyEventListener.java
    jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/observation/ObservationManagerImpl.java
    jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/EventFilter.java
    jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/RepositoryService.java
    jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java
    jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java
    jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/SessionInfoImpl.java

Modified: jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java?rev=592356&r1=592355&r2=592356&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
(original)
+++ jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
Tue Nov  6 02:16:14 2007
@@ -83,6 +83,7 @@
 import org.apache.jackrabbit.spi.Event;
 import org.apache.jackrabbit.spi.NameFactory;
 import org.apache.jackrabbit.spi.PathFactory;
+import org.apache.jackrabbit.spi.Subscription;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
@@ -111,6 +112,7 @@
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Collection;
 
 import EDU.oswego.cs.dl.util.concurrent.Sync;
 import EDU.oswego.cs.dl.util.concurrent.Mutex;
@@ -124,6 +126,8 @@
 
     private final RepositoryService service;
     private final SessionInfo sessionInfo;
+    private final NameFactory nameFactory;
+    private final PathFactory pathFactory;
 
     private final ItemStateFactory isf;
     private final HierarchyManager hierarchyManager;
@@ -153,6 +157,11 @@
      */
     private final Set listeners = new HashSet();
 
+    /**
+     * The current subscription for change events if there are listeners.
+     */
+    private Subscription subscription;
+
     public WorkspaceManager(RepositoryService service, SessionInfo sessionInfo,
                             CacheBehaviour cacheBehaviour, int pollTimeout,
                             boolean enableObservation)
@@ -160,6 +169,8 @@
         this.service = service;
         this.sessionInfo = sessionInfo;
         this.cacheBehaviour = cacheBehaviour;
+        this.nameFactory = service.getNameFactory();
+        this.pathFactory = service.getPathFactory();
 
         idFactory = service.getIdFactory();
         nsRegistry = createNamespaceRegistry(NamespaceCache.getInstance(service));
@@ -202,11 +213,11 @@
     }
 
     public NameFactory getNameFactory() throws RepositoryException {
-        return service.getNameFactory();
+        return nameFactory;
     }
 
     public PathFactory getPathFactory() throws RepositoryException {
-        return service.getPathFactory();
+        return pathFactory;
     }
 
     public ItemStateFactory getItemStateFactory() {
@@ -312,21 +323,48 @@
      * local and external changes.
      *
      * @param listener the new listener.
+     * @throws RepositoryException if the listener cannot be registered.
      */
-    public void addEventListener(InternalEventListener listener) {
+    public void addEventListener(InternalEventListener listener)
+            throws RepositoryException {
         synchronized (listeners) {
             listeners.add(listener);
+            EventFilter[] filters = getEventFilters(listeners);
+            if (listeners.size() == 1) {
+                subscription = service.createSubscription(sessionInfo, filters);
+            } else {
+                service.updateEventFilters(subscription, filters);
+            }
             listeners.notify();
         }
     }
 
     /**
+     * Updates the event filters on the subscription. The filters are retrieved
+     * from the current list of internal event listeners.
+     *
+     * @throws RepositoryException if the filters cannot be updated on the subscription.
+     */
+    public void updateEventFilters() throws RepositoryException {
+        synchronized (listeners) {
+            service.updateEventFilters(subscription, getEventFilters(listeners));
+        }
+    }
+
+    /**
      *
      * @param listener
      */
-    public void removeEventListener(InternalEventListener listener) {
+    public void removeEventListener(InternalEventListener listener)
+            throws RepositoryException {
         synchronized (listeners) {
             listeners.remove(listener);
+            if (listeners.isEmpty()) {
+                service.dispose(subscription);
+                subscription = null;
+            } else {
+                service.updateEventFilters(subscription, getEventFilters(listeners));
+            }
         }
     }
 
@@ -352,6 +390,21 @@
         return service.createEventFilter(sessionInfo, eventTypes, path, isDeep, uuids, nodeTypes,
noLocal);
     }
     //--------------------------------------------------------------------------
+
+    /**
+     * Gets the event filters from the passed listener list.
+     *
+     * @param listeners the internal event listeners.
+     */
+    private static EventFilter[] getEventFilters(Collection listeners) {
+        List filters = new ArrayList();
+        for (Iterator it = listeners.iterator(); it.hasNext(); ) {
+            InternalEventListener listener = (InternalEventListener) it.next();
+            filters.addAll(listener.getEventFilters());
+        }
+        return (EventFilter[]) filters.toArray(new EventFilter[filters.size()]);
+    }
+
     /**
      *
      * @return
@@ -504,6 +557,9 @@
                 changeFeed.interrupt();
             }
             hierarchyManager.dispose();
+            if (subscription != null) {
+                service.dispose(subscription);
+            }
             service.dispose(sessionInfo);
         } catch (Exception e) {
             log.warn("Exception while disposing WorkspaceManager: " + e);
@@ -970,23 +1026,19 @@
         public void run() {
             while (!Thread.interrupted()) {
                 try {
-                    // get filters from listeners
-                    List filters = new ArrayList();
                     InternalEventListener[] iel;
+                    Subscription subscr;
                     synchronized (listeners) {
                         while (listeners.isEmpty()) {
                             listeners.wait();
                         }
                         iel = (InternalEventListener[]) listeners.toArray(new InternalEventListener[0]);
+                        subscr = subscription;
                     }
-                    for (int i = 0; i < iel.length; i++) {
-                        filters.addAll(iel[i].getEventFilters());
-                    }
-                    EventFilter[] filtArr = (EventFilter[]) filters.toArray(new EventFilter[filters.size()]);
 
                     log.debug("calling getEvents() (Workspace={})",
                             sessionInfo.getWorkspaceName());
-                    EventBundle[] bundles = service.getEvents(sessionInfo, pollTimeout, filtArr);
+                    EventBundle[] bundles = service.getEvents(subscr, pollTimeout);
                     log.debug("returned from getEvents() (Workspace={})",
                             sessionInfo.getWorkspaceName());
                     // check if thread had been interrupted while

Modified: jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/hierarchy/HierarchyEventListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/hierarchy/HierarchyEventListener.java?rev=592356&r1=592355&r2=592356&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/hierarchy/HierarchyEventListener.java
(original)
+++ jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/hierarchy/HierarchyEventListener.java
Tue Nov  6 02:16:14 2007
@@ -60,7 +60,11 @@
                 // spi does not support observation, or another error occurred.
             }
             this.eventFilter = (filter == null) ? Collections.EMPTY_LIST : Collections.singletonList(filter);
-            wspManager.addEventListener(this);
+            try {
+                wspManager.addEventListener(this);
+            } catch (RepositoryException e) {
+                // spi does not support observation, or another error occurred.
+            }
         } else {
             this.eventFilter = Collections.EMPTY_LIST;
         }

Modified: jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/observation/ObservationManagerImpl.java?rev=592356&r1=592355&r2=592356&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/observation/ObservationManagerImpl.java
(original)
+++ jackrabbit/trunk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/observation/ObservationManagerImpl.java
Tue Nov  6 02:16:14 2007
@@ -75,12 +75,6 @@
     private Map readOnlySubscriptions;
 
     /**
-     * Indicates if this observation manager is registered as an internal event
-     * listener on the workspace manager.
-     */
-    private boolean isRegistered = false;
-
-    /**
      * Creates a new observation manager for <code>session</code>.
      * @param wspManager the WorkspaceManager.
      * @param resolver
@@ -135,9 +129,10 @@
             readOnlySubscriptions = null;
         }
 
-        if (!isRegistered) {
+        if (subscriptions.size() == 1) {
             wspManager.addEventListener(this);
-            isRegistered = true;
+        } else {
+            wspManager.updateEventFilters();
         }
     }
 
@@ -149,6 +144,11 @@
             if (subscriptions.remove(listener) != null) {
                 readOnlySubscriptions = null;
             }
+        }
+        if (subscriptions.size() == 0) {
+            wspManager.removeEventListener(this);
+        } else {
+            wspManager.updateEventFilters();
         }
     }
 

Modified: jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/EventFilter.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/EventFilter.java?rev=592356&r1=592355&r2=592356&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/EventFilter.java
(original)
+++ jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/EventFilter.java
Tue Nov  6 02:16:14 2007
@@ -31,7 +31,7 @@
     /**
      * If an implementation returns <code>true</code> the <code>event</code>
      * will be included in the event bundle returned by {@link
-     * RepositoryService#getEvents(SessionInfo, long, EventFilter[])}. A return
+     * RepositoryService#getEvents(Subscription, long)}. A return
      * value of <code>false</code> indicates that the client is not interested
      * in the <code>event</code>.
      *

Modified: jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/RepositoryService.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/RepositoryService.java?rev=592356&r1=592355&r2=592356&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/RepositoryService.java
(original)
+++ jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/RepositoryService.java
Tue Nov  6 02:16:14 2007
@@ -821,52 +821,100 @@
             throws UnsupportedRepositoryOperationException, RepositoryException;
 
     /**
-     * Retrieves the events that occurred since the last call to this
-     * method. When this method returns without an exception the bundle
-     * identfier in <code>sessionInfo</code> will be updated to reference the
-     * most recent event bundle returned by this call. In case an empty array is
-     * supplied as event filters it may also happen that the bundle identifier
-     * is updated even though no event bundle had been returned.
+     * Creates a new {@link Subscription} for events with an initial set of
+     * {@link EventFilter}s. The returned subscription must provide events from
+     * the time when the subscription was created. If an empty array of filters
+     * is passed no events will be available through the created subscription
+     * unless the filters are later updated by calling
+     * {@link RepositoryService#updateEventFilters(Subscription, EventFilter[])}.
+     *
+     * @param sessionInfo the session info.
+     * @param filters the initial event filters for the subscription.
+     * @return the subscription created for the given session info and filters.
+     * @throws UnsupportedRepositoryOperationException
+     *                             if this SPI implementation does not support
+     *                             observation.
+     * @throws RepositoryException if an error occurs while creating the
+     *                             Subscription.
+     */
+    public Subscription createSubscription(SessionInfo sessionInfo,
+                                           EventFilter[] filters)
+            throws UnsupportedRepositoryOperationException, RepositoryException;
+
+    /**
+     * Updates the event filters on the subscription. When this method returns all
+     * events that go through the passed subscription and have been generated
+     * after this method call must be filtered using the passed
+     * <code>filters</code>.
      * <p/>
      * An implementation is required to accept at least event filter instances
-     * created by {@link #createEventFilter}. Optionally an implementation may
-     * also support event filters instanciated by the client itself. An
-     * implementation may require special deployment in that case, e.g. to make
-     * the event filter implementation class available to the repository
-     * server.<p/>
-     * Note, that an SPI implementation may support observation even if
-     * the corresponding {@link javax.jcr.Repository#OPTION_OBSERVATION_SUPPORTED repository
descriptor}
-     * does return 'false'.
+     * created by {@link RepositoryService#createEventFilter}. Optionally an
+     * implementation may also support event filters instantiated by the client
+     * itself. An implementation may require special deployment in that case,
+     * e.g. to make the event filter implementation class available to the
+     * repository server.
+     * <p/>
+     * <b>Note on thread-safety:</b> it is permissible to call this method
+     * while another thread is blocked in calling {@link
+     * RepositoryService#getEvents(Subscription, long)} using the same
+     * subscription instance as a parameter.
      *
-     * @param sessionInfo the session info.
-     * @param timeout     a timeout in milliseconds to wait at most for an
-     *                    event bundle. If <code>timeout</code> is up
-     *                    and no event occurred meanwhile an empty array is
-     *                    returned.
-     * @param filters     the filters that are applied to the events as
-     *                    they occurred on the repository. An event is included
-     *                    in an event bundle if it is {@link EventFilter#accept(Event, boolean)
-     *                    accept}ed by at least one of the supplied filters. If
-     *                    an empty array is passed none of the potential events
-     *                    are include in an event bundle. This allows a client
-     *                    to skip or ignore events for a certain period of time.
-     *                    If <code>null</code> is passed no filtering is done
-     *                    and all events are included in the event bundle.
-     * @return an array of <code>EventBundle</code>s representing the external
-     *         events that occurred.
+     * @param subscription the subscription where the event filters are
+     *                     applied.
+     * @param filters the filters that are applied to the events as they
+     *                occurred on the repository. An event is included in an
+     *                event bundle if it is {@link EventFilter#accept(Event,
+     *                boolean) accept}ed by at least one of the supplied
+     *                filters. If an empty array is passed none of the potential
+     *                events are included in an event bundle. This allows a
+     *                client to skip or ignore events for a certain period of
+     *                time.
+     * @throws RepositoryException  if an error occurs while updating the event
+     *                              filters.
+     */
+    public void updateEventFilters(Subscription subscription,
+                                   EventFilter[] filters)
+            throws RepositoryException;
+
+    /**
+     * Retrieves the events that occurred since the last call to this method for
+     * the passed subscription.
+     * <p/>
+     * Note, that an SPI implementation may support observation even if the
+     * corresponding {@link javax.jcr.Repository#OPTION_OBSERVATION_SUPPORTED
+     * repository descriptor} does return 'false'.
+     * <p/>
+     * An implementation should un-block a calling thread and let it return if
+     * the associated subscription is disposed by another thread.
+     *
+     * @param subscription a subscription.
+     * @param timeout      a timeout in milliseconds to wait at most for an
+     *                     event bundle. If <code>timeout</code> is up and no
+     *                     event occurred meanwhile an empty array is returned.
+     * @return an array of <code>EventBundle</code>s representing the events
+     *         that occurred.
      * @throws RepositoryException  if an error occurs while retrieving the
-     *                              event bundles or the currently set bundle
-     *                              identifier in <code>sessionInfo</code>
-     *                              references an unknown or outdated event
-     *                              bundle.
-     * @throws UnsupportedRepositoryOperationException If this SPI implementation
-     * does not support observation.
+     *                              event bundles.
      * @throws InterruptedException if the calling thread is interrupted while
-     * waiting for events within the specified <code>timeout</code>.
+     *                              waiting for events within the specified
+     *                              <code>timeout</code>.
      */
-    public EventBundle[] getEvents(SessionInfo sessionInfo, long timeout,
-                                   EventFilter[] filters)
-            throws RepositoryException, UnsupportedRepositoryOperationException, InterruptedException;
+    public EventBundle[] getEvents(Subscription subscription,
+                                   long timeout)
+            throws RepositoryException, InterruptedException;
+
+    /**
+     * Indicates that the passed subscription is no longer needed.
+     * <p/>
+     * <b>Note on thread-safety:</b> it is permissible to call this method
+     * while another thread is blocked in calling {@link
+     * RepositoryService#getEvents(Subscription, long)} using the same
+     * subscription instance as a parameter.
+     *
+     * @throws RepositoryException if an error occurs while the subscription is
+     *                             disposed.
+     */
+    public void dispose(Subscription subscription) throws RepositoryException;
 
     //---------------------------------------------------------< Namespaces >---
     /**

Added: jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/Subscription.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/Subscription.java?rev=592356&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/Subscription.java
(added)
+++ jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/Subscription.java
Tue Nov  6 02:16:14 2007
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.spi;
+
+/**
+ * <code>Subscription</code> defines a marker interface for an event
+ * subscription. An implementation will likely keep information in this object
+ * about the last consumed events and other implementation specific data. A
+ * client will usually first create an event filter and then a subscription
+ * based on the filter. Events can then be retrieved by calling {@link
+ * RepositoryService#getEvents(Subscription, long)}. If a subscription is no
+ * longer needed a client should call {@link RepositoryService#dispose(Subscription)}.
+ */
+public interface Subscription {
+}

Propchange: jackrabbit/trunk/jackrabbit-spi/src/main/java/org/apache/jackrabbit/spi/Subscription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java?rev=592356&r1=592355&r2=592356&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java
(original)
+++ jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java
Tue Nov  6 02:16:14 2007
@@ -24,8 +24,10 @@
 import org.apache.jackrabbit.spi.IdFactory;
 import org.apache.jackrabbit.spi.Name;
 import org.apache.jackrabbit.spi.Path;
+import org.apache.jackrabbit.spi.Subscription;
 import org.apache.jackrabbit.spi.commons.EventImpl;
 import org.apache.jackrabbit.spi.commons.EventBundleImpl;
+import org.apache.jackrabbit.spi.commons.EventFilterImpl;
 import org.apache.jackrabbit.conversion.NameException;
 import org.apache.jackrabbit.conversion.NameResolver;
 import org.apache.jackrabbit.conversion.NamePathResolver;
@@ -33,20 +35,24 @@
 import org.slf4j.Logger;
 
 import javax.jcr.observation.EventListener;
+import javax.jcr.observation.ObservationManager;
 import javax.jcr.Session;
 import javax.jcr.Node;
 import javax.jcr.UnsupportedRepositoryOperationException;
 import javax.jcr.NamespaceException;
+import javax.jcr.RepositoryException;
 import javax.jcr.nodetype.NodeType;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
+import java.util.Arrays;
+import java.util.Collections;
 
 /**
  * <code>EventSubscription</code> listens for JCR events and creates SPI event
  * bundles for them.
  */
-class EventSubscription implements EventListener {
+class EventSubscription implements Subscription, EventListener {
 
     /**
      * Logger instance for this class.
@@ -68,15 +74,85 @@
 
     private final SessionInfoImpl sessionInfo;
 
+    /**
+     * Current list of filters. Copy on write is performed on this list.
+     */
+    private volatile List filters;
+
+    /**
+     * The resolver of the underlying session.
+     */
     private final NamePathResolver resolver;
 
-    EventSubscription(IdFactory idFactory, SessionInfoImpl sessionInfo) {
+    /**
+     * Set to <code>true</code> if this subscription has been disposed.
+     */
+    private volatile boolean disposed = false;
+
+    /**
+     * Creates a new subscription for the passed session.
+     *
+     * @param idFactory   the id factory.
+     * @param sessionInfo the session info.
+     * @param filters     the filters that should be applied to the generated
+     *                    events.
+     * @throws RepositoryException if an error occurs while an event listener is
+     *                             registered with the session.
+     */
+    EventSubscription(IdFactory idFactory,
+                      SessionInfoImpl sessionInfo,
+                      EventFilter[] filters) throws RepositoryException {
         this.idFactory = idFactory;
         this.sessionInfo = sessionInfo;
         this.resolver = sessionInfo.getNamePathResolver();
+        setFilters(filters);
+        ObservationManager obsMgr = sessionInfo.getSession().getWorkspace().getObservationManager();
+        obsMgr.addEventListener(this, EventSubscription.ALL_EVENTS,
+                "/", true, null, null, true);
+    }
+
+    /**
+     * @return the session info associated with this event subscription.
+     */
+    SessionInfoImpl getSessionInfo() {
+        return sessionInfo;
     }
 
     /**
+     * Sets a new list of event filters for this subscription.
+     *
+     * @param filters the new filters.
+     * @throws RepositoryException if the filters array contains an unknown
+     *                             implementation of EventFilters.
+     */
+    void setFilters(EventFilter[] filters) throws RepositoryException {
+        // check type
+        for (int i = 0; i < filters.length; i++) {
+            if (!(filters[i] instanceof EventFilterImpl)) {
+                throw new RepositoryException("Unknown filter implementation");
+            }
+        }
+        List tmp = new ArrayList(Arrays.asList(filters));
+        this.filters = Collections.unmodifiableList(tmp);
+
+    }
+
+    /**
+     * Removes this subscription as a listener from the observation manager and
+     * marks itself as disposed.
+     */
+    void dispose() throws RepositoryException {
+        sessionInfo.removeSubscription(this);
+        sessionInfo.getSession().getWorkspace().getObservationManager().removeEventListener(this);
+        disposed = true;
+        synchronized (eventBundles) {
+            eventBundles.notify();
+        }
+    }
+
+    //--------------------------< EventListener >-------------------------------
+
+    /**
      * Adds the events to the list of pending event bundles.
      *
      * @param events the events that occurred.
@@ -100,10 +176,10 @@
     /**
      * @return all the pending event bundles.
      */
-    EventBundle[] getEventBundles(EventFilter[] filters, long timeout) {
+    EventBundle[] getEventBundles(long timeout) {
         EventBundle[] bundles;
         synchronized (eventBundles) {
-            while (eventBundles.isEmpty()) {
+            if (eventBundles.isEmpty()) {
                 try {
                     eventBundles.wait(timeout);
                 } catch (InterruptedException e) {
@@ -113,17 +189,19 @@
             bundles = (EventBundle[]) eventBundles.toArray(new EventBundle[eventBundles.size()]);
             eventBundles.clear();
         }
+        EventFilter[] eventFilters = (EventFilter[]) filters.toArray(
+                new EventFilter[filters.size()]);
         // apply filters to bundles
         for (int i = 0; i < bundles.length; i++) {
             List filteredEvents = new ArrayList();
             for (Iterator it = bundles[i].getEvents(); it.hasNext(); ) {
                 Event e = (Event) it.next();
                 // TODO: this is actually not correct. if filters are empty no event should
go out
-                if (filters == null || filters.length == 0) {
+                if (eventFilters == null || eventFilters.length == 0) {
                     filteredEvents.add(e);
                 } else {
-                    for (int j = 0; j < filters.length; j++) {
-                        if (filters[j].accept(e, bundles[i].isLocal())) {
+                    for (int j = 0; j < eventFilters.length; j++) {
+                        if (eventFilters[j].accept(e, bundles[i].isLocal())) {
                             filteredEvents.add(e);
                             break;
                         }
@@ -139,6 +217,10 @@
 
     private void createEventBundle(javax.jcr.observation.EventIterator events,
                                    boolean isLocal) {
+        // do not create events when disposed
+        if (disposed) {
+            return;
+        }
         List spiEvents = new ArrayList();
         while (events.hasNext()) {
             try {

Modified: jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java?rev=592356&r1=592355&r2=592356&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java
(original)
+++ jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java
Tue Nov  6 02:16:14 2007
@@ -37,6 +37,7 @@
 import org.apache.jackrabbit.spi.PathFactory;
 import org.apache.jackrabbit.spi.Name;
 import org.apache.jackrabbit.spi.Path;
+import org.apache.jackrabbit.spi.Subscription;
 import org.apache.jackrabbit.spi.commons.EventFilterImpl;
 import org.apache.jackrabbit.name.NameFactoryImpl;
 import org.apache.jackrabbit.name.PathFactoryImpl;
@@ -97,8 +98,8 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Arrays;
-import java.util.IdentityHashMap;
 import java.util.Collections;
+import java.util.Collection;
 import java.io.InputStream;
 import java.io.IOException;
 import java.io.ByteArrayInputStream;
@@ -127,11 +128,6 @@
     private final IdFactoryImpl idFactory = (IdFactoryImpl) IdFactoryImpl.getInstance();
 
     /**
-     * Maps session info instances to {@link EventSubscription}s.
-     */
-    private final Map subscriptions = Collections.synchronizedMap(new IdentityHashMap());
-
-    /**
      * Set to <code>true</code> if the underlying JCR repository supports
      * observation.
      */
@@ -227,10 +223,12 @@
      * {@inheritDoc}
      */
     public void dispose(SessionInfo sessionInfo) throws RepositoryException {
-        synchronized (sessionInfo) {
-            subscriptions.remove(sessionInfo);
-            getSessionInfoImpl(sessionInfo).getSession().logout();
+        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        for (Iterator it = sInfo.getSubscriptions().iterator(); it.hasNext(); ) {
+            EventSubscription s = (EventSubscription) it.next();
+            s.dispose();
         }
+        sInfo.getSession().logout();
     }
 
     /**
@@ -908,9 +906,6 @@
                                          Name[] nodeTypeName,
                                          boolean noLocal)
             throws UnsupportedRepositoryOperationException, RepositoryException {
-        // make sure there is an event subscription for this session info
-        getSubscription(sessionInfo);
-
         Set ntNames = null;
         if (nodeTypeName != null) {
             ntNames = new HashSet(Arrays.asList(nodeTypeName));
@@ -921,11 +916,39 @@
     /**
      * {@inheritDoc}
      */
-    public EventBundle[] getEvents(SessionInfo sessionInfo,
-                                   long timeout,
-                                   EventFilter[] filters)
+    public Subscription createSubscription(SessionInfo sessionInfo,
+                                           EventFilter[] filters)
+            throws UnsupportedRepositoryOperationException, RepositoryException {
+        return getSessionInfoImpl(sessionInfo).createSubscription(idFactory, filters);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public EventBundle[] getEvents(Subscription subscription, long timeout)
             throws RepositoryException, UnsupportedRepositoryOperationException, InterruptedException {
-        return getSubscription(sessionInfo).getEventBundles(filters, timeout);
+        if (subscription instanceof EventSubscription) {
+            return ((EventSubscription) subscription).getEventBundles(timeout);
+        } else {
+            throw new RepositoryException("Unknown subscription implementation: "
+                    + subscription.getClass().getName());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void updateEventFilters(Subscription subscription,
+                                   EventFilter[] filters)
+            throws RepositoryException {
+        getEventSubscription(subscription).setFilters(filters);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void dispose(Subscription subscription) throws RepositoryException {
+        getEventSubscription(subscription).dispose();
     }
 
     /**
@@ -1028,25 +1051,6 @@
 
     //----------------------------< internal >----------------------------------
 
-    private EventSubscription getSubscription(SessionInfo sessionInfo)
-            throws RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        EventSubscription subscr;
-        synchronized (sInfo) {
-            subscr = (EventSubscription) subscriptions.get(sInfo);
-            if (subscr == null) {
-                subscr = new EventSubscription(idFactory, sInfo);
-                if (sInfo.getSession().isLive()) {
-                    ObservationManager obsMgr = sInfo.getSession().getWorkspace().getObservationManager();
-                    obsMgr.addEventListener(subscr, EventSubscription.ALL_EVENTS,
-                            "/", true, null, null, true);
-                }
-                subscriptions.put(sInfo, subscr);
-            }
-        }
-        return subscr;
-    }
-
     private final class BatchImpl implements Batch {
 
         private final SessionInfoImpl sInfo;
@@ -1351,6 +1355,16 @@
         }
     }
 
+    private EventSubscription getEventSubscription(Subscription subscription)
+            throws RepositoryException {
+        if (subscription instanceof EventSubscription) {
+            return (EventSubscription) subscription;
+        } else {
+            throw new RepositoryException("Unknown Subscription implementation: "
+                    + subscription.getClass().getName());
+        }
+    }
+
     private String getDestinationPath(NodeId destParentNodeId, Name destName, SessionInfoImpl sessionInfo) throws RepositoryException {
         StringBuffer destPath = new StringBuffer(pathForId(destParentNodeId, sessionInfo));
         if (destPath.length() > 1) {
@@ -1464,16 +1478,27 @@
             throws RepositoryException {
         if (supportsObservation) {
             // register local event listener
-            EventSubscription subscr = (EventSubscription) subscriptions.get(sInfo);
-            if (subscr != null) {
+            Collection subscr = sInfo.getSubscriptions();
+            if (subscr.size() != 0) {
                 ObservationManager obsMgr = sInfo.getSession().getWorkspace().getObservationManager();
-                EventListener listener = subscr.getLocalEventListener();
-                obsMgr.addEventListener(listener, EventSubscription.ALL_EVENTS,
-                        "/", true, null, null, false);
+                List listeners = new ArrayList(subscr.size());
                 try {
+                    for (Iterator it = subscr.iterator(); it.hasNext(); ) {
+                        EventSubscription s = (EventSubscription) it.next();
+                        EventListener listener = s.getLocalEventListener();
+                        listeners.add(listener);
+                        obsMgr.addEventListener(listener, EventSubscription.ALL_EVENTS,
+                                "/", true, null, null, false);
+                    }
                     return call.run();
                 } finally {
-                    obsMgr.removeEventListener(listener);
+                    for (Iterator it = listeners.iterator(); it.hasNext(); ) {
+                        try {
+                            obsMgr.removeEventListener((EventListener) it.next());
+                        } catch (RepositoryException e) {
+                            // ignore and remove next
+                        }
+                    }
                 }
             }
         }

Modified: jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/SessionInfoImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/SessionInfoImpl.java?rev=592356&r1=592355&r2=592356&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/SessionInfoImpl.java (original)
+++ jackrabbit/trunk/jackrabbit-spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/SessionInfoImpl.java Tue Nov  6 02:16:14 2007
@@ -19,6 +19,9 @@
 import org.apache.jackrabbit.spi.SessionInfo;
 import org.apache.jackrabbit.spi.NameFactory;
 import org.apache.jackrabbit.spi.PathFactory;
+import org.apache.jackrabbit.spi.Subscription;
+import org.apache.jackrabbit.spi.EventFilter;
+import org.apache.jackrabbit.spi.IdFactory;
 import org.apache.jackrabbit.namespace.NamespaceResolver;
 import org.apache.jackrabbit.conversion.NamePathResolver;
 import org.apache.jackrabbit.conversion.ParsingNameResolver;
@@ -37,6 +40,10 @@
 import java.io.ByteArrayInputStream;
 import java.io.ObjectOutputStream;
 import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * <code>SessionInfoImpl</code> implements a session info based on a JCR
@@ -60,6 +67,16 @@
     private Credentials credentials;
 
     /**
+     * The subscriptions that are currently in place for this session info.
+     */
+    private List subscriptions = Collections.EMPTY_LIST;
+
+    /**
+     * Monitor object for subscription changes.
+     */
+    private Object subscriptionChange = new Object();
+
+    /**
      * Creates a new session info based on the given <code>session</code>.
      *
      * @param session     the JCR session.
@@ -123,6 +140,44 @@
     Credentials getCredentials() throws RepositoryException {
         // return a duplicate
         return duplicateCredentials(credentials);
+    }
+
+    Collection getSubscriptions() {
+        synchronized (subscriptionChange) {
+            return subscriptions;
+        }
+    }
+
+    /**
+     * Creates a subscription for this session info.
+     *
+     * @param idFactory the id factory.
+     * @param filters the initial list of filters.
+     * @return a subscription.
+     * @throws RepositoryException
+     */
+    Subscription createSubscription(IdFactory idFactory, EventFilter[] filters)
+            throws RepositoryException {
+        synchronized (subscriptionChange) {
+            List tmp = new ArrayList(subscriptions);
+            EventSubscription s = new EventSubscription(idFactory, this, filters);
+            tmp.add(s);
+            subscriptions = Collections.unmodifiableList(tmp);
+            return s;
+        }
+    }
+
+    /**
+     * Removes the subscription from this session info if it exists.
+     *
+     * @param subscription the subscription to remove.
+     */
+    void removeSubscription(Subscription subscription) {
+        synchronized (subscriptionChange) {
+            List tmp = new ArrayList(subscriptions);
+            tmp.remove(subscription);
+            subscriptions = Collections.unmodifiableList(tmp);
+        }
     }
 
     /**



Mime
View raw message