jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdue...@apache.org
Subject svn commit: r1137949 - in /jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel: SessionState.java SubscriptionImpl.java
Date Tue, 21 Jun 2011 10:44:31 GMT
Author: mduerig
Date: Tue Jun 21 10:44:31 2011
New Revision: 1137949

URL: http://svn.apache.org/viewvc?rev=1137949&view=rev
Log:
spi2microkernel prototype (WIP)
observation (WIP)

Modified:
    jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SessionState.java
    jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java

Modified: jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SessionState.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SessionState.java?rev=1137949&r1=1137948&r2=1137949&view=diff
==============================================================================
--- jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SessionState.java
(original)
+++ jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SessionState.java
Tue Jun 21 10:44:31 2011
@@ -26,22 +26,27 @@ import java.util.concurrent.Callable;
 
 public class SessionState {
     private final String initialRevision;
-    private final List<String> revisionHistory = new ArrayList<String>();
+    private final List<String> commitLog = new ArrayList<String>();
 
     public SessionState(String initialRevision) {
         this.initialRevision = initialRevision;
     }
 
     public String getHeadRevision() {
-        return revisionHistory.isEmpty()
-            ? initialRevision
-            : revisionHistory.get(revisionHistory.size() - 1);
+        if (commitLog.isEmpty()) {
+            return initialRevision;
+        }
+        else {
+            synchronized (commitLog) {
+                return commitLog.get(commitLog.size() - 1);
+            }
+        }
     }
 
     public void commitWithLock(Callable<String> commit) throws RepositoryException {
         try {
             synchronized (this) {
-                revisionHistory.add(commit.call());
+                commitLog.add(commit.call());
             }
         }
         catch (Exception e) {
@@ -50,15 +55,15 @@ public class SessionState {
     }
 
     /**
-     * Snapshot of this sessions state at the time of the call
+     * Snapshot of this session state's commit log at the time of the call
      * @return
      */
-    public SessionState freeze() {
-        SessionState frozen = new SessionState(initialRevision);
-        synchronized (revisionHistory) {
-            frozen.revisionHistory.addAll(revisionHistory);
+    public List<String> getCommitLog() {
+        List<String> commitLog = new ArrayList<String>(this.commitLog.size());
+        synchronized (this.commitLog) {
+            commitLog.addAll(this.commitLog);
         }
-        return frozen;
+        return commitLog;
     }
 
 }

Modified: jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java?rev=1137949&r1=1137948&r2=1137949&view=diff
==============================================================================
--- jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java
(original)
+++ jackrabbit/sandbox/spi2microkernel/src/main/java/org/apache/jackrabbit/spi2microkernel/SubscriptionImpl.java
Tue Jun 21 10:44:31 2011
@@ -35,11 +35,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 
 public class SubscriptionImpl implements Subscription {
     private final MicroKernel microKernel;
     private final SessionState sessionState;
-    private final List<EventSource> eventSources = new LinkedList<EventSource>();
+    private final Queue<EventSource> eventSources = new LinkedList<EventSource>();
 
     public SubscriptionImpl(MicroKernel microKernel, SessionState sessionState) {
         this.microKernel = microKernel;
@@ -48,11 +49,8 @@ public class SubscriptionImpl implements
 
     public void setEventFilters(EventFilter[] eventFilters) throws RepositoryException {
         synchronized (eventSources) {
-            SessionState frozen = sessionState.freeze();
-            if (!eventSources.isEmpty()) {
-                eventSources.get(eventSources.size() - 1).setLastRevision(frozen.getHeadRevision());
-            }
-            eventSources.add(new EventSource(eventFilters, frozen));
+            List<String> commitLog = sessionState.getCommitLog();
+            eventSources.add(new EventSource(eventFilters, sessionState.getHeadRevision(), commitLog));
         }
     }
 
@@ -60,16 +58,23 @@ public class SubscriptionImpl implements
         Thread.sleep(timeout);
 
         List<EventBundle> eventBundles = new ArrayList<EventBundle>();
+        
         synchronized (eventSources) {
-            for (Iterator<EventSource> it = eventSources.iterator(); it.hasNext(); ) {
-                EventSource eventSource = it.next();
-                eventBundles.addAll(eventSource.getEventsBundles());
-                if (eventSource.isEmpty()) {
-                    it.remove();
+            if (!eventSources.isEmpty()) {
+                EventSource current = eventSources.remove();
+
+                while (!eventSources.isEmpty()) {
+                    EventSource next = eventSources.remove();
+                    eventBundles.addAll(current.getEventsBundles(next.getStartRevision()));
+                    current = next;
                 }
+                
+                String head = microKernel.getHeadRevision();
+                eventBundles.addAll(current.getEventsBundles(head));
+                eventSources.add(current.update(head));
             }
         }
-
+        
         return eventBundles.toArray(new EventBundle[eventBundles.size()]);
     }
 
@@ -95,31 +100,20 @@ public class SubscriptionImpl implements
 
     private class EventSource {
         private final EventFilter[] eventFilters;
-        private final SessionState sessionState;
-
-        private String startRevision;
-        private String lastRevision;
-        private boolean isEmpty;
+        private final String startRevision;
+        private final List<String> commitLog;
 
-        public EventSource(EventFilter[] eventFilters, SessionState sessionState) {
+        public EventSource(EventFilter[] eventFilters, String startRevision, List<String> commitLog) {
             this.eventFilters = eventFilters;
-            this.sessionState = sessionState;
-            startRevision = sessionState.getHeadRevision();
+            this.startRevision = startRevision;
+            this.commitLog = commitLog;
         }
 
-        /**
-         * Set the last revision from which events should be included
-         * @param lastRevision
-         */
-        public void setLastRevision(String lastRevision) {
-            this.lastRevision = lastRevision;
-        }
-
-        public Collection<? extends EventBundle> getEventsBundles() throws RepositoryException {
-            String endRevision = lastRevision == null
-                ? microKernel.getHeadRevision()
-                : lastRevision;
+        public String getStartRevision() {
+            return startRevision;
+        }
 
+        public Collection<? extends EventBundle> getEventsBundles(String endRevision) throws RepositoryException {
             String journal = microKernel.getJournal(startRevision, endRevision);
 
             try {
@@ -132,8 +126,6 @@ public class SubscriptionImpl implements
 //                    eventBundles[k] = new EventBundleImpl(jsonObject, eventFilters, sessionState);
                 }
 
-                startRevision = endRevision;
-                isEmpty = lastRevision != null;
 
                 return Collections.emptyList(); // todo implement getEventsBundles
             }
@@ -143,8 +135,8 @@ public class SubscriptionImpl implements
 
         }
 
-        public boolean isEmpty() {
-            return isEmpty;    
+        public EventSource update(String startRevision) {
+            return new EventSource(eventFilters, startRevision, commitLog);
         }
     }
 }



Mime
View raw message