Return-Path: X-Original-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Delivered-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3A7AD10D9A for ; Tue, 12 Nov 2013 09:31:42 +0000 (UTC) Received: (qmail 76752 invoked by uid 500); 12 Nov 2013 09:31:38 -0000 Delivered-To: apmail-jackrabbit-oak-commits-archive@jackrabbit.apache.org Received: (qmail 76681 invoked by uid 500); 12 Nov 2013 09:31:33 -0000 Mailing-List: contact oak-commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: oak-dev@jackrabbit.apache.org Delivered-To: mailing list oak-commits@jackrabbit.apache.org Received: (qmail 75775 invoked by uid 99); 12 Nov 2013 09:31:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Nov 2013 09:31:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Nov 2013 09:31:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C313223888A6; Tue, 12 Nov 2013 09:31:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1540981 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Date: Tue, 12 Nov 2013 09:31:08 -0000 To: oak-commits@jackrabbit.apache.org From: mduerig@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131112093108.C313223888A6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mduerig Date: Tue Nov 12 09:31:08 2013 New Revision: 1540981 URL: http://svn.apache.org/r1540981 Log: OAK-1143: [scala] Repository init throws "illegal cyclic reference involving class ChangeDispatcher" Implement ChangeDispatcher using CompositeObserver and BackgroundObserver Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java?rev=1540981&r1=1540980&r2=1540981&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java Tue Nov 12 09:31:08 2013 @@ -18,23 +18,14 @@ */ package org.apache.jackrabbit.oak.spi.commit; -import static com.google.common.base.Objects.toStringHelper; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.Queues.newLinkedBlockingQueue; import java.io.Closeable; -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.CheckForNull; import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import com.google.common.collect.Sets; import org.apache.jackrabbit.oak.spi.state.NodeState; /** @@ -61,10 +52,10 @@ import org.apache.jackrabbit.oak.spi.sta * notifications about all changes reported to this instance. */ public class ChangeDispatcher implements Observable { - private final Set listeners = Sets.newHashSet(); + private final CompositeObserver observers = new CompositeObserver(); @Nonnull - private volatile NodeState root; + private NodeState root; /** * Create a new instance for recording changes to a {@code NodeStore} @@ -85,10 +76,17 @@ public class ChangeDispatcher implements @Override @Nonnull public Closeable addObserver(Observer observer) { - Listener listener = new Listener(observer, root); - listener.start(); - register(listener); - return listener; + // FIXME don't hard code queue size + final BackgroundObserver backgroundObserver = new BackgroundObserver(observer, 8192); + backgroundObserver.contentChanged(root, null); + observers.addObserver(backgroundObserver); + return new Closeable() { + @Override + public void close() { + backgroundObserver.stop(); + observers.removeObserver(backgroundObserver); + } + }; } private final AtomicLong changeCount = new AtomicLong(0); @@ -110,8 +108,10 @@ public class ChangeDispatcher implements */ public synchronized void beforeCommit(@Nonnull NodeState root) { checkState(!inLocalCommit()); + checkNotNull(root); changeCount.incrementAndGet(); - externalChange(checkNotNull(root)); + observers.contentChanged(root, null); + this.root = root; } /** @@ -134,7 +134,7 @@ public class ChangeDispatcher implements @Nonnull NodeState root, @Nonnull CommitInfo info) { checkState(inLocalCommit()); checkNotNull(root); - add(root, info); + observers.contentChanged(root, info); this.root = root; } @@ -151,127 +151,10 @@ public class ChangeDispatcher implements */ public synchronized void afterCommit(@Nonnull NodeState root) { checkState(inLocalCommit()); - externalChange(checkNotNull(root)); + checkNotNull(root); + observers.contentChanged(root, null); + this.root = root; changeCount.incrementAndGet(); } - private synchronized void externalChange(NodeState root) { - if (!root.equals(this.root)) { - add(root, null); - this.root = root; - } - } - - private void register(Listener listener) { - synchronized (listeners) { - listeners.add(listener); - } - } - - private void unregister(Listener listener) { - synchronized (listeners) { - listeners.remove(listener); - } - } - - private void add(NodeState root, CommitInfo info) { - for (Listener l : getListeners()) { - l.contentChanged(root, info); - } - } - - private Listener[] getListeners() { - synchronized (listeners) { - return listeners.toArray(new Listener[listeners.size()]); - } - } - - //------------------------------------------------------------< Listener >--- - - /** - * Listener thread receiving changes reported into {@code ChangeDispatcher} and - * asynchronously distributing these to an associated {@link Observer}. - */ - private class Listener extends Thread implements Closeable, Observer { - private final LinkedBlockingQueue commits = newLinkedBlockingQueue(); - private final Observer observer; - - private boolean blocked = false; - private volatile boolean stopping; - - Listener(Observer observer, NodeState root) { - this.observer = checkNotNull(observer); - commits.add(new Commit(root, null)); - setDaemon(true); - setPriority(Thread.MIN_PRIORITY); - } - - @Override - public void contentChanged(NodeState root, CommitInfo info) { - Commit commit = new Commit(root, blocked ? null : info); - blocked = !commits.offer(commit); - } - - @Override - public void run() { - try { - while (!stopping) { - Commit commit = commits.poll(100, TimeUnit.MILLISECONDS); - if (commit != null) { - observer.contentChanged(commit.getRoot(), commit.getCommitInfo()); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void close() throws IOException { - checkState(!stopping, "Change processor already stopped"); - - unregister(this); - stopping = true; - if (Thread.currentThread() != this) { - try { - join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException - ("Interruption while waiting for the listener thread to terminate", e); - } - } - } - } - - //------------------------------------------------------------< Commit >--- - - private static class Commit { - private final NodeState root; - private final CommitInfo commitInfo; - - Commit(@Nonnull NodeState root, @Nullable CommitInfo commitInfo) { - this.root = checkNotNull(root); - this.commitInfo = commitInfo; - } - - @Nonnull - NodeState getRoot() { - return root; - } - - @CheckForNull - CommitInfo getCommitInfo() { - return commitInfo; - } - - @Override - public String toString() { - return toStringHelper(this) - .add("root", root) - .add("commit info", commitInfo) - .toString(); - } - } - } Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1540981&r1=1540980&r2=1540981&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Tue Nov 12 09:31:08 2013 @@ -57,7 +57,7 @@ public class CommitQueueTest { MongoNodeState after = (MongoNodeState) root; Revision r = after.getRevision(); // System.out.println("seen: " + r); - if (r.compareRevisionTime(before) < 1) { + if (r.compareRevisionTime(before) < 0) { exceptions.add(new Exception( "Inconsistent revision sequence. Before: " + before + ", after: " + r));