jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdue...@apache.org
Subject svn commit: r1540981 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
Date Tue, 12 Nov 2013 09:31:08 GMT
Author: mduerig
Date: Tue Nov 12 09:31:08 2013
New Revision: 1540981

URL: http://svn.apache.org/r1540981
Log:
OAK-1143: [scala] Repository init throws "illegal cyclic reference involving class ChangeDispatcher"
Implement ChangeDispatcher using CompositeObserver and BackgroundObserver

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java?rev=1540981&r1=1540980&r2=1540981&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
Tue Nov 12 09:31:08 2013
@@ -18,23 +18,14 @@
  */
 package org.apache.jackrabbit.oak.spi.commit;
 
-import static com.google.common.base.Objects.toStringHelper;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Queues.newLinkedBlockingQueue;
 
 import java.io.Closeable;
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
-import com.google.common.collect.Sets;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
 /**
@@ -61,10 +52,10 @@ import org.apache.jackrabbit.oak.spi.sta
  * notifications about all changes reported to this instance.
  */
 public class ChangeDispatcher implements Observable {
-    private final Set<Listener> listeners = Sets.newHashSet();
+    private final CompositeObserver observers = new CompositeObserver();
 
     @Nonnull
-    private volatile NodeState root;
+    private NodeState root;
 
     /**
      * Create a new instance for recording changes to a {@code NodeStore}
@@ -85,10 +76,17 @@ public class ChangeDispatcher implements
     @Override
     @Nonnull
     public Closeable addObserver(Observer observer) {
-        Listener listener = new Listener(observer, root);
-        listener.start();
-        register(listener);
-        return listener;
+        // FIXME don't hard code queue size
+        final BackgroundObserver backgroundObserver = new BackgroundObserver(observer, 8192);
+        backgroundObserver.contentChanged(root, null);
+        observers.addObserver(backgroundObserver);
+        return new Closeable() {
+            @Override
+            public void close() {
+                backgroundObserver.stop();
+                observers.removeObserver(backgroundObserver);
+            }
+        };
     }
 
     private final AtomicLong changeCount = new AtomicLong(0);
@@ -110,8 +108,10 @@ public class ChangeDispatcher implements
      */
     public synchronized void beforeCommit(@Nonnull NodeState root) {
         checkState(!inLocalCommit());
+        checkNotNull(root);
         changeCount.incrementAndGet();
-        externalChange(checkNotNull(root));
+        observers.contentChanged(root, null);
+        this.root = root;
     }
 
     /**
@@ -134,7 +134,7 @@ public class ChangeDispatcher implements
             @Nonnull NodeState root, @Nonnull CommitInfo info) {
         checkState(inLocalCommit());
         checkNotNull(root);
-        add(root, info);
+        observers.contentChanged(root, info);
         this.root = root;
     }
 
@@ -151,127 +151,10 @@ public class ChangeDispatcher implements
      */
     public synchronized void afterCommit(@Nonnull NodeState root) {
         checkState(inLocalCommit());
-        externalChange(checkNotNull(root));
+        checkNotNull(root);
+        observers.contentChanged(root, null);
+        this.root = root;
         changeCount.incrementAndGet();
     }
 
-    private synchronized void externalChange(NodeState root) {
-        if (!root.equals(this.root)) {
-            add(root, null);
-            this.root = root;
-        }
-    }
-
-    private void register(Listener listener) {
-        synchronized (listeners) {
-            listeners.add(listener);
-        }
-    }
-
-    private void unregister(Listener listener) {
-        synchronized (listeners) {
-            listeners.remove(listener);
-        }
-    }
-
-    private void add(NodeState root, CommitInfo info) {
-        for (Listener l : getListeners()) {
-            l.contentChanged(root, info);
-        }
-    }
-
-    private Listener[] getListeners() {
-        synchronized (listeners) {
-            return listeners.toArray(new Listener[listeners.size()]);
-        }
-    }
-
-    //------------------------------------------------------------< Listener >---
-
-    /**
-     * Listener thread receiving changes reported into {@code ChangeDispatcher} and
-     * asynchronously distributing these to an associated {@link Observer}.
-     */
-    private class Listener extends Thread implements Closeable, Observer {
-        private final LinkedBlockingQueue<Commit> commits = newLinkedBlockingQueue();
-        private final Observer observer;
-
-        private boolean blocked = false;
-        private volatile boolean stopping;
-
-        Listener(Observer observer, NodeState root) {
-            this.observer = checkNotNull(observer);
-            commits.add(new Commit(root, null));
-            setDaemon(true);
-            setPriority(Thread.MIN_PRIORITY);
-        }
-
-        @Override
-        public void contentChanged(NodeState root, CommitInfo info) {
-            Commit commit = new Commit(root, blocked ? null : info);
-            blocked = !commits.offer(commit);
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (!stopping) {
-                    Commit commit = commits.poll(100, TimeUnit.MILLISECONDS);
-                    if (commit != null) {
-                        observer.contentChanged(commit.getRoot(), commit.getCommitInfo());
-                    }
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        @Override
-        public void close() throws IOException {
-            checkState(!stopping, "Change processor already stopped");
-
-            unregister(this);
-            stopping = true;
-            if (Thread.currentThread() != this) {
-                try {
-                    join();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new IOException
-                            ("Interruption while waiting for the listener thread to terminate",
e);
-                }
-            }
-        }
-    }
-
-    //------------------------------------------------------------< Commit >---
-
-    private static class Commit {
-        private final NodeState root;
-        private final CommitInfo commitInfo;
-
-        Commit(@Nonnull NodeState root, @Nullable CommitInfo commitInfo) {
-            this.root = checkNotNull(root);
-            this.commitInfo = commitInfo;
-        }
-
-        @Nonnull
-        NodeState getRoot() {
-            return root;
-        }
-
-        @CheckForNull
-        CommitInfo getCommitInfo() {
-            return commitInfo;
-        }
-
-        @Override
-        public String toString() {
-            return toStringHelper(this)
-                .add("root", root)
-                .add("commit info", commitInfo)
-                .toString();
-        }
-    }
-
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1540981&r1=1540980&r2=1540981&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
Tue Nov 12 09:31:08 2013
@@ -57,7 +57,7 @@ public class CommitQueueTest {
                 MongoNodeState after = (MongoNodeState) root;
                 Revision r = after.getRevision();
 //                System.out.println("seen: " + r);
-                if (r.compareRevisionTime(before) < 1) {
+                if (r.compareRevisionTime(before) < 0) {
                     exceptions.add(new Exception(
                             "Inconsistent revision sequence. Before: " +
                                     before + ", after: " + r));



Mime
View raw message