jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdue...@apache.org
Subject svn commit: r1174808 - /jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
Date Fri, 23 Sep 2011 14:28:58 GMT
Author: mduerig
Date: Fri Sep 23 14:28:58 2011
New Revision: 1174808

URL: http://svn.apache.org/viewvc?rev=1174808&view=rev
Log:
Microkernel based Jackrabbit prototype (WIP)
remove now unnecessary synchronization

Modified:
    jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java

Modified: jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java?rev=1174808&r1=1174807&r2=1174808&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java (original)
+++ jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java Fri Sep 23 14:28:58 2011
@@ -114,29 +114,23 @@ public final class WorkspaceManager impl
     private final SessionInfo sessionInfo;
     private final NameFactory nameFactory;
     private final PathFactory pathFactory;
+    private final IdFactory idFactory;
 
     private final TransientItemStateFactory itemStateFactory;
-    private final HierarchyManager hierarchyManager;
 
-    private final int pollTimeout;
+    private final HierarchyManager hierarchyManager;
 
-    private final IdFactory idFactory;
     private final NamespaceRegistryImpl nsRegistry;
     private final NodeTypeRegistryImpl ntRegistry;
     private final ItemDefinitionProvider definitionProvider;
 
-    /**
-     * Semaphore to synchronize the feed thread with client
-     * threads that call {@link #execute(Operation)} or {@link
-     * #execute(ChangeLog)}.
-     */
-    private final Semaphore updateSync = new Semaphore(1);
+    private final int pollTimeout;
 
     /**
      * This is the event polling for changes. If {@code null}
      * then the underlying repository service does not support observation.
      */
-    private ChangePolling changeFeed;  
+    private ChangePolling changeFeed;
 
     /**
      * List of event listener that are set on this WorkspaceManager to get
@@ -352,8 +346,7 @@ public final class WorkspaceManager impl
      * @param listener The listener to be removed.
      * @throws RepositoryException If an error occurs.
      */
-    public void removeEventListener(InternalEventListener listener)
-            throws RepositoryException {
+    public void removeEventListener(InternalEventListener listener) throws RepositoryException {
         synchronized (listeners) {
             listeners.remove(listener);
             if (listeners.isEmpty()) {
@@ -508,19 +501,9 @@ public final class WorkspaceManager impl
      */
     @Override
     public void execute(Operation operation) throws RepositoryException {
-        // block event delivery while changes are executed
-        try {
-            updateSync.acquire();
-        } catch (InterruptedException e) {
-            throw new RepositoryException(e);
-        }
-        try {
-            // Execute operation and delegate invalidation of affected item states to the operation.
-            new OperationVisitorImpl(sessionInfo).execute(operation);
-            operation.persisted();
-        } finally {
-            updateSync.release();
-        }
+        // Execute operation and delegate invalidation of affected item states to the operation.
+        new OperationVisitorImpl(sessionInfo).execute(operation);
+        operation.persisted();
     }
 
     /**
@@ -531,18 +514,8 @@ public final class WorkspaceManager impl
      */
     @Override
     public void execute(ChangeLog changes) throws RepositoryException {
-        // block event delivery while changes are executed
-        try {
-            updateSync.acquire();
-        } catch (InterruptedException e) {
-            throw new RepositoryException(e);
-        }
-        try {
-            new OperationVisitorImpl(sessionInfo).execute(changes);
-            changes.persisted();
-        } finally {
-            updateSync.release();
-        }
+        new OperationVisitorImpl(sessionInfo).execute(changes);
+        changes.persisted();
     }
 
     /**
@@ -551,12 +524,6 @@ public final class WorkspaceManager impl
     @Override
     public synchronized void dispose() {
         try {
-            updateSync.acquire();
-        } catch (InterruptedException e) {
-            log.warn("Exception while disposing WorkspaceManager: " + e);
-            return;
-        }
-        try {
             if (changeFeed != null) {
                 changeFeed.cancel();
             }
@@ -565,20 +532,18 @@ public final class WorkspaceManager impl
                 service.dispose(subscription);
             }
             service.dispose(sessionInfo);
-            cache.dispose();
         } catch (Exception e) {
-            log.warn("Exception while disposing WorkspaceManager: " + e);
-        } finally {
-            updateSync.release();
+            log.error("Exception while disposing WorkspaceManager", e);
         }
+        
+        cache.dispose();
         ntRegistry.dispose();
     }
 
     //------------------------------------------------------< AccessManager >---
 
     @Override
-    public boolean isGranted(NodeState parentState, Path relPath, String[] actions)
-            throws RepositoryException {
+    public boolean isGranted(NodeState parentState, Path relPath, String[] actions) throws RepositoryException {
         if (parentState.getStatus() == Status.NEW) {
             return true;
         }
@@ -658,10 +623,8 @@ public final class WorkspaceManager impl
      * @param eventBundles the event bundles generated by the repository service
      *                     as the effect of an local or external change.
      * @param eventListeners Array of internal event listeners
-     * @throws InterruptedException if this thread is interrupted while waiting
-     *                              for the {@link #updateSync}.
      */
-    private void onEventReceived(EventBundle[] eventBundles, InternalEventListener[] eventListeners) throws InterruptedException {
+    private static void onEventReceived(EventBundle[] eventBundles, InternalEventListener[] eventListeners) {
         if (log.isDebugEnabled()) {
             log.debug("received {} event bundles.", eventBundles.length);
             for (EventBundle eventBundle : eventBundles) {
@@ -699,22 +662,15 @@ public final class WorkspaceManager impl
             }
         }
 
-        // do not deliver events while an operation executes
-        updateSync.acquire();
-        try {
-            // notify listener
-            for (EventBundle eventBundle : eventBundles) {
-                for (InternalEventListener l : eventListeners) {
-                    try {
-                        l.onEvent(eventBundle);
-                    } catch (Exception e) {
-                        log.warn("Exception in event polling thread: " + e);
-                        log.debug("Dump:", e);
-                    }
+        for (EventBundle eventBundle : eventBundles) {
+            for (InternalEventListener l : eventListeners) {
+                try {
+                    l.onEvent(eventBundle);
+                } catch (Exception e) {
+                    log.warn("Exception in event polling thread: " + e);
+                    log.debug("Dump:", e);
                 }
             }
-        } finally {
-            updateSync.release();
         }
     }
 
@@ -1016,7 +972,8 @@ public final class WorkspaceManager impl
      */
     private final class ChangePolling implements Callable<Object> {
         private final ExecutorService changePollingExecutor;
-        private volatile Future<Object> result;
+        private volatile Future<Object> result;             // Used to cancel change polling
+        private final Semaphore running = new Semaphore(1); // Used to determine that change polling terminated
 
         public ChangePolling(ExecutorService changePollingExecutor) {
             this.changePollingExecutor = changePollingExecutor;
@@ -1026,8 +983,11 @@ public final class WorkspaceManager impl
 
         @Override
         public Object call() throws Exception {
-            String wspName = sessionInfo.getWorkspaceName();            
             try {
+                if (running.availablePermits() > 0) {
+                    running.acquire();
+                }
+                
                 InternalEventListener[] internalEventListeners;
                 Subscription subscription;
                 synchronized (listeners) {
@@ -1039,30 +999,40 @@ public final class WorkspaceManager impl
                     internalEventListeners = listeners.toArray(new InternalEventListener[listeners.size()]);
                 }
 
-                EventBundle[] bundles = service.getEvents(subscription, pollTimeout);
+                EventBundle[] bundles = null;
+                try {
+                    bundles = service.getEvents(subscription, pollTimeout);
+                } catch (UnsupportedRepositoryOperationException e) {
+                    log.error("SPI implementation does not support observation", e);
+                    running.release();
+                    return null;
+                } catch (RepositoryException e) {
+                    log.error("Workspace=" + sessionInfo.getWorkspaceName() + ": Exception while retrieving event bundles", e);
+                }
 
-                if (!Thread.interrupted() && bundles.length > 0) {
+                if (!Thread.interrupted() && bundles != null && bundles.length > 0) {
                     onEventReceived(bundles, internalEventListeners);
                 }
 
                 synchronized (this) {
                     if (result != null) {
                         result = changePollingExecutor.submit(this);
+                        return null;
                     }
                     else {
-                        log.debug("Cancelled change polling instance: {}", this);
+                        // terminate
+                        running.release();
+                        return null;
                     }
                 }
-            } catch (UnsupportedRepositoryOperationException e) {
-                log.error("SPI implementation does not support observation: " + e);
-            } catch (RepositoryException e) {
-                log.info("Workspace=" + wspName + ": Exception while retrieving event bundles: " + e);
-                log.debug(e.getMessage(), e);
             }
-            return null;
+            catch (InterruptedException e) {
+                running.release();
+                throw e;
+            }
         }
 
-        public void cancel() {
+        public void cancel() throws InterruptedException {
             Future<Object> result;
             synchronized (this) {
                 result = this.result;
@@ -1070,8 +1040,9 @@ public final class WorkspaceManager impl
             }
 
             if (result != null) {
-                log.debug("Cancelling change polling instance: {}", this);
                 result.cancel(true);
+                running.acquire();  // wait for change polling to terminate
+                running.release();
             }
         }
     }



Mime
View raw message