jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdue...@apache.org
Subject svn commit: r1144268 - in /jackrabbit/sandbox/spi2microkernel/src: main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java test/java/org/apache/jackrabbit/spi2microkernel/RepositoryTest.java
Date Fri, 08 Jul 2011 11:24:16 GMT
Author: mduerig
Date: Fri Jul  8 11:24:16 2011
New Revision: 1144268

URL: http://svn.apache.org/viewvc?rev=1144268&view=rev
Log:
observation: implement dispose for observation listeners

Modified:
    jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java
    jackrabbit/sandbox/spi2microkernel/src/test/java/org/apache/jackrabbit/spi2microkernel/RepositoryTest.java

Modified: jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java?rev=1144268&r1=1144267&r2=1144268&view=diff
==============================================================================
--- jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java (original)
+++ jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java Fri Jul  8 11:24:16 2011
@@ -51,16 +51,20 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 
 public class SubscriptionImpl implements Subscription {
     private final MicroKernel microKernel;
     private final SessionState sessionState;
+    private final WaitForCommit waitForCommit;
     private final Queue<EventSource> eventSources = new LinkedList<EventSource>();
-    private volatile boolean disposed;
 
     public SubscriptionImpl(MicroKernel microKernel, SessionState sessionState) {
         this.microKernel = microKernel;
         this.sessionState = sessionState;
+        waitForCommit = new WaitForCommit();
     }
 
     public void setEventFilters(EventFilter[] eventFilters) throws RepositoryException {
@@ -69,7 +73,7 @@ public class SubscriptionImpl implements
         }
     }
 
-    public EventBundle[] getEvents(long timeout) throws RepositoryException, InterruptedException {
+    public EventBundle[] getEvents(long timeout) throws RepositoryException {
         List<EventBundle> eventBundles = new ArrayList<EventBundle>();
 
         String baseRevision;
@@ -79,7 +83,7 @@ public class SubscriptionImpl implements
                 : eventSources.element().startRevision;
         }
 
-        String head = waitForCommit(baseRevision, timeout);
+        String head = waitForCommit.wait(baseRevision, timeout);
 
         synchronized (eventSources) {
             if (!eventSources.isEmpty()) {
@@ -100,31 +104,11 @@ public class SubscriptionImpl implements
     }
 
     public void dispose() {
-        synchronized (eventSources) {
-            disposed = true;
-            Thread.currentThread().interrupt();
-            Thread.interrupted();  // clear interrupted flag right away. We might not get a chance later
-        }
+        waitForCommit.cancel();
     }
 
     //------------------------------------------< private >---
 
-    private String waitForCommit(String baseRevision, long timeout) {
-        long millis = timeout;
-        long start = System.currentTimeMillis();
-
-        do {
-            try {
-                return microKernel.waitForCommit(baseRevision, millis);
-            }
-            catch (InterruptedException e) {
-                millis = millis - System.currentTimeMillis() + start;
-            }
-        } while (!disposed && millis > 0);
-        
-        return baseRevision;
-    }
-
     private static JSONArray jsonArray(Object jsonArray) {
         if (jsonArray instanceof JSONArray) {
             return (JSONArray) jsonArray;
@@ -177,6 +161,67 @@ public class SubscriptionImpl implements
         return Paths.stringToPath(path);
     }
 
+    private class WaitForCommit {
+        private FutureTask<String> waitForCommit;
+
+        public synchronized String wait(final String baseRevision, final long timeout) throws RepositoryException {
+            if (baseRevision == null) {
+                throw new IllegalArgumentException("Base revision may not be null");
+            }
+            if (waitForCommit != null) {
+                throw new IllegalStateException("Wait already in progress");
+            }
+
+            waitForCommit = new FutureTask<String>(new Callable<String>(){
+                public String call() throws Exception {
+                    long millis = timeout;
+                    long start = System.currentTimeMillis();
+
+                    do {
+                        try {
+                            String newHead =  microKernel.waitForCommit(baseRevision, millis);
+                            return newHead == null ? baseRevision : newHead;
+                        }
+                        catch (InterruptedException e) {
+                            millis = millis - System.currentTimeMillis() + start;
+                        }
+                    } while (millis > 0);
+
+                    return baseRevision;
+                }
+            });
+            
+            waitForCommit.run();
+            try {
+                return waitForCommit.get();
+            }
+            catch (InterruptedException e) {
+                if (waitForCommit.isCancelled()) {
+                    Thread.interrupted(); // clear interrupted flag if cancel was called
+                }
+                return baseRevision;
+            }
+            catch (ExecutionException e) {
+                if (e.getCause() instanceof RepositoryException) {
+                    throw (RepositoryException) e.getCause();
+                }
+                else {
+                    throw new RepositoryException(e.getMessage(), e);
+                }
+            }
+            finally {
+                waitForCommit = null;
+            }
+        }
+
+        public synchronized void cancel() {
+            if (waitForCommit != null) {
+                waitForCommit.cancel(true);
+            }
+        }
+
+    }
+
     private class EventSource {
         private final EventFilter[] eventFilters;
         private final List<String> commitLog;

Modified: jackrabbit/sandbox/spi2microkernel/src/test/java/org/apache/jackrabbit/spi2microkernel/RepositoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/spi2microkernel/src/test/java/org/apache/jackrabbit/spi2microkernel/RepositoryTest.java?rev=1144268&r1=1144267&r2=1144268&view=diff
==============================================================================
--- jackrabbit/sandbox/spi2microkernel/src/test/java/org/apache/jackrabbit/spi2microkernel/RepositoryTest.java (original)
+++ jackrabbit/sandbox/spi2microkernel/src/test/java/org/apache/jackrabbit/spi2microkernel/RepositoryTest.java Fri Jul  8 11:24:16 2011
@@ -62,6 +62,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -1013,7 +1014,7 @@ public class RepositoryTest {
         n.getNode("2").remove();
         getSession().save();
 
-        new Loop() { public void test() {
+        new Loop() { @Override public void test() {
             assertTrue(addNodes.isEmpty());
             assertTrue(removeNodes.isEmpty());
             assertTrue(addProperties.isEmpty());
@@ -1070,7 +1071,8 @@ public class RepositoryTest {
             }
         });
 
-        disposer.run();
+        Thread.sleep(250);
+        Executors.newSingleThreadExecutor().execute(disposer);
         disposer.get(10000, TimeUnit.MILLISECONDS);
         assertTrue(failedEvents.isEmpty());
     }



Mime
View raw message