Return-Path: X-Original-To: apmail-jackrabbit-commits-archive@www.apache.org Delivered-To: apmail-jackrabbit-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BD4C87BC0 for ; Tue, 29 Nov 2011 16:41:44 +0000 (UTC) Received: (qmail 57128 invoked by uid 500); 29 Nov 2011 16:41:44 -0000 Delivered-To: apmail-jackrabbit-commits-archive@jackrabbit.apache.org Received: (qmail 57032 invoked by uid 500); 29 Nov 2011 16:41:44 -0000 Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@jackrabbit.apache.org Delivered-To: mailing list commits@jackrabbit.apache.org Received: (qmail 57025 invoked by uid 99); 29 Nov 2011 16:41:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Nov 2011 16:41:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Nov 2011 16:41:42 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DCDFF2388A3F; Tue, 29 Nov 2011 16:41:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1207957 - in /jackrabbit/trunk/jackrabbit-core/src: main/java/org/apache/jackrabbit/core/cluster/ main/java/org/apache/jackrabbit/core/journal/ test/java/org/apache/jackrabbit/core/cluster/ Date: Tue, 29 Nov 2011 16:41:22 -0000 To: commits@jackrabbit.apache.org From: dpfister@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111129164122.DCDFF2388A3F@eris.apache.org> Author: dpfister Date: Tue Nov 29 16:41:20 2011 New Revision: 1207957 URL: http://svn.apache.org/viewvc?rev=1207957&view=rev Log: JCR-3138 - Skip sync delay when changes are found - made behaviour subclass overridable - added test case Added: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java (with props) Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?rev=1207957&r1=1207956&r2=1207957&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Tue Nov 29 16:41:20 2011 @@ -187,6 +187,11 @@ public class ClusterNode implements Runn * Record deserializer. */ private ClusterRecordDeserializer deserializer = new ClusterRecordDeserializer(); + + /** + * Flag indicating whether sync is manual. + */ + private boolean disableAutoSync; /** * Initialize this cluster node. @@ -243,6 +248,13 @@ public class ClusterNode implements Runn public long getStopDelay() { return stopDelay; } + + /** + * Disable periodic background synchronization. Used for testing purposes, only. + */ + protected void disableAutoSync() { + disableAutoSync = true; + } /** * Starts this cluster node. @@ -253,11 +265,12 @@ public class ClusterNode implements Runn if (status == NONE) { sync(); - Thread t = new Thread(this, "ClusterNode-" + clusterNodeId); - t.setDaemon(true); - t.start(); - syncThread = t; - + if (!disableAutoSync) { + Thread t = new Thread(this, "ClusterNode-" + clusterNodeId); + t.setDaemon(true); + t.start(); + syncThread = t; + } status = STARTED; } } Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java?rev=1207957&r1=1207956&r2=1207957&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java Tue Nov 29 16:41:20 2011 @@ -217,39 +217,59 @@ public abstract class AbstractJournal im * @throws JournalException if an error occurs */ protected void doSync(long startRevision) throws JournalException { - RecordIterator iterator = getRecords(startRevision); - long stopRevision = Long.MIN_VALUE; - - try { - while (iterator.hasNext()) { - Record record = iterator.nextRecord(); - if (record.getJournalId().equals(id)) { - log.info("Record with revision '" + record.getRevision() - + "' created by this journal, skipped."); - } else { - RecordConsumer consumer = getConsumer(record.getProducerId()); - if (consumer != null) { - try { - consumer.consume(record); - } catch (IllegalStateException e) { - log.error("Could not synchronize to revision: " + record.getRevision() + " due illegal state of RecordConsumer."); - return; + for (;;) { + RecordIterator iterator = getRecords(startRevision); + long stopRevision = Long.MIN_VALUE; + + try { + while (iterator.hasNext()) { + Record record = iterator.nextRecord(); + if (record.getJournalId().equals(id)) { + log.info("Record with revision '" + record.getRevision() + + "' created by this journal, skipped."); + } else { + RecordConsumer consumer = getConsumer(record.getProducerId()); + if (consumer != null) { + try { + consumer.consume(record); + } catch (IllegalStateException e) { + log.error("Could not synchronize to revision: " + record.getRevision() + " due illegal state of RecordConsumer."); + return; + } } } + stopRevision = record.getRevision(); } - stopRevision = record.getRevision(); + } finally { + iterator.close(); } - } finally { - iterator.close(); - } + + if (stopRevision > 0) { + for (RecordConsumer consumer : consumers.values()) { + consumer.setRevision(stopRevision); + } + log.info("Synchronized to revision: " + stopRevision); - if (stopRevision > 0) { - for (RecordConsumer consumer : consumers.values()) { - consumer.setRevision(stopRevision); + if (syncAgainOnNewRecords()) { + // changes detected, sync again + startRevision = stopRevision; + continue; + } } - log.info("Synchronized to revision: " + stopRevision); + break; } } + + /** + * Return a flag indicating whether synchronization should continue + * in a loop until no more new records are found. Subclass overridable. + * + * @return true if synchronization should continue; + * false otherwise + */ + protected boolean syncAgainOnNewRecords() { + return false; + } /** * Lock the journal revision, disallowing changes from other sources until Added: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java?rev=1207957&view=auto ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java Tue Nov 29 16:41:20 2011 @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.cluster; + +import java.util.ArrayList; + +import javax.jcr.RepositoryException; + +import org.apache.jackrabbit.core.cluster.SimpleEventListener.LockEvent; +import org.apache.jackrabbit.core.config.ClusterConfig; +import org.apache.jackrabbit.core.id.NodeId; +import org.apache.jackrabbit.core.journal.Journal; +import org.apache.jackrabbit.core.journal.JournalFactory; +import org.apache.jackrabbit.core.journal.MemoryJournal; +import org.apache.jackrabbit.core.journal.Record; +import org.apache.jackrabbit.core.journal.RecordConsumer; +import org.apache.jackrabbit.core.journal.MemoryJournal.MemoryRecord; +import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver; +import org.apache.jackrabbit.test.JUnitTest; + +import EDU.oswego.cs.dl.util.concurrent.Latch; + +/** + * Test cases for cluster synchronization. + */ +public class ClusterSyncTest extends JUnitTest { + + /** Defaut workspace name. */ + private static final String DEFAULT_WORKSPACE = "default"; + + /** Default sync delay: 5 seconds. */ + private static final long SYNC_DELAY = 5000; + + /** Master node. */ + private ClusterNode master; + + /** Slave node. */ + /* avoid synthetic accessor */ ClusterNode slave; + + /** Records shared among multiple memory journals. */ + private final ArrayList records = new ArrayList(); + + /** + * {@inheritDoc} + */ + @Override + protected void setUp() throws Exception { + master = createClusterNode("master", false); + master.start(); + + slave = createClusterNode("slave", true); + slave.start(); + + super.setUp(); + } + + /** + * {@inheritDoc} + */ + @Override + protected void tearDown() throws Exception { + if (slave != null) { + slave.stop(); + } + if (master != null) { + master.stop(); + } + super.tearDown(); + } + + /** + * Verify that sync() on a cluster node will continue fetching results until no more + * changes are detected. + * + * @throws Exception + */ + public void testSyncAllChanges() throws Exception { + // create channel on master and slave + LockEventChannel channel = master.createLockChannel(DEFAULT_WORKSPACE); + slave.createLockChannel(DEFAULT_WORKSPACE).setListener(new SimpleEventListener()); + + // add blocking consumer to slave, this will block on the first non-empty sync() + BlockingConsumer consumer = new BlockingConsumer(); + slave.getJournal().register(consumer); + + // add first entry + LockEvent event = new LockEvent(NodeId.randomId(), true, "admin"); + channel.create(event.getNodeId(), event.isDeep(), event.getUserId()).ended(true); + + // start a manual sync on the slave and ... + Thread syncOnce = new Thread(new Runnable() { + public void run() { + try { + slave.sync(); + } catch (ClusterException e) { + /* ignore */ + } + } + }); + syncOnce.start(); + + // ... wait until it blocks + consumer.waitUntilBlocked(); + + // add second entry + event = new LockEvent(NodeId.randomId(), true, "admin"); + channel.create(event.getNodeId(), event.isDeep(), event.getUserId()).ended(true); + + // now unblock slave + consumer.unblock(); + + // wait for the sync to finish + syncOnce.join(); + + assertEquals(master.getRevision(), slave.getRevision()); + } + + /** + * Create a cluster node, with a memory journal referencing a list of records. + * + * @param id cluster node id + * @param records memory journal's list of records + * @param disableAutoSync if true background synchronization is disabled + */ + private ClusterNode createClusterNode(String id, boolean disableAutoSync) throws Exception { + final MemoryJournal journal = new MemoryJournal() { + protected boolean syncAgainOnNewRecords() { + return true; + } + }; + JournalFactory jf = new JournalFactory() { + public Journal getJournal(NamespaceResolver resolver) + throws RepositoryException { + return journal; + } + }; + ClusterConfig cc = new ClusterConfig(id, SYNC_DELAY, jf); + SimpleClusterContext context = new SimpleClusterContext(cc); + + journal.setRepositoryHome(context.getRepositoryHome()); + journal.init(id, context.getNamespaceResolver()); + journal.setRecords(records); + + ClusterNode clusterNode = new ClusterNode(); + clusterNode.init(context); + if (disableAutoSync) { + clusterNode.disableAutoSync(); + } + return clusterNode; + } + + /** + * Custom consumer that will block inside the journal's sync() method + * until it is unblocked. + */ + static class BlockingConsumer implements RecordConsumer { + + private final Latch blockLatch = new Latch(); + private final Latch unblockLatch = new Latch(); + private long revision; + + public String getId() { + return "CUSTOM"; + } + + public long getRevision() { + return revision; + } + + public void consume(Record record) { + /* nothing to be done here */ + } + + public void setRevision(long revision) { + blockLatch.release(); + + try { + unblockLatch.acquire(); + } catch (InterruptedException e) { + /* ignore */ + } + this.revision = revision; + } + + public void waitUntilBlocked() throws InterruptedException { + blockLatch.acquire(); + } + + public void unblock() { + unblockLatch.release(); + } + } +} Propchange: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Rev Url Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java?rev=1207957&r1=1207956&r2=1207957&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java Tue Nov 29 16:41:20 2011 @@ -38,6 +38,7 @@ public class TestAll extends TestCase { TestSuite suite = new TestSuite(); suite.addTestSuite(ClusterRecordTest.class); + suite.addTestSuite(ClusterSyncTest.class); suite.addTestSuite(DbClusterTest.class); return suite;