jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r512981 - in /jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr: EventBundleImpl.java EventFilterImpl.java EventImpl.java EventSubscription.java RepositoryServiceImpl.java
Date Wed, 28 Feb 2007 21:29:37 GMT
Author: mreutegg
Date: Wed Feb 28 13:29:36 2007
New Revision: 512981

URL: http://svn.apache.org/viewvc?view=rev&rev=512981
Log:
- Observation support in spi2jcr

Added:
    jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventBundleImpl.java   (with props)
    jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventFilterImpl.java   (with props)
    jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventImpl.java   (with props)
    jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java   (with props)
Modified:
    jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java

Added: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventBundleImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventBundleImpl.java?view=auto&rev=512981
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventBundleImpl.java (added)
+++ jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventBundleImpl.java Wed Feb 28 13:29:36 2007
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.spi2jcr;
+
+import org.apache.jackrabbit.spi.EventBundle;
+import org.apache.jackrabbit.spi.EventIterator;
+
+import java.util.Collection;
+
+/**
+ * <code>EventBundleImpl</code> implements an {@link EventBundle} on top of a plain collection of events.
+ */
+class EventBundleImpl implements EventBundle {
+
+    /**
+     * Indicates if this bundle was created due to a local change.
+     */
+    private final boolean isLocal;
+
+    /**
+     * The bundle id.
+     */
+    private final String bundleId;
+
+    /**
+     * The events in this bundle.
+     */
+    private final Collection events;
+
+    /**
+     * Creates a new event bundle with <code>events</code>.
+     *
+     * @param events   the events for this bundle (stored by reference, not copied).
+     * @param isLocal  if these events were created due to a local change.
+     * @param bundleId the bundle id.
+     */
+    EventBundleImpl(Collection events, boolean isLocal, String bundleId) {
+        this.events = events;
+        this.isLocal = isLocal;
+        this.bundleId = bundleId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public EventIterator getEvents() {
+        return new IteratorHelper(events); // a fresh iterator wrapper is created on every call
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String getBundleId() {
+        return bundleId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isLocal() {
+        return isLocal;
+    }
+}

Propchange: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventBundleImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventFilterImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventFilterImpl.java?view=auto&rev=512981
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventFilterImpl.java (added)
+++ jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventFilterImpl.java Wed Feb 28 13:29:36 2007
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.spi2jcr;
+
+import org.apache.jackrabbit.spi.EventFilter;
+import org.apache.jackrabbit.spi.Event;
+import org.apache.jackrabbit.spi.NodeId;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.MalformedPathException;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jcr.PathNotFoundException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.io.Serializable;
+
+/**
+ * <code>EventFilterImpl</code> is the spi2jcr implementation of an
+ * {@link EventFilter}.
+ * TODO: copied from spi2dav, move to spi-commons?
+ */
+class EventFilterImpl implements EventFilter, Serializable {
+
+    /**
+     * The logger instance for this class.
+     */
+    private static final Logger log = LoggerFactory.getLogger(EventFilterImpl.class);
+
+    private final int eventTypes; // bit mask of accepted event types
+
+    private final boolean isDeep;
+
+    private final Path absPath;
+
+    private final Set uuids; // null if the filter does not restrict by uuid
+
+    private final Set nodeTypeNames; // null if the filter does not restrict by node type
+
+    private final boolean noLocal;
+
+    /**
+     * Creates a new <code>EventFilterImpl</code>.
+     *
+     * @param eventTypes    the event types this filter is interested in.
+     * @param absPath       events at this path are accepted; events below it only if <code>isDeep</code> is set.
+     * @param isDeep        whether this filter is applied deep.
+     * @param uuids         the jcr:uuid of the parent nodes this filter allows; <code>null</code> disables the check.
+     * @param nodeTypeNames the QNames of the already resolved node types this
+     *                      filter allows; copied, <code>null</code> disables the check.
+     * @param noLocal       if <code>true</code>, events from local bundles are rejected.
+     */
+    EventFilterImpl(int eventTypes,
+                    Path absPath,
+                    boolean isDeep,
+                    String[] uuids,
+                    Set nodeTypeNames,
+                    boolean noLocal) {
+        this.eventTypes = eventTypes;
+        this.absPath = absPath;
+        this.isDeep = isDeep;
+        this.uuids = uuids != null ? new HashSet(Arrays.asList(uuids)) : null;
+        this.nodeTypeNames = nodeTypeNames != null ? new HashSet(nodeTypeNames) : null;
+        this.noLocal = noLocal;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean accept(Event event, boolean isLocal) {
+        int type = event.getType();
+        // check type
+        if ((type & eventTypes) == 0) {
+            return false;
+        }
+
+        // check local flag
+        if (isLocal && noLocal) {
+            return false;
+        }
+
+        // check UUIDs: only a parent id without a path part can match, any other id is rejected
+        NodeId parentId = event.getParentId();
+        if (uuids != null) {
+            if (parentId.getPath() == null) {
+                if (!uuids.contains(parentId.getUniqueID())) {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        }
+
+        // check node types
+        if (nodeTypeNames != null) {
+            Set eventTypes = new HashSet(); // node type names of the event; shadows the int field eventTypes
+            eventTypes.addAll(Arrays.asList(event.getMixinTypeNames()));
+            eventTypes.add(event.getPrimaryNodeTypeName());
+            // create intersection
+            eventTypes.retainAll(nodeTypeNames);
+            if (eventTypes.isEmpty()) {
+                return false;
+            }
+        }
+
+        // finally check path
+        try {
+            // the relevant path for the path filter depends on the event type
+            // for node events, the relevant path is the one returned by
+            // Event.getQPath().
+            // for property events, the relevant path is the path of the
+            // node the property belongs to.
+            Path eventPath;
+            if (type == Event.NODE_ADDED || type == Event.NODE_REMOVED) {
+                eventPath = event.getQPath();
+            } else {
+                eventPath = event.getQPath().getAncestor(1);
+            }
+
+            boolean match = eventPath.equals(absPath);
+            if (!match && isDeep) {
+                match = eventPath.isDescendantOf(absPath);
+            }
+            return match;
+        } catch (MalformedPathException e) {
+            // should never get here
+            log.warn("malformed path: " + e);
+            log.debug("Exception: ", e);
+        } catch (PathNotFoundException e) {
+            // should never get here
+            log.warn("invalid property path: " + e);
+            log.debug("Exception: ", e);
+        }
+        // if we get here an exception occurred while checking for the path
+        return false;
+    }
+
+    /**
+     * @return the event types this event filter accepts.
+     */
+    public int getEventTypes() {
+        return eventTypes;
+    }
+
+    /**
+     * @return <code>true</code> if this event filter is deep.
+     */
+    public boolean isDeep() {
+        return isDeep;
+    }
+
+    /**
+     * @return the path to the item where events are filtered.
+     */
+    public Path getAbsPath() {
+        return absPath;
+    }
+
+    /**
+     * @return the uuids of the nodes of this filter or <code>null</code> if
+     *         this filter does not care about uuids.
+     */
+    public String[] getUUIDs() {
+        if (uuids == null) {
+            return null;
+        } else {
+            return (String[]) uuids.toArray(new String[uuids.size()]);
+        }
+    }
+
+    /**
+     * @return an unmodifiable set of node type names or <code>null</code> if
+     *         this filter does not care about node types.
+     */
+    public Set getNodeTypeNames() {
+        if (nodeTypeNames == null) {
+            return null;
+        } else {
+            return Collections.unmodifiableSet(nodeTypeNames);
+        }
+    }
+
+    /**
+     * @return <code>true</code> if this filter rejects local events.
+     */
+    public boolean getNoLocal() {
+        return noLocal;
+    }
+}

Propchange: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventFilterImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventImpl.java?view=auto&rev=512981
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventImpl.java (added)
+++ jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventImpl.java Wed Feb 28 13:29:36 2007
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.spi2jcr;
+
+import org.apache.jackrabbit.spi.Event;
+import org.apache.jackrabbit.spi.ItemId;
+import org.apache.jackrabbit.spi.NodeId;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.QName;
+
+import java.io.Serializable;
+
+/**
+ * <code>EventImpl</code> implements a serializable SPI
+ * {@link org.apache.jackrabbit.spi.Event}.
+ * TODO: copied from spi-rmi, move to spi-commons?
+ */
+class EventImpl implements Event, Serializable {
+
+    /**
+     * The SPI event type.
+     * @see Event
+     */
+    private final int type;
+
+    /**
+     * The path of the affected item.
+     */
+    private final Path path;
+
+    /**
+     * The id of the affected item.
+     */
+    private final ItemId itemId;
+
+    /**
+     * The id of the parent node of the affected item.
+     */
+    private final NodeId parentId;
+
+    /**
+     * The name of the primary node type of the 'associated' node of this event.
+     */
+    private final QName primaryNodeTypeName;
+
+    /**
+     * The names of the mixin types of the 'associated' node of this event.
+     */
+    private final QName[] mixinTypeNames;
+
+    /**
+     * The user ID connected with this event.
+     */
+    private final String userId;
+
+    /**
+     * Creates a new serializable event; <code>mixinTypeNames</code> must not be <code>null</code> and is not copied.
+     */
+    public EventImpl(int type, Path path, ItemId itemId, NodeId parentId,
+                     QName primaryNodeTypeName, QName[] mixinTypeNames,
+                     String userId) {
+        this.type = type;
+        this.path = path;
+        this.itemId = itemId;
+        this.parentId = parentId;
+        this.primaryNodeTypeName = primaryNodeTypeName;
+        this.mixinTypeNames = mixinTypeNames;
+        this.userId = userId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public int getType() {
+        return type;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Path getQPath() {
+        return path;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public ItemId getItemId() {
+        return itemId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public NodeId getParentId() {
+        return parentId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public QName getPrimaryNodeTypeName() {
+        return primaryNodeTypeName;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public QName[] getMixinTypeNames() {
+        QName[] mixins = new QName[mixinTypeNames.length]; // hand out a copy so callers cannot modify the internal array
+        System.arraycopy(mixinTypeNames, 0, mixins, 0, mixinTypeNames.length);
+        return mixins;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String getUserID() {
+        return userId;
+    }
+}

Propchange: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java?view=auto&rev=512981
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java (added)
+++ jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java Wed Feb 28 13:29:36 2007
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.spi2jcr;
+
+import org.apache.jackrabbit.spi.EventBundle;
+import org.apache.jackrabbit.spi.EventFilter;
+import org.apache.jackrabbit.spi.EventIterator;
+import org.apache.jackrabbit.spi.Event;
+import org.apache.jackrabbit.spi.ItemId;
+import org.apache.jackrabbit.spi.NodeId;
+import org.apache.jackrabbit.uuid.UUID;
+import org.apache.jackrabbit.name.NamespaceResolver;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.PathFormat;
+import org.apache.jackrabbit.name.QName;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jcr.observation.EventListener;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * <code>EventSubscription</code> listens for JCR events and creates SPI event
+ * bundles for them.
+ */
+class EventSubscription implements EventListener {
+
+    /**
+     * Logger instance for this class.
+     */
+    private static final Logger log = LoggerFactory.getLogger(EventSubscription.class);
+
+    /**
+     * Mask for all events.
+     */
+    static final int ALL_EVENTS = javax.jcr.observation.Event.NODE_ADDED
+            | javax.jcr.observation.Event.NODE_REMOVED
+            | javax.jcr.observation.Event.PROPERTY_ADDED
+            | javax.jcr.observation.Event.PROPERTY_CHANGED
+            | javax.jcr.observation.Event.PROPERTY_REMOVED;
+
+    private final List eventBundles = new ArrayList();
+
+    private final IdFactoryImpl idFactory;
+
+    private final NamespaceResolver nsResolver;
+
+    EventSubscription(IdFactoryImpl idFactory, NamespaceResolver nsResolver) {
+        this.idFactory = idFactory;
+        this.nsResolver = nsResolver;
+    }
+
+    /**
+     * Adds the events to the list of pending event bundles.
+     *
+     * @param events the events that occurred.
+     */
+    public void onEvent(javax.jcr.observation.EventIterator events) {
+        createEventBundle(events, false);
+    }
+
+    /**
+     * @return a temporary event listener that will create local event bundles
+     *         for delivered events.
+     */
+    EventListener getLocalEventListener() {
+        return new EventListener() {
+            public void onEvent(javax.jcr.observation.EventIterator events) {
+                createEventBundle(events, true);
+            }
+        };
+    }
+
+    /**
+     * @return all the pending event bundles.
+     */
+    EventBundle[] getEventBundles(EventFilter[] filters, long timeout) {
+        EventBundle[] bundles;
+        synchronized (eventBundles) {
+            while (eventBundles.isEmpty()) {
+                try {
+                    eventBundles.wait(timeout);
+                } catch (InterruptedException e) {
+                    // continue
+                }
+            }
+            bundles = (EventBundle[]) eventBundles.toArray(new EventBundle[eventBundles.size()]);
+            eventBundles.clear();
+        }
+        // apply filters to bundles
+        for (int i = 0; i < bundles.length; i++) {
+            List filteredEvents = new ArrayList();
+            for (EventIterator it = bundles[i].getEvents(); it.hasNext(); ) {
+                Event e = it.nextEvent();
+                // TODO: this is actually not correct. if filters are empty no event should go out
+                if (filters == null || filters.length == 0) {
+                    filteredEvents.add(e);
+                } else {
+                    for (int j = 0; j < filters.length; j++) {
+                        if (filters[j].accept(e, bundles[i].isLocal())) {
+                            filteredEvents.add(e);
+                            break;
+                        }
+                    }
+                }
+            }
+            bundles[i] = new EventBundleImpl(filteredEvents,
+                    bundles[i].isLocal(), bundles[i].getBundleId());
+        }
+        return bundles;
+    }
+
+    //--------------------------------< internal >------------------------------
+
+    private void createEventBundle(javax.jcr.observation.EventIterator events,
+                                   boolean isLocal) {
+        List spiEvents = new ArrayList();
+        while (events.hasNext()) {
+            try {
+                javax.jcr.observation.Event e = events.nextEvent();
+                Path p = PathFormat.parse(e.getPath(), nsResolver);
+                Path parent = p.getAncestor(1);
+                NodeId parentId = idFactory.createNodeId((String) null, parent);
+                ItemId itemId = null;
+                switch (e.getType()) {
+                    case Event.NODE_ADDED:
+                    case Event.NODE_REMOVED:
+                        itemId = idFactory.createNodeId((String) null, p);
+                        break;
+                    case Event.PROPERTY_ADDED:
+                    case Event.PROPERTY_CHANGED:
+                    case Event.PROPERTY_REMOVED:
+                        itemId = idFactory.createPropertyId(parentId,
+                                p.getNameElement().getName());
+                        break;
+                }
+                Event spiEvent = new EventImpl(e.getType(), p, itemId, parentId,
+                        null, new QName[0], e.getUserID());
+                spiEvents.add(spiEvent);
+            } catch (Exception ex) {
+                log.warn("Unable to create SPI Event: " + ex);
+            }
+        }
+        String bundleId = UUID.randomUUID().toString();
+        EventBundle bundle = new EventBundleImpl(spiEvents, isLocal, bundleId);
+        synchronized (eventBundles) {
+            eventBundles.add(bundle);
+            eventBundles.notify();
+        }
+    }
+}

Propchange: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/EventSubscription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java?view=diff&rev=512981&r1=512980&r2=512981
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java (original)
+++ jackrabbit/trunk/contrib/spi/spi2jcr/src/main/java/org/apache/jackrabbit/spi2jcr/RepositoryServiceImpl.java Wed Feb 28 13:29:36 2007
@@ -66,6 +66,8 @@
 import javax.jcr.Workspace;
 import javax.jcr.ImportUUIDBehavior;
 import javax.jcr.Value;
+import javax.jcr.observation.ObservationManager;
+import javax.jcr.observation.EventListener;
 import javax.jcr.query.InvalidQueryException;
 import javax.jcr.query.QueryManager;
 import javax.jcr.query.Query;
@@ -85,6 +87,9 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.Collections;
 import java.io.InputStream;
 import java.io.IOException;
 import java.io.ByteArrayInputStream;
@@ -107,6 +112,17 @@
     private final IdFactoryImpl idFactory = (IdFactoryImpl) IdFactoryImpl.getInstance();
 
     /**
+     * Maps session info instances to {@link EventSubscription}s.
+     */
+    private final Map subscriptions = Collections.synchronizedMap(new IdentityHashMap());
+
+    /**
+     * Set to <code>true</code> if the underlying JCR repository supports
+     * observation.
+     */
+    private final boolean supportsObservation;
+
+    /**
      * Creates a new repository service based on the given
      * <code>repository</code>.
      *
@@ -114,6 +130,7 @@
      */
     public RepositoryServiceImpl(Repository repository) {
         this.repository = repository;
+        this.supportsObservation = "true".equals(repository.getDescriptor(Repository.OPTION_OBSERVATION_SUPPORTED));
     }
 
     /**
@@ -137,8 +154,7 @@
         Map descriptors = new HashMap();
         String[] keys = repository.getDescriptorKeys();
         for (int i = 0; i < keys.length; i++) {
-            if (keys[i].equals(Repository.OPTION_TRANSACTIONS_SUPPORTED)
-                    || keys[i].equals(Repository.OPTION_OBSERVATION_SUPPORTED)) {
+            if (keys[i].equals(Repository.OPTION_TRANSACTIONS_SUPPORTED)) {
                 descriptors.put(keys[i], "false");
             } else {
                 descriptors.put(keys[i], repository.getDescriptor(keys[i]));
@@ -170,6 +186,7 @@
      * {@inheritDoc}
      */
     public void dispose(SessionInfo sessionInfo) throws RepositoryException {
+        subscriptions.remove(sessionInfo);
         getSessionInfoImpl(sessionInfo).getSession().logout();
     }
 
@@ -320,101 +337,126 @@
     /**
      * {@inheritDoc}
      */
-    public void importXml(SessionInfo sessionInfo,
-                          NodeId parentId,
-                          InputStream xmlStream,
-                          int uuidBehaviour) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        String path = pathForId(parentId, sInfo);
-        try {
-            sInfo.getSession().getWorkspace().importXML(path, xmlStream, uuidBehaviour);
-        } catch (IOException e) {
-            throw new RepositoryException(e.getMessage(), e);
-        }
+    public void importXml(final SessionInfo sessionInfo,
+                          final NodeId parentId,
+                          final InputStream xmlStream,
+                          final int uuidBehaviour) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                String path = pathForId(parentId, sInfo);
+                try {
+                    sInfo.getSession().getWorkspace().importXML(path, xmlStream, uuidBehaviour);
+                } catch (IOException e) {
+                    throw new RepositoryException(e.getMessage(), e);
+                }
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void move(SessionInfo sessionInfo,
-                     NodeId srcNodeId,
-                     NodeId destParentNodeId,
-                     QName destName) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        String srcPath = pathForId(srcNodeId, sInfo);
-        StringBuffer destPath = new StringBuffer(pathForId(destParentNodeId, sInfo));
-        try {
-            if (destPath.length() > 1) {
-                destPath.append("/");
+    public void move(final SessionInfo sessionInfo,
+                     final NodeId srcNodeId,
+                     final NodeId destParentNodeId,
+                     final QName destName) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                String srcPath = pathForId(srcNodeId, sInfo);
+                StringBuffer destPath = new StringBuffer(pathForId(destParentNodeId, sInfo));
+                try {
+                    if (destPath.length() > 1) {
+                        destPath.append("/");
+                    }
+                    destPath.append(NameFormat.format(destName, sInfo.getNamespaceResolver()));
+                } catch (NoPrefixDeclaredException e) {
+                    throw new RepositoryException(e.getMessage(), e);
+                }
+                sInfo.getSession().getWorkspace().move(srcPath, destPath.toString());
+                return null;
             }
-            destPath.append(NameFormat.format(destName, sInfo.getNamespaceResolver()));
-        } catch (NoPrefixDeclaredException e) {
-            throw new RepositoryException(e.getMessage(), e);
-        }
-        sInfo.getSession().getWorkspace().move(srcPath, destPath.toString());
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void copy(SessionInfo sessionInfo,
-                     String srcWorkspaceName,
-                     NodeId srcNodeId,
-                     NodeId destParentNodeId,
-                     QName destName) throws NoSuchWorkspaceException, ConstraintViolationException, VersionException, AccessDeniedException, PathNotFoundException, ItemExistsException, LockException, UnsupportedRepositoryOperationException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        String srcPath = pathForId(srcNodeId, sInfo);
-        StringBuffer destPath = new StringBuffer(pathForId(destParentNodeId, sInfo));
-        try {
-            if (destPath.length() > 1) {
-                destPath.append("/");
+    public void copy(final SessionInfo sessionInfo,
+                     final String srcWorkspaceName,
+                     final NodeId srcNodeId,
+                     final NodeId destParentNodeId,
+                     final QName destName) throws NoSuchWorkspaceException, ConstraintViolationException, VersionException, AccessDeniedException, PathNotFoundException, ItemExistsException, LockException, UnsupportedRepositoryOperationException, RepositoryException {
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                String srcPath = pathForId(srcNodeId, sInfo);
+                StringBuffer destPath = new StringBuffer(pathForId(destParentNodeId, sInfo));
+                try {
+                    if (destPath.length() > 1) {
+                        destPath.append("/");
+                    }
+                    destPath.append(NameFormat.format(destName, sInfo.getNamespaceResolver()));
+                } catch (NoPrefixDeclaredException e) {
+                    throw new RepositoryException(e.getMessage(), e);
+                }
+                Workspace ws = sInfo.getSession().getWorkspace();
+                if (sInfo.getWorkspaceName().equals(srcWorkspaceName)) {
+                    // inner-workspace copy
+                    ws.copy(srcPath, destPath.toString());
+                } else {
+                    ws.copy(srcWorkspaceName, srcPath, destPath.toString());
+                }
+                return null;
             }
-            destPath.append(NameFormat.format(destName, sInfo.getNamespaceResolver()));
-        } catch (NoPrefixDeclaredException e) {
-            throw new RepositoryException(e.getMessage(), e);
-        }
-        Workspace ws = sInfo.getSession().getWorkspace();
-        if (sInfo.getWorkspaceName().equals(srcWorkspaceName)) {
-            // inner-workspace copy
-            ws.copy(srcPath, destPath.toString());
-        } else {
-            ws.copy(srcWorkspaceName, srcPath, destPath.toString());
-        }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void update(SessionInfo sessionInfo,
-                       NodeId nodeId,
-                       String srcWorkspaceName)
+    public void update(final SessionInfo sessionInfo,
+                       final NodeId nodeId,
+                       final String srcWorkspaceName)
             throws NoSuchWorkspaceException, AccessDeniedException, LockException, InvalidItemStateException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        getNode(nodeId, sInfo).update(srcWorkspaceName);
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                getNode(nodeId, sInfo).update(srcWorkspaceName);
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void clone(SessionInfo sessionInfo,
-                      String srcWorkspaceName,
-                      NodeId srcNodeId,
-                      NodeId destParentNodeId,
-                      QName destName,
-                      boolean removeExisting) throws NoSuchWorkspaceException, ConstraintViolationException, VersionException, AccessDeniedException, PathNotFoundException, ItemExistsException, LockException, UnsupportedRepositoryOperationException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        String srcPath = pathForId(srcNodeId, sInfo);
-        StringBuffer destPath = new StringBuffer(pathForId(destParentNodeId, sInfo));
-        try {
-            if (destPath.length() > 1) {
-                destPath.append("/");
+    public void clone(final SessionInfo sessionInfo,
+                      final String srcWorkspaceName,
+                      final NodeId srcNodeId,
+                      final NodeId destParentNodeId,
+                      final QName destName,
+                      final boolean removeExisting) throws NoSuchWorkspaceException, ConstraintViolationException, VersionException, AccessDeniedException, PathNotFoundException, ItemExistsException, LockException, UnsupportedRepositoryOperationException, RepositoryException {
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                String srcPath = pathForId(srcNodeId, sInfo);
+                StringBuffer destPath = new StringBuffer(pathForId(destParentNodeId, sInfo));
+                try {
+                    if (destPath.length() > 1) {
+                        destPath.append("/");
+                    }
+                    destPath.append(NameFormat.format(destName, sInfo.getNamespaceResolver()));
+                } catch (NoPrefixDeclaredException e) {
+                    throw new RepositoryException(e.getMessage(), e);
+                }
+                sInfo.getSession().getWorkspace().clone(srcWorkspaceName, srcPath,
+                        destPath.toString(), removeExisting);
+                return null;
             }
-            destPath.append(NameFormat.format(destName, sInfo.getNamespaceResolver()));
-        } catch (NoPrefixDeclaredException e) {
-            throw new RepositoryException(e.getMessage(), e);
-        }
-        sInfo.getSession().getWorkspace().clone(srcWorkspaceName, srcPath,
-                destPath.toString(), removeExisting);
+        }, sInfo);
     }
 
     /**
@@ -430,15 +472,19 @@
     /**
      * {@inheritDoc}
      */
-    public LockInfo lock(SessionInfo sessionInfo,
-                         NodeId nodeId,
-                         boolean deep,
-                         boolean sessionScoped)
+    public LockInfo lock(final SessionInfo sessionInfo,
+                         final NodeId nodeId,
+                         final boolean deep,
+                         final boolean sessionScoped)
             throws UnsupportedRepositoryOperationException, LockException, AccessDeniedException, InvalidItemStateException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        Node n = getNode(nodeId, sInfo);
-        n.lock(deep, sessionScoped);
-        return new LockInfoImpl(n, idFactory, sInfo.getNamespaceResolver());
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        return (LockInfo) executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                Node n = getNode(nodeId, sInfo);
+                n.lock(deep, sessionScoped);
+                return new LockInfoImpl(n, idFactory, sInfo.getNamespaceResolver());
+            }
+        }, sInfo);
     }
 
     /**
@@ -452,94 +498,131 @@
     /**
      * {@inheritDoc}
      */
-    public void unlock(SessionInfo sessionInfo, NodeId nodeId)
+    public void unlock(final SessionInfo sessionInfo, final NodeId nodeId)
             throws UnsupportedRepositoryOperationException, LockException, AccessDeniedException, InvalidItemStateException, RepositoryException {
-        getNode(nodeId, getSessionInfoImpl(sessionInfo)).unlock();
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                getNode(nodeId, sInfo).unlock();
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void checkin(SessionInfo sessionInfo, NodeId nodeId)
+    public void checkin(final SessionInfo sessionInfo, final NodeId nodeId)
             throws VersionException, UnsupportedRepositoryOperationException, InvalidItemStateException, LockException, RepositoryException {
-        getNode(nodeId, getSessionInfoImpl(sessionInfo)).checkin();
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                getNode(nodeId, getSessionInfoImpl(sessionInfo)).checkin();
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void checkout(SessionInfo sessionInfo, NodeId nodeId)
+    public void checkout(final SessionInfo sessionInfo, final NodeId nodeId)
             throws UnsupportedRepositoryOperationException, LockException, RepositoryException {
-        getNode(nodeId, getSessionInfoImpl(sessionInfo)).checkout();
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                getNode(nodeId, getSessionInfoImpl(sessionInfo)).checkout();
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void removeVersion(SessionInfo sessionInfo,
-                              NodeId versionHistoryId,
-                              NodeId versionId)
+    public void removeVersion(final SessionInfo sessionInfo,
+                              final NodeId versionHistoryId,
+                              final NodeId versionId)
             throws ReferentialIntegrityException, AccessDeniedException, UnsupportedRepositoryOperationException, VersionException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        Node vHistory = getNode(versionHistoryId, sInfo);
-        Node version = getNode(versionId, sInfo);
-        if (vHistory instanceof VersionHistory) {
-            ((VersionHistory) vHistory).removeVersion(version.getName());
-        } else {
-            throw new RepositoryException("versionHistoryId does not reference a VersionHistor node");
-        }
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                Node vHistory = getNode(versionHistoryId, sInfo);
+                Node version = getNode(versionId, sInfo);
+                if (vHistory instanceof VersionHistory) {
+                    ((VersionHistory) vHistory).removeVersion(version.getName());
+                } else {
+                    throw new RepositoryException("versionHistoryId does not reference a VersionHistory node");
+                }
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void restore(SessionInfo sessionInfo,
-                        NodeId nodeId,
-                        NodeId versionId,
-                        boolean removeExisting) throws VersionException, PathNotFoundException, ItemExistsException, UnsupportedRepositoryOperationException, LockException, InvalidItemStateException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        Node n = getNode(nodeId, sInfo);
-        Node v = getNode(versionId, sInfo);
-        n.restore(v.getName(), removeExisting);
+    public void restore(final SessionInfo sessionInfo,
+                        final NodeId nodeId,
+                        final NodeId versionId,
+                        final boolean removeExisting) throws VersionException, PathNotFoundException, ItemExistsException, UnsupportedRepositoryOperationException, LockException, InvalidItemStateException, RepositoryException {
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                Node n = getNode(nodeId, sInfo);
+                Node v = getNode(versionId, sInfo);
+                n.restore(v.getName(), removeExisting);
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void restore(SessionInfo sessionInfo,
-                        NodeId[] versionIds,
-                        boolean removeExisting) throws ItemExistsException, UnsupportedRepositoryOperationException, VersionException, LockException, InvalidItemStateException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        Version[] versions = new Version[versionIds.length];
-        for (int i = 0; i < versions.length; i++) {
-            Node n = getNode(versionIds[i], sInfo);
-            if (n instanceof Version) {
-                versions[i] = (Version) n;
-            } else {
-                throw new RepositoryException(n.getPath() +
-                        " does not reference a Version node");
+    public void restore(final SessionInfo sessionInfo,
+                        final NodeId[] versionIds,
+                        final boolean removeExisting) throws ItemExistsException, UnsupportedRepositoryOperationException, VersionException, LockException, InvalidItemStateException, RepositoryException {
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                Version[] versions = new Version[versionIds.length];
+                for (int i = 0; i < versions.length; i++) {
+                    Node n = getNode(versionIds[i], sInfo);
+                    if (n instanceof Version) {
+                        versions[i] = (Version) n;
+                    } else {
+                        throw new RepositoryException(n.getPath() +
+                                " does not reference a Version node");
+                    }
+                }
+                sInfo.getSession().getWorkspace().restore(versions, removeExisting);
+                return null;
             }
-        }
-        sInfo.getSession().getWorkspace().restore(versions, removeExisting);
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public IdIterator merge(SessionInfo sessionInfo,
-                            NodeId nodeId,
-                            String srcWorkspaceName,
-                            boolean bestEffort)
+    public IdIterator merge(final SessionInfo sessionInfo,
+                            final NodeId nodeId,
+                            final String srcWorkspaceName,
+                            final boolean bestEffort)
             throws NoSuchWorkspaceException, AccessDeniedException, MergeException, LockException, InvalidItemStateException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        Node n = getNode(nodeId, sInfo);
-        NodeIterator it = n.merge(srcWorkspaceName, bestEffort);
-        List ids = new ArrayList();
-        while (it.hasNext()) {
-            ids.add(idFactory.createNodeId(it.nextNode(),
-                    sInfo.getNamespaceResolver()));
-        }
-        return new IteratorHelper(ids);
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        return (IdIterator) executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                Node n = getNode(nodeId, sInfo);
+                NodeIterator it = n.merge(srcWorkspaceName, bestEffort);
+                List ids = new ArrayList();
+                while (it.hasNext()) {
+                    ids.add(idFactory.createNodeId(it.nextNode(),
+                            sInfo.getNamespaceResolver()));
+                }
+                return new IteratorHelper(ids);
+            }
+        }, sInfo);
     }
 
     /**
@@ -556,48 +639,58 @@
     /**
      * {@inheritDoc}
      */
-    public void addVersionLabel(SessionInfo sessionInfo,
-                                NodeId versionHistoryId,
-                                NodeId versionId,
-                                QName label,
-                                boolean moveLabel) throws VersionException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        String jcrLabel;
-        try {
-            jcrLabel = NameFormat.format(label, sInfo.getNamespaceResolver());
-        } catch (NoPrefixDeclaredException e) {
-            throw new RepositoryException(e.getMessage(), e);
-        }
-        Node version = getNode(versionId, sInfo);
-        Node vHistory = getNode(versionHistoryId, sInfo);
-        if (vHistory instanceof VersionHistory) {
-            ((VersionHistory) vHistory).addVersionLabel(
-                    version.getName(), jcrLabel, moveLabel);
-        } else {
-            throw new RepositoryException("versionHistoryId does not reference a VersionHistory node");
-        }
+    public void addVersionLabel(final SessionInfo sessionInfo,
+                                final NodeId versionHistoryId,
+                                final NodeId versionId,
+                                final QName label,
+                                final boolean moveLabel) throws VersionException, RepositoryException {
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                String jcrLabel;
+                try {
+                    jcrLabel = NameFormat.format(label, sInfo.getNamespaceResolver());
+                } catch (NoPrefixDeclaredException e) {
+                    throw new RepositoryException(e.getMessage(), e);
+                }
+                Node version = getNode(versionId, sInfo);
+                Node vHistory = getNode(versionHistoryId, sInfo);
+                if (vHistory instanceof VersionHistory) {
+                    ((VersionHistory) vHistory).addVersionLabel(
+                            version.getName(), jcrLabel, moveLabel);
+                } else {
+                    throw new RepositoryException("versionHistoryId does not reference a VersionHistory node");
+                }
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
      * {@inheritDoc}
      */
-    public void removeVersionLabel(SessionInfo sessionInfo,
-                                   NodeId versionHistoryId,
-                                   NodeId versionId,
-                                   QName label) throws VersionException, RepositoryException {
-        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
-        String jcrLabel;
-        try {
-            jcrLabel = NameFormat.format(label, sInfo.getNamespaceResolver());
-        } catch (NoPrefixDeclaredException e) {
-            throw new RepositoryException(e.getMessage(), e);
-        }
-        Node vHistory = getNode(versionHistoryId, sInfo);
-        if (vHistory instanceof VersionHistory) {
-            ((VersionHistory) vHistory).removeVersionLabel(jcrLabel);
-        } else {
-            throw new RepositoryException("versionHistoryId does not reference a VersionHistory node");
-        }
+    public void removeVersionLabel(final SessionInfo sessionInfo,
+                                   final NodeId versionHistoryId,
+                                   final NodeId versionId,
+                                   final QName label) throws VersionException, RepositoryException {
+        final SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        executeWithLocalEvents(new Callable() {
+            public Object run() throws RepositoryException {
+                String jcrLabel;
+                try {
+                    jcrLabel = NameFormat.format(label, sInfo.getNamespaceResolver());
+                } catch (NoPrefixDeclaredException e) {
+                    throw new RepositoryException(e.getMessage(), e);
+                }
+                Node vHistory = getNode(versionHistoryId, sInfo);
+                if (vHistory instanceof VersionHistory) {
+                    ((VersionHistory) vHistory).removeVersionLabel(jcrLabel);
+                } else {
+                    throw new RepositoryException("versionHistoryId does not reference a VersionHistory node");
+                }
+                return null;
+            }
+        }, sInfo);
     }
 
     /**
@@ -636,7 +729,7 @@
     }
 
     /**
-     * TODO: implement
+     * {@inheritDoc}
      */
     public EventFilter createEventFilter(SessionInfo sessionInfo,
                                          int eventTypes,
@@ -646,17 +739,39 @@
                                          QName[] nodeTypeName,
                                          boolean noLocal)
             throws UnsupportedRepositoryOperationException, RepositoryException {
-        throw new UnsupportedRepositoryOperationException("not yet implemented");
+        // make sure there is an event subscription for this session info
+        SessionInfoImpl sInfo = getSessionInfoImpl(sessionInfo);
+        if (!subscriptions.containsKey(sInfo)) {
+            EventSubscription subscr = new EventSubscription(
+                    idFactory, sInfo.getNamespaceResolver());
+            ObservationManager obsMgr = sInfo.getSession().getWorkspace().getObservationManager();
+            obsMgr.addEventListener(subscr, EventSubscription.ALL_EVENTS,
+                    "/", true, null, null, true);
+            subscriptions.put(sInfo, subscr);
+        }
+
+        Set ntNames = null;
+        if (nodeTypeName != null) {
+            ntNames = new HashSet(Arrays.asList(nodeTypeName));
+        }
+        return new EventFilterImpl(eventTypes, absPath, isDeep, uuid, ntNames, noLocal);
     }
 
     /**
-     * TODO: implement
+     * {@inheritDoc}
      */
     public EventBundle[] getEvents(SessionInfo sessionInfo,
                                    long timeout,
                                    EventFilter[] filters)
             throws RepositoryException, UnsupportedRepositoryOperationException, InterruptedException {
-        throw new UnsupportedRepositoryOperationException("not yet implemented");
+        EventSubscription subscr = (EventSubscription) subscriptions.get(sessionInfo);
+        if (subscr != null) {
+            return subscr.getEventBundles(filters, timeout);
+        } else {
+            // sleep for at most one second, then return
+            Thread.sleep(Math.min(timeout, 1000));
+            return new EventBundle[0];
+        }
     }
 
     /**
@@ -746,7 +861,7 @@
                             final QName nodetypeName,
                             final String uuid) throws RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     Session s = sInfo.getSession();
                     Node n = getNode(parentId, sInfo);
                     String jcrName = getJcrName(nodeName);
@@ -766,6 +881,7 @@
                             throw new RepositoryException(e.getMessage(), e);
                         }
                     }
+                    return null;
                 }
             });
         }
@@ -775,12 +891,13 @@
                                 final QValue value)
                 throws ValueFormatException, VersionException, LockException, ConstraintViolationException, PathNotFoundException, ItemExistsException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     Session s = sInfo.getSession();
                     Node n = getNode(parentId, sInfo);
                     Value jcrValue = ValueFormat.getJCRValue(value,
                             sInfo.getNamespaceResolver(), s.getValueFactory());
                     n.setProperty(getJcrName(propertyName), jcrValue);
+                    return null;
                 }
             });
         }
@@ -789,7 +906,7 @@
                                 final QName propertyName,
                                 final QValue[] values) throws ValueFormatException, VersionException, LockException, ConstraintViolationException, PathNotFoundException, ItemExistsException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     Session s = sInfo.getSession();
                     Node n = getNode(parentId, sInfo);
                     Value[] jcrValues = new Value[values.length];
@@ -798,6 +915,7 @@
                                 sInfo.getNamespaceResolver(), s.getValueFactory());
                     }
                     n.setProperty(getJcrName(propertyName), jcrValues);
+                    return null;
                 }
             });
         }
@@ -805,11 +923,12 @@
         public void setValue(final PropertyId propertyId, final QValue value)
                 throws RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     Session s = sInfo.getSession();
                     Value jcrValue = ValueFormat.getJCRValue(value,
                             sInfo.getNamespaceResolver(), s.getValueFactory());
                     getProperty(propertyId, sInfo).setValue(jcrValue);
+                    return null;
                 }
             });
         }
@@ -817,7 +936,7 @@
         public void setValue(final PropertyId propertyId, final QValue[] values)
                 throws RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     Session s = sInfo.getSession();
                     Value[] jcrValues = new Value[values.length];
                     for (int i = 0; i < jcrValues.length; i++) {
@@ -825,18 +944,20 @@
                                 sInfo.getNamespaceResolver(), s.getValueFactory());
                     }
                     getProperty(propertyId, sInfo).setValue(jcrValues);
+                    return null;
                 }
             });
         }
 
         public void remove(final ItemId itemId) throws RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     if (itemId.denotesNode()) {
                         getNode((NodeId) itemId, sInfo).remove();
                     } else {
                         getProperty((PropertyId) itemId, sInfo).remove();
                     }
+                    return null;
                 }
             });
         }
@@ -846,7 +967,7 @@
                                  final NodeId beforeNodeId)
                 throws RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     Node parent = getNode(parentId, sInfo);
                     Node srcNode = getNode(srcNodeId, sInfo);
                     Node beforeNode = null;
@@ -865,6 +986,7 @@
                         }
                     }
                     parent.orderBefore(srcPath, beforePath);
+                    return null;
                 }
             });
         }
@@ -873,7 +995,7 @@
                               final QName[] mixinNodeTypeIds)
                 throws RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     Set mixinNames = new HashSet();
                     for (int i = 0; i < mixinNodeTypeIds.length; i++) {
                         mixinNames.add(getJcrName(mixinNodeTypeIds[i]));
@@ -893,6 +1015,7 @@
                     for (Iterator it = mixinNames.iterator(); it.hasNext(); ) {
                         n.addMixin((String) it.next());
                     }
+                    return null;
                 }
             });
         }
@@ -901,7 +1024,7 @@
                          final NodeId destParentNodeId,
                          final QName destName) throws RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
+                public Object run() throws RepositoryException {
                     String srcPath = pathForId(srcNodeId, sInfo);
                     String destPath = pathForId(destParentNodeId, sInfo);
                     if (destPath.length() > 1) {
@@ -909,6 +1032,7 @@
                     }
                     destPath += getJcrName(destName);
                     sInfo.getSession().move(srcPath, destPath);
+                    return null;
                 }
             });
         }
@@ -963,15 +1087,21 @@
                 VersionException, LockException, NoSuchNodeTypeException,
                 RepositoryException {
             executeGuarded(new Callable() {
-                public void run() throws RepositoryException {
-                    sInfo.getSession().save();
+                public Object run() throws RepositoryException {
+                    executeWithLocalEvents(new Callable() {
+                        public Object run() throws RepositoryException {
+                            sInfo.getSession().save();
+                            return null;
+                        }
+                    }, sInfo);
+                    return null;
                 }
             });
         }
     }
 
     private interface Callable {
-        public void run() throws RepositoryException;
+        public Object run() throws RepositoryException;
     }
 
     private SessionInfoImpl getSessionInfoImpl(SessionInfo sessionInfo)
@@ -1086,5 +1216,26 @@
                 session.setNamespacePrefix(nsReg.getPrefix(uri), uri);
             }
         }
+    }
+
+    private Object executeWithLocalEvents(Callable call, SessionInfoImpl sInfo)
+            throws RepositoryException {
+        if (supportsObservation) {
+            // register local event listener
+            EventSubscription subscr = (EventSubscription) subscriptions.get(sInfo);
+            if (subscr != null) {
+                ObservationManager obsMgr = sInfo.getSession().getWorkspace().getObservationManager();
+                EventListener listener = subscr.getLocalEventListener();
+                obsMgr.addEventListener(listener, EventSubscription.ALL_EVENTS,
+                        "/", true, null, null, false);
+                try {
+                    return call.run();
+                } finally {
+                    obsMgr.removeEventListener(listener);
+                }
+            }
+        }
+        // if we get here simply run as is
+        return call.run();
     }
 }



Mime
View raw message