Return-Path: Delivered-To: apmail-jackrabbit-commits-archive@www.apache.org Received: (qmail 31835 invoked from network); 20 Feb 2007 16:07:17 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Feb 2007 16:07:17 -0000 Received: (qmail 6299 invoked by uid 500); 20 Feb 2007 16:07:25 -0000 Delivered-To: apmail-jackrabbit-commits-archive@jackrabbit.apache.org Received: (qmail 6197 invoked by uid 500); 20 Feb 2007 16:07:24 -0000 Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@jackrabbit.apache.org Delivered-To: mailing list commits@jackrabbit.apache.org Received: (qmail 6188 invoked by uid 99); 20 Feb 2007 16:07:24 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Feb 2007 08:07:24 -0800 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Feb 2007 08:07:12 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id A657D1A981D; Tue, 20 Feb 2007 08:06:51 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r509624 [1/3] - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core: ./ cluster/ config/ journal/ state/ Date: Tue, 20 Feb 2007 16:06:45 -0000 To: commits@jackrabbit.apache.org From: dpfister@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070220160651.A657D1A981D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dpfister Date: Tue Feb 20 08:06:42 2007 New Revision: 509624 URL: http://svn.apache.org/viewvc?view=rev&rev=509624 Log: JCR-749 Add mysql ddl for clustering (DatabaseJournal) JCR-756 Concurrent add/remove child node operations in a cluster may corrupt repository JCR-757 Allow multiple producers to feed/consume journal Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseRecordIterator.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DefaultRecordProducer.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordIterator.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/JournalException.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/LockableFileRevision.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Record.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordConsumer.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordIterator.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordProducer.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/default.ddl jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/derby.ddl jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/h2.ddl jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/mysql.ddl jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/oracle.ddl Removed: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRevision.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/Journal.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/JournalException.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/Record.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordInput.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordOutput.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordProcessor.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/default.ddl jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/derby.ddl jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/h2.ddl jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/oracle.ddl Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java?view=diff&rev=509624&r1=509623&r2=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java Tue Feb 20 08:06:42 2007 @@ -282,7 +282,7 @@ vMgr = createVersionManager(repConfig.getVersioningConfig(), delegatingDispatcher); if (clusterNode != null) { - vMgr.setEventChannel(clusterNode); + vMgr.setEventChannel(clusterNode.createUpdateChannel(null)); } // init virtual node type manager @@ -657,7 +657,7 @@ * * @return clustered node */ - private ClusterNode createClusterNode() throws RepositoryException { + protected ClusterNode createClusterNode() throws RepositoryException { try { ClusterNode clusterNode = new ClusterNode(); clusterNode.init(new ExternalEventListener()); Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?view=diff&rev=509624&r1=509623&r2=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Tue Feb 20 08:06:42 2007 @@ -20,25 +20,43 @@ import org.slf4j.LoggerFactory; import org.apache.jackrabbit.core.config.ClusterConfig; import org.apache.jackrabbit.core.config.ConfigurationException; +import org.apache.jackrabbit.core.config.JournalConfig; import org.apache.jackrabbit.core.NodeId; +import org.apache.jackrabbit.core.journal.Journal; +import org.apache.jackrabbit.core.journal.RecordConsumer; +import org.apache.jackrabbit.core.journal.Record; +import org.apache.jackrabbit.core.journal.JournalException; +import org.apache.jackrabbit.core.journal.FileRevision; import org.apache.jackrabbit.core.nodetype.InvalidNodeTypeDefException; +import org.apache.jackrabbit.core.nodetype.NodeTypeDef; import org.apache.jackrabbit.core.observation.EventState; import org.apache.jackrabbit.core.observation.EventStateCollection; import org.apache.jackrabbit.core.state.ChangeLog; +import org.apache.jackrabbit.core.state.ItemState; +import org.apache.jackrabbit.core.state.NodeState; +import org.apache.jackrabbit.core.state.PropertyState; +import org.apache.jackrabbit.name.QName; +import org.apache.jackrabbit.name.Path; import EDU.oswego.cs.dl.util.concurrent.Mutex; import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.observation.Event; import java.util.List; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.HashSet; +import java.io.File; /** * Default clustered node implementation. */ -public class ClusterNode implements Runnable, UpdateEventChannel, - NamespaceEventChannel, NodeTypeEventChannel { +public class ClusterNode implements Runnable, + NamespaceEventChannel, NodeTypeEventChannel, RecordConsumer { /** * System property specifying a node id to use. @@ -46,11 +64,21 @@ public static final String SYSTEM_PROPERTY_NODE_ID = "org.apache.jackrabbit.core.cluster.node_id"; /** + * Revision counter parameter name. + */ + private static final String REVISION_NAME = "revision"; + + /** * Used for padding short string representations. */ private static final String SHORT_PADDING = "0000"; /** + * Producer identifier. + */ + private static final String PRODUCER_ID = "JR"; + + /** * Status constant. */ private static final int NONE = 0; @@ -81,9 +109,9 @@ private String clusterNodeId; /** - * Synchronization delay, in seconds. + * Synchronization delay, in milliseconds. */ - private int syncDelay; + private long syncDelay; /** * Journal used. @@ -126,6 +154,31 @@ private NodeTypeEventListener nodeTypeListener; /** + * Instance revision file. + */ + private FileRevision instanceRevision; + + /** + * Workspace name used when consuming records. + */ + private String workspace; + + /** + * Change log used when consuming records. + */ + private ChangeLog changeLog; + + /** + * List of recorded events; used when consuming records. + */ + private List events; + + /** + * Last used session for event sources. + */ + private Session lastSession; + + /** * Initialize this cluster node. * * @throws ClusterException if an error occurs @@ -146,11 +199,23 @@ clusterNodeId = getClusterNodeId(cc.getId()); syncDelay = cc.getSyncDelay(); + JournalConfig jc = cc.getJournalConfig(); + + String revisionName = jc.getParameters().getProperty(REVISION_NAME); + if (revisionName == null) { + String msg = "Revision not specified."; + throw new ClusterException(msg); + } try { - journal = (Journal) cc.getJournalConfig().newInstance(); - journal.init(clusterNodeId, new SyncListener(), clusterContext.getNamespaceResovler()); + instanceRevision = new FileRevision(new File(revisionName)); + + journal = (Journal) jc.newInstance(); + journal.init(clusterNodeId, clusterContext.getNamespaceResovler()); + journal.register(this); } catch (ConfigurationException e) { throw new ClusterException(e.getMessage(), e.getCause()); + } catch (JournalException e) { + throw new ClusterException(e.getMessage(), e.getCause()); } } @@ -178,7 +243,7 @@ for (;;) { synchronized (this) { try { - wait(syncDelay * 1000); + wait(syncDelay); } catch (InterruptedException e) {} if (status == STOPPED) { @@ -213,8 +278,11 @@ String msg = "Interrupted while waiting for mutex."; throw new ClusterException(msg); } + try { journal.sync(); + } catch (JournalException e) { + throw new ClusterException(e.getMessage(), e.getCause()); } finally { syncLock.release(); } @@ -233,72 +301,6 @@ } /** - * Called when a node has been locked. - * - * @param workspace workspace name - * @param nodeId node id - * @param deep flag indicating whether lock is deep - * @param owner lock owner - */ - private void locked(String workspace, NodeId nodeId, boolean deep, String owner) { - if (status != STARTED) { - log.info("not started: lock operation ignored."); - return; - } - boolean succeeded = false; - - try { - journal.begin(workspace); - journal.log(nodeId, deep, owner); - journal.prepare(); - journal.commit(); - succeeded = true; - } catch (JournalException e) { - String msg = "Unable to create log entry: " + e.getMessage(); - log.error(msg); - } catch (Throwable e) { - String msg = "Unexpected error while creating log entry."; - log.error(msg, e); - } finally { - if (!succeeded) { - journal.cancel(); - } - } - } - - /** - * Called when a node has been unlocked. - * - * @param workspace workspace name - * @param nodeId node id - */ - private void unlocked(String workspace, NodeId nodeId) { - if (status != STARTED) { - log.info("not started: unlock operation ignored."); - return; - } - boolean succeeded = false; - - try { - journal.begin(workspace); - journal.log(nodeId); - journal.prepare(); - journal.commit(); - succeeded = true; - } catch (JournalException e) { - String msg = "Unable to create log entry: " + e.getMessage(); - log.error(msg); - } catch (Throwable e) { - String msg = "Unexpected error while creating log entry."; - log.error(msg, e); - } finally { - if (!succeeded) { - journal.cancel(); - } - } - } - - /** * Create an {@link UpdateEventChannel} for some workspace. * * @param workspace workspace name @@ -349,108 +351,6 @@ return s; } - //--------------------------------------------------< UpdateEventListener > - - /** - * {@inheritDoc} - */ - public void updateCreated() { - if (status != STARTED) { - log.info("not started: update create ignored."); - return; - } - try { - sync(); - } catch (ClusterException e) { - String msg = "Unable to sync with journal: " + e.getMessage(); - log.error(msg); - } catch (Throwable e) { - String msg = "Unexpected error while creating log entry."; - log.error(msg, e); - } - } - - /** - * {@inheritDoc} - *

- * Invoked when an update has been prepared inside versioning. Delegate - * to common method with null workspace. - */ - public void updatePrepared(ChangeLog changes, EventStateCollection esc) { - updatePrepared(null, changes, esc); - } - - /** - * Called when an a update operation has been prepared. - * - * @param workspace workspace to use when writing journal entry - * @param changes changes - * @param esc events as they will be delivered on success - */ - private void updatePrepared(String workspace, ChangeLog changes, EventStateCollection esc) { - if (status != STARTED) { - log.info("not started: update prepare ignored."); - return; - } - boolean succeeded = false; - - try { - journal.begin(workspace); - journal.log(changes, esc); - journal.prepare(); - succeeded = true; - } catch (JournalException e) { - String msg = "Unable to create log entry: " + e.getMessage(); - log.error(msg); - } catch (Throwable e) { - String msg = "Unexpected error while preparing log entry."; - log.error(msg, e); - } finally { - if (!succeeded) { - journal.cancel(); - } - } - } - - /** - * {@inheritDoc} - */ - public void updateCommitted() { - if (status != STARTED) { - log.info("not started: update commit ignored."); - return; - } - try { - journal.commit(); - } catch (JournalException e) { - String msg = "Unable to create log entry: " + e.getMessage(); - log.error(msg); - } catch (Throwable e) { - String msg = "Unexpected error while committing log entry."; - log.error(msg, e); - } - } - - /** - * {@inheritDoc} - */ - public void updateCancelled() { - if (status != STARTED) { - log.info("not started: update cancel ignored."); - return; - } - journal.cancel(); - } - - /** - * {@inheritDoc} - *

- * Invoked to set the update event listener responsible for delivering versioning events. - */ - public void setListener(UpdateEventListener listener) { - versionUpdateListener = listener; - } - //-----------------------------------------------< NamespaceEventListener > /** @@ -461,13 +361,16 @@ log.info("not started: namespace operation ignored."); return; } + Record record = null; boolean succeeded = false; try { - journal.begin(null); - journal.log(oldPrefix, newPrefix, uri); - journal.prepare(); - journal.commit(); + record = journal.getProducer(PRODUCER_ID).append(); + record.writeString(null); + write(record, oldPrefix, newPrefix, uri); + record.writeChar('\0'); + record.update(); + setRevision(record.getRevision()); succeeded = true; } catch (JournalException e) { String msg = "Unable to create log entry: " + e.getMessage(); @@ -476,8 +379,8 @@ String msg = "Unexpected error while creating log entry."; log.error(msg, e); } finally { - if (!succeeded) { - journal.cancel(); + if (!succeeded && record != null) { + record.cancelUpdate(); } } } @@ -496,13 +399,16 @@ log.info("not started: nodetype operation ignored."); return; } + Record record = null; boolean succeeded = false; try { - journal.begin(null); - journal.log(ntDefs); - journal.prepare(); - journal.commit(); + record = journal.getProducer(PRODUCER_ID).append(); + record.writeString(null); + write(record, ntDefs); + record.writeChar('\0'); + record.update(); + setRevision(record.getRevision()); succeeded = true; } catch (JournalException e) { String msg = "Unable to create log entry: " + e.getMessage(); @@ -511,8 +417,8 @@ String msg = "Unexpected error while creating log entry."; log.error(msg, e); } finally { - if (!succeeded) { - journal.cancel(); + if (!succeeded && record != null) { + record.cancelUpdate(); } } } @@ -524,7 +430,6 @@ nodeTypeListener = listener; } - /** * Workspace update channel. */ @@ -536,6 +441,11 @@ private final String workspace; /** + * Record being appended. + */ + private Record record; + + /** * Create a new instance of this class. * * @param workspace workspace name @@ -548,37 +458,117 @@ * {@inheritDoc} */ public void updateCreated() { - ClusterNode.this.updateCreated(); + if (status != STARTED) { + log.info("not started: update create ignored."); + return; + } + if (record != null) { + String msg = "Record already created."; + log.warn(msg); + return; + } + try { + sync(); + record = journal.getProducer(PRODUCER_ID).append(); + //sync(); + } catch (JournalException e) { + String msg = "Unable to create log entry."; + log.error(msg, e); + } catch (Throwable e) { + String msg = "Unexpected error while creating log entry."; + log.error(msg, e); + } } /** * {@inheritDoc} */ public void updatePrepared(ChangeLog changes, EventStateCollection esc) { - ClusterNode.this.updatePrepared(workspace, changes, esc); + if (status != STARTED) { + log.info("not started: update prepare ignored."); + return; + } + if (record == null) { + String msg = "No record created."; + log.warn(msg); + return; + } + + boolean succeeded = false; + + try { + //record = journal.getProducer(PRODUCER_ID).append(); + record.writeString(workspace); + write(record, changes, esc); + record.writeChar('\0'); + succeeded = true; + } catch (JournalException e) { + String msg = "Unable to create log entry: " + e.getMessage(); + log.error(msg); + } catch (Throwable e) { + String msg = "Unexpected error while preparing log entry."; + log.error(msg, e); + } finally { + if (!succeeded && record != null) { + record.cancelUpdate(); + record = null; + } + } } /** * {@inheritDoc} */ public void updateCommitted() { - ClusterNode.this.updateCommitted(); + if (status != STARTED) { + log.info("not started: update commit ignored."); + return; + } + if (record == null) { + String msg = "No record prepared."; + log.warn(msg); + return; + } + try { + record.update(); + setRevision(record.getRevision()); + log.info("Appended revision: " + record.getRevision()); + } catch (JournalException e) { + String msg = "Unable to commit log entry."; + log.error(msg, e); + } catch (Throwable e) { + String msg = "Unexpected error while committing log entry."; + log.error(msg, e); + } finally { + record = null; + } } /** * {@inheritDoc} */ public void updateCancelled() { - ClusterNode.this.updateCancelled(); + if (status != STARTED) { + log.info("not started: update cancel ignored."); + return; + } + if (record != null) { + record.cancelUpdate(); + record = null; + } } /** * {@inheritDoc} */ public void setListener(UpdateEventListener listener) { - wspUpdateListeners.remove(workspace); - if (listener != null) { - wspUpdateListeners.put(workspace, listener); + if (workspace == null) { + versionUpdateListener = listener; + } else { + wspUpdateListeners.remove(workspace); + if (listener != null) { + wspUpdateListeners.put(workspace, listener); + } } } } @@ -606,14 +596,63 @@ * {@inheritDoc} */ public void locked(NodeId nodeId, boolean deep, String owner) { - ClusterNode.this.locked(workspace, nodeId, deep, owner); + if (status != STARTED) { + log.info("not started: lock operation ignored."); + return; + } + Record record = null; + boolean succeeded = false; + + try { + record = journal.getProducer(PRODUCER_ID).append(); + record.writeString(workspace); + write(record, nodeId, deep, owner); + record.writeChar('\0'); + record.update(); + setRevision(record.getRevision()); + succeeded = true; + } catch (JournalException e) { + String msg = "Unable to create log entry: " + e.getMessage(); + log.error(msg); + } catch (Throwable e) { + String msg = "Unexpected error while creating log entry."; + log.error(msg, e); + } finally { + if (!succeeded && record != null) { + record.cancelUpdate(); + } + } } /** * {@inheritDoc} */ public void unlocked(NodeId nodeId) { - ClusterNode.this.unlocked(workspace, nodeId); + if (status != STARTED) { + log.info("not started: unlock operation ignored."); + return; + } + Record record = null; + boolean succeeded = false; + + try { + record = journal.getProducer(PRODUCER_ID).append(); + record.writeString(workspace); + write(record, nodeId); + record.update(); + setRevision(record.getRevision()); + succeeded = true; + } catch (JournalException e) { + String msg = "Unable to create log entry: " + e.getMessage(); + log.error(msg); + } catch (Throwable e) { + String msg = "Unexpected error while creating log entry."; + log.error(msg, e); + } finally { + if (!succeeded && record != null) { + record.cancelUpdate(); + } + } } /** @@ -628,179 +667,464 @@ } /** - * Sync listener on journal. + * Invoked when a record starts. + * + * @param workspace workspace, may be null */ - class SyncListener implements RecordProcessor { + private void start(String workspace) { + this.workspace = workspace; - /** - * Workspace name. - */ - private String workspace; + changeLog = new ChangeLog(); + events = new ArrayList(); + } - /** - * Change log. - */ - private ChangeLog changeLog; + /** + * Process an update operation. + * + * @param operation operation to process + */ + private void process(ItemOperation operation) { + operation.apply(changeLog); + } - /** - * List of recorded events. - */ - private List events; + /** + * Process an event. + * + * @param event event + */ + private void process(EventState event) { + events.add(event); + } - /** - * {@inheritDoc} - */ - public void start(String workspace) { - this.workspace = workspace; + /** + * Process a lock operation. + * + * @param nodeId node id + * @param isDeep flag indicating whether lock is deep + * @param owner lock owner + */ + private void process(NodeId nodeId, boolean isDeep, String owner) { + LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace); + if (listener == null) { + try { + clusterContext.lockEventsReady(workspace); + } catch (RepositoryException e) { + String msg = "Unable to make lock listener for workspace " + + workspace + " online: " + e.getMessage(); + log.warn(msg); + } + listener = (LockEventListener) wspLockListeners.get(workspace); + if (listener == null) { + String msg = "Lock channel unavailable for workspace: " + workspace; + log.error(msg); + return; + } + } + try { + listener.externalLock(nodeId, isDeep, owner); + } catch (RepositoryException e) { + String msg = "Unable to deliver lock event: " + e.getMessage(); + log.error(msg); + } + } - changeLog = new ChangeLog(); - events = new ArrayList(); + /** + * Process an unlock operation. + * + * @param nodeId node id + */ + private void process(NodeId nodeId) { + LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace); + if (listener == null) { + try { + clusterContext.lockEventsReady(workspace); + } catch (RepositoryException e) { + String msg = "Unable to make lock listener for workspace " + + workspace + " online: " + e.getMessage(); + log.warn(msg); + } + listener = (LockEventListener) wspLockListeners.get(workspace); + if (listener == null) { + String msg = "Lock channel unavailable for workspace: " + workspace; + log.error(msg); + return; + } + } + try { + listener.externalUnlock(nodeId); + } catch (RepositoryException e) { + String msg = "Unable to deliver lock event: " + e.getMessage(); + log.error(msg); } + } - /** - * {@inheritDoc} - */ - public void process(ItemOperation operation) { - operation.apply(changeLog); + /** + * Process a namespace operation. + * + * @param oldPrefix old prefix. if null this is a fresh mapping + * @param newPrefix new prefix. if null this is an unmap operation + * @param uri uri to map prefix to + */ + private void process(String oldPrefix, String newPrefix, String uri) { + if (namespaceListener == null) { + String msg = "Namespace listener unavailable."; + log.error(msg); + return; + } + try { + namespaceListener.externalRemap(oldPrefix, newPrefix, uri); + } catch (RepositoryException e) { + String msg = "Unable to deliver namespace operation: " + e.getMessage(); + log.error(msg); } + } - /** - * {@inheritDoc} - */ - public void process(EventState event) { - events.add(event); + /** + * Process one or more node type registrations. + * + * @param ntDefs node type definition + */ + private void process(Collection ntDefs) { + if (nodeTypeListener == null) { + String msg = "NodeType listener unavailable."; + log.error(msg); + return; } + try { + nodeTypeListener.externalRegistered(ntDefs); + } catch (InvalidNodeTypeDefException e) { + String msg = "Unable to deliver node type operation: " + e.getMessage(); + log.error(msg); + } catch (RepositoryException e) { + String msg = "Unable to deliver node type operation: " + e.getMessage(); + log.error(msg); + } + } - /** - * {@inheritDoc} - */ - public void process(NodeId nodeId, boolean isDeep, String owner) { - LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace); + /** + * Invoked when a record ends. + */ + private void end() { + UpdateEventListener listener = null; + if (workspace != null) { + listener = (UpdateEventListener) wspUpdateListeners.get(workspace); if (listener == null) { try { - clusterContext.lockEventsReady(workspace); + clusterContext.updateEventsReady(workspace); } catch (RepositoryException e) { - String msg = "Unable to make lock listener for workspace " + + String msg = "Error making update listener for workspace " + workspace + " online: " + e.getMessage(); log.warn(msg); } - listener = (LockEventListener) wspLockListeners.get(workspace); + listener = (UpdateEventListener) wspUpdateListeners.get(workspace); if (listener == null) { - String msg = "Lock channel unavailable for workspace: " + workspace; + String msg = "Update listener unavailable for workspace: " + workspace; log.error(msg); return; } } - try { - listener.externalLock(nodeId, isDeep, owner); - } catch (RepositoryException e) { - String msg = "Unable to deliver lock event: " + e.getMessage(); + } else { + if (versionUpdateListener != null) { + listener = versionUpdateListener; + } else { + String msg = "Version update listener unavailable."; log.error(msg); + return; } } + try { + listener.externalUpdate(changeLog, events); + } catch (RepositoryException e) { + String msg = "Unable to deliver update events: " + e.getMessage(); + log.error(msg); + } + } + + //-------------------------------------------------------< RecordConsumer > + + /** + * {@inheritDoc} + */ + public String getId() { + return PRODUCER_ID; + } + + /** + * {@inheritDoc} + */ + public long getRevision() { + try { + return instanceRevision.get(); + } catch (JournalException e) { + log.warn("Unable to return current revision.", e); + return Long.MAX_VALUE; + } + } - /** - * {@inheritDoc} - */ - public void process(NodeId nodeId) { - LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace); - if (listener == null) { - try { - clusterContext.lockEventsReady(workspace); - } catch (RepositoryException e) { - String msg = "Unable to make lock listener for workspace " + - workspace + " online: " + e.getMessage(); - log.warn(msg); + /** + * {@inheritDoc} + */ + public void consume(Record record) { + log.info("Processing revision: " + record.getRevision()); + + String workspace = null; + + try { + workspace = record.readString(); + start(workspace); + + for (;;) { + char c = record.readChar(); + if (c == '\0') { + break; } - listener = (LockEventListener) wspLockListeners.get(workspace); - if (listener == null) { - String msg = "Lock channel unavailable for workspace: " + workspace; - log.error(msg); - return; + if (c == 'N') { + NodeOperation operation = NodeOperation.create(record.readByte()); + operation.setId(record.readNodeId()); + process(operation); + } else if (c == 'P') { + PropertyOperation operation = PropertyOperation.create(record.readByte()); + operation.setId(record.readPropertyId()); + process(operation); + } else if (c == 'E') { + int type = record.readByte(); + NodeId parentId = record.readNodeId(); + Path parentPath = record.readPath(); + NodeId childId = record.readNodeId(); + Path.PathElement childRelPath = record.readPathElement(); + QName ntName = record.readQName(); + + Set mixins = new HashSet(); + int mixinCount = record.readInt(); + for (int i = 0; i < mixinCount; i++) { + mixins.add(record.readQName()); + } + String userId = record.readString(); + process(createEventState(type, parentId, parentPath, childId, + childRelPath, ntName, mixins, userId)); + } else if (c == 'L') { + NodeId nodeId = record.readNodeId(); + boolean isLock = record.readBoolean(); + if (isLock) { + boolean isDeep = record.readBoolean(); + String owner = record.readString(); + process(nodeId, isDeep, owner); + } else { + process(nodeId); + } + } else if (c == 'S') { + String oldPrefix = record.readString(); + String newPrefix = record.readString(); + String uri = record.readString(); + process(oldPrefix, newPrefix, uri); + } else if (c == 'T') { + int size = record.readInt(); + HashSet ntDefs = new HashSet(); + for (int i = 0; i < size; i++) { + ntDefs.add(record.readNodeTypeDef()); + } + process(ntDefs); + } else { + throw new IllegalArgumentException("Unknown entry type: " + c); } } - try { - listener.externalUnlock(nodeId); - } catch (RepositoryException e) { - String msg = "Unable to deliver lock event: " + e.getMessage(); - log.error(msg); - } + end(); + + } catch (JournalException e) { + String msg = "Unable to read revision '" + record.getRevision() + "'."; + log.error(msg, e); + } catch (IllegalArgumentException e) { + String msg = "Error while processing revision " + + record.getRevision() + ": " + e.getMessage(); + log.error(msg); } + } - /** - * {@inheritDoc} - */ - public void process(String oldPrefix, String newPrefix, String uri) { - if (namespaceListener == null) { - String msg = "Namespace listener unavailable."; - log.error(msg); - return; - } - try { - namespaceListener.externalRemap(oldPrefix, newPrefix, uri); - } catch (RepositoryException e) { - String msg = "Unable to deliver namespace operation: " + e.getMessage(); - log.error(msg); - } + /** + * {@inheritDoc} + */ + public void setRevision(long revision) { + try { + instanceRevision.set(revision); + } catch (JournalException e) { + log.warn("Unable to set current revision to " + revision + ".", e); } + } - /** - * {@inheritDoc} - */ - public void process(Collection ntDefs) { - if (nodeTypeListener == null) { - String msg = "NodeType listener unavailable."; - log.error(msg); - return; - } - try { - nodeTypeListener.externalRegistered(ntDefs); - } catch (InvalidNodeTypeDefException e) { - String msg = "Unable to deliver node type operation: " + e.getMessage(); - log.error(msg); - } catch (RepositoryException e) { - String msg = "Unable to deliver node type operation: " + e.getMessage(); - log.error(msg); - } + /** + * Create an event state. + * + * @param type event type + * @param parentId parent id + * @param parentPath parent path + * @param childId child id + * @param childRelPath child relative path + * @param ntName ndoe type name + * @param userId user id + * @return event + */ + private EventState createEventState(int type, NodeId parentId, Path parentPath, + NodeId childId, Path.PathElement childRelPath, + QName ntName, Set mixins, String userId) { + switch (type) { + case Event.NODE_ADDED: + return EventState.childNodeAdded(parentId, parentPath, childId, childRelPath, + ntName, mixins, getOrCreateSession(userId), true); + case Event.NODE_REMOVED: + return EventState.childNodeRemoved(parentId, parentPath, childId, childRelPath, + ntName, mixins, getOrCreateSession(userId), true); + case Event.PROPERTY_ADDED: + return EventState.propertyAdded(parentId, parentPath, childRelPath, + ntName, mixins, getOrCreateSession(userId), true); + case Event.PROPERTY_CHANGED: + return EventState.propertyChanged(parentId, parentPath, childRelPath, + ntName, mixins, getOrCreateSession(userId), true); + case Event.PROPERTY_REMOVED: + return EventState.propertyRemoved(parentId, parentPath, childRelPath, + ntName, mixins, getOrCreateSession(userId), true); + default: + String msg = "Unexpected event type: " + type; + throw new IllegalArgumentException(msg); } + } - /** - * {@inheritDoc} - */ - public void end() { - UpdateEventListener listener = null; - if (workspace != null) { - listener = (UpdateEventListener) wspUpdateListeners.get(workspace); - if (listener == null) { - try { - clusterContext.updateEventsReady(workspace); - } catch (RepositoryException e) { - String msg = "Error making update listener for workspace " + - workspace + " online: " + e.getMessage(); - log.warn(msg); - } - listener = (UpdateEventListener) wspUpdateListeners.get(workspace); - if (listener == null) { - String msg = "Update listener unavailable for workspace: " + workspace; - log.error(msg); - return; - } - } + /** + * Return a session matching a certain user id. + * + * @param userId user id + * @return session + */ + private Session getOrCreateSession(String userId) { + if (lastSession == null || !lastSession.getUserID().equals(userId)) { + lastSession = new ClusterSession(userId); + } + return lastSession; + } + + //-----------------------------------------------< Record writing methods > + + private static void write(Record record, ChangeLog changeLog, EventStateCollection esc) + throws JournalException { + + Iterator addedStates = changeLog.addedStates(); + while (addedStates.hasNext()) { + ItemState state = (ItemState) addedStates.next(); + if (state.isNode()) { + write(record, NodeAddedOperation.create((NodeState) state)); } else { - if (versionUpdateListener != null) { - listener = versionUpdateListener; - } else { - String msg = "Version update listener unavailable."; - log.error(msg); - return; - } + write(record, PropertyAddedOperation.create((PropertyState) state)); } - try { - listener.externalUpdate(changeLog, events); - } catch (RepositoryException e) { - String msg = "Unable to deliver update events: " + e.getMessage(); - log.error(msg); + } + Iterator modifiedStates = changeLog.modifiedStates(); + while (modifiedStates.hasNext()) { + ItemState state = (ItemState) modifiedStates.next(); + if (state.isNode()) { + write(record, NodeModifiedOperation.create((NodeState) state)); + } else { + write(record, PropertyModifiedOperation.create((PropertyState) state)); } + } + Iterator deletedStates = changeLog.deletedStates(); + while (deletedStates.hasNext()) { + ItemState state = (ItemState) deletedStates.next(); + if (state.isNode()) { + write(record, NodeDeletedOperation.create((NodeState) state)); + } else { + write(record, PropertyDeletedOperation.create((PropertyState) state)); + } + } + + Iterator events = esc.getEvents().iterator(); + while (events.hasNext()) { + EventState event = (EventState) events.next(); + write(record, event); + } + } + + private static void write(Record record, String oldPrefix, String newPrefix, String uri) + throws JournalException { + + record.writeChar('S'); + record.writeString(oldPrefix); + record.writeString(newPrefix); + record.writeString(uri); + } + + private static void write(Record record, NodeId nodeId, boolean isDeep, String owner) + throws JournalException { + + write(record, nodeId, true, isDeep, owner); + } + + private static void write(Record record, NodeId nodeId) + throws JournalException { + + write(record, nodeId, false, false, null); + } + + private static void write(Record record, Collection ntDefs) + throws JournalException { + + record.writeChar('T'); + record.writeInt(ntDefs.size()); + + Iterator iter = ntDefs.iterator(); + while (iter.hasNext()) { + record.writeNodeTypeDef((NodeTypeDef) iter.next()); + } + } + + private static void write(Record record, PropertyOperation operation) + throws JournalException { + + record.writeChar('P'); + record.writeByte(operation.getOperationType()); + record.writePropertyId(operation.getId()); + } + + private static void write(Record record, NodeOperation operation) + throws JournalException { + + record.writeChar('N'); + record.writeByte(operation.getOperationType()); + record.writeNodeId(operation.getId()); + } + + /** + * Log an event. Subclass responsibility. + * + * @param event event to log + */ + private static void write(Record record, EventState event) + throws JournalException { + + record.writeChar('E'); + record.writeByte(event.getType()); + record.writeNodeId(event.getParentId()); + record.writePath(event.getParentPath()); + record.writeNodeId(event.getChildId()); + record.writePathElement(event.getChildRelPath()); + record.writeQName(event.getNodeType()); + + Set mixins = event.getMixinNames(); + record.writeInt(mixins.size()); + Iterator iter = mixins.iterator(); + while (iter.hasNext()) { + record.writeQName((QName) iter.next()); + } + record.writeString(event.getUserId()); + } + + private static void write(Record record, NodeId nodeId, boolean isLock, + boolean isDeep, String owner) + throws JournalException { + + record.writeChar('L'); + record.writeNodeId(nodeId); + record.writeBoolean(isLock); + if (isLock) { + record.writeBoolean(isDeep); + record.writeString(owner); } } } Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java?view=diff&rev=509624&r1=509623&r2=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java Tue Feb 20 08:06:42 2007 @@ -30,7 +30,7 @@ /** * Sync delay. */ - private final int syncDelay; + private final long syncDelay; /** * Journal configuration. @@ -41,9 +41,10 @@ * Creates a new cluster configuration. * * @param id custom cluster node id + * @param syncDelay syncDelay, in milliseconds * @param jc journal configuration */ - public ClusterConfig(String id, int syncDelay, JournalConfig jc) { + public ClusterConfig(String id, long syncDelay, JournalConfig jc) { this.id = id; this.syncDelay = syncDelay; this.jc = jc; @@ -63,7 +64,7 @@ * * @return syncDelay */ - public int getSyncDelay() { + public long getSyncDelay() { return syncDelay; } Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java?view=diff&rev=509624&r1=509623&r2=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java Tue Feb 20 08:06:42 2007 @@ -115,6 +115,9 @@ /** Name of the clustered configuration attribute. */ public static final String CLUSTERED_ATTRIBUTE = "clustered"; + /** Default synchronization delay, in milliseconds. */ + public static final String DEFAULT_SYNC_DELAY = "5000"; + /** * Creates a new configuration parser with the given parser variables. * @@ -478,8 +481,8 @@ Element element = (Element) child; String id = getAttribute(element, ID_ATTRIBUTE, null); - int syncDelay = Integer.parseInt( - getAttribute(element, SYNC_DELAY_ATTRIBUTE, "5")); + long syncDelay = Long.parseLong( + getAttribute(element, SYNC_DELAY_ATTRIBUTE, DEFAULT_SYNC_DELAY)); JournalConfig jc = parseJournalConfig(element); return new ClusterConfig(id, syncDelay, jc); Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; +import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; + +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.io.File; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.jackrabbit.name.NamespaceResolver; + +/** + * Base implementation for a journal. + */ +public abstract class AbstractJournal implements Journal { + + /** + * Logger. + */ + private static Logger log = LoggerFactory.getLogger(AbstractJournal.class); + + /** + * Journal id. + */ + private String id; + + /** + * Namespace resolver. + */ + private NamespaceResolver resolver; + + /** + * Map of registered consumers. + */ + private final Map consumers = new HashMap(); + + /** + * Map of registered producers. + */ + private final Map producers = new HashMap(); + + /** + * Read + */ + private final ReadWriteLock rwLock = new WriterPreferenceReadWriteLock(); + + /** + * {@inheritDoc} + */ + public void init(String id, NamespaceResolver resolver) throws JournalException { + this.id = id; + this.resolver = resolver; + } + + /** + * {@inheritDoc} + */ + public void register(RecordConsumer consumer) throws JournalException { + synchronized (consumers) { + String consumerId = consumer.getId(); + if (consumers.containsKey(consumerId)) { + String msg = "Record consumer with identifier '" + + consumerId + "' already registered."; + throw new JournalException(msg); + } + consumers.put(consumerId, consumer); + } + } + + /** + * {@inheritDoc} + */ + public boolean unregister(RecordConsumer consumer) { + synchronized (consumers) { + String consumerId = consumer.getId(); + return consumers.remove(consumerId) != null; + } + } + + /** + * Return the consumer given its identifier. + * + * @param identifier identifier + * @return consumer associated with identifier; + * null if no consumer is associated with identifier + */ + public RecordConsumer getConsumer(String identifier) { + synchronized (consumers) { + return (RecordConsumer) consumers.get(identifier); + } + } + + /** + * {@inheritDoc} + */ + public RecordProducer getProducer(String identifier) { + synchronized (producers) { + RecordProducer producer = (RecordProducer) producers.get(identifier); + if (producer == null) { + producer = createProducer(identifier); + producers.put(identifier, producer); + } + return producer; + } + } + + /** + * Create the record producer for a given identifier. May be overridden + * by subclasses. + * + * @param identifier producer identifier + */ + protected RecordProducer createProducer(String identifier) { + return new DefaultRecordProducer(this, identifier); + } + + /** + * Return the minimal revision of all registered consumers. + */ + private long getMinimalRevision() { + long minimalRevision = Long.MAX_VALUE; + + synchronized (consumers) { + Iterator iter = consumers.values().iterator(); + while (iter.hasNext()) { + RecordConsumer consumer = (RecordConsumer) iter.next(); + if (consumer.getRevision() < minimalRevision) { + minimalRevision = consumer.getRevision(); + } + } + } + return minimalRevision; + } + + /** + * {@inheritDoc} + */ + public void sync() throws JournalException { + try { + rwLock.readLock().acquire(); + } catch (InterruptedException e) { + String msg = "Unable to acquire read lock."; + throw new JournalException(msg, e); + } + try { + doSync(getMinimalRevision()); + } finally { + rwLock.readLock().release(); + } + } + + /** + * Synchronize contents from journal. May be overridden by subclasses. + * + * @param startRevision start point (exlusive) + * @throws JournalException if an error occurs + */ + protected void doSync(long startRevision) throws JournalException { + RecordIterator iterator = getRecords(startRevision); + long stopRevision = Long.MIN_VALUE; + + try { + while (iterator.hasNext()) { + Record record = iterator.nextRecord(); + if (record.getJournalId().equals(id)) { + log.info("Record with revision '" + record.getRevision() + + "' created by this journal, skipped."); + } else { + RecordConsumer consumer = getConsumer(record.getProducerId()); + if (consumer != null) { + consumer.consume(record); + } + } + stopRevision = record.getRevision(); + } + } finally { + iterator.close(); + } + + if (stopRevision > 0) { + Iterator iter = consumers.values().iterator(); + while (iter.hasNext()) { + RecordConsumer consumer = (RecordConsumer) iter.next(); + consumer.setRevision(stopRevision); + } + log.info("Synchronized to revision: " + stopRevision); + } + } + + /** + * Return an iterator over all records after the specified revision. + * + * @param startRevision start point (exlusive) + * @throws JournalException if an error occurs + */ + protected abstract RecordIterator getRecords(long startRevision) + throws JournalException; + + /** + * Lock the journal revision, disallowing changes from other sources until + * {@link #unlock has been called, and synchronizes to the latest change. + * + * @throws JournalException if an error occurs + */ + public void lockAndSync() throws JournalException { + try { + rwLock.writeLock().acquire(); + } catch (InterruptedException e) { + String msg = "Unable to acquire write lock."; + throw new JournalException(msg, e); + } + + boolean succeeded = false; + + try { + // lock + doLock(); + try { + // and sync + doSync(getMinimalRevision()); + succeeded = true; + } finally { + if (!succeeded) { + doUnlock(false); + } + } + } finally { + if (!succeeded) { + rwLock.writeLock().release(); + } + } + } + + /** + * Unlock the journal revision. + * + * @param successful flag indicating whether the update process was + * successful + */ + public void unlock(boolean successful) { + doUnlock(successful); + + rwLock.writeLock().release(); + } + + /** + * Lock the journal revision. Subclass responsibility. + * + * @throws JournalException if an error occurs + */ + protected abstract void doLock() throws JournalException; + + /** + * Append a record backed by a file. Subclass responsibility. + * + * @param producerId producer identifier + * @return the new record's revision + * + * @throws JournalException if an error occurs + */ + protected abstract long append(String producerId, File file) + throws JournalException; + + /** + * Unlock the journal revision. Subclass responsibility. + * + * @param successful flag indicating whether the update process was + * successful + */ + protected abstract void doUnlock(boolean successful); + + /** + * Return this journal's identifier. + * + * @return journal identifier + */ + public String getId() { + return id; + } + + /** + * Return this journal's namespace resolver. + * + * @return namespace resolver + */ + public NamespaceResolver getResolver() { + return resolver; + } +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.apache.jackrabbit.name.NamespaceResolver; +import org.apache.jackrabbit.name.QName; +import org.apache.jackrabbit.name.NameFormat; +import org.apache.jackrabbit.name.NoPrefixDeclaredException; +import org.apache.jackrabbit.name.Path; +import org.apache.jackrabbit.name.PathFormat; +import org.apache.jackrabbit.name.UnknownPrefixException; +import org.apache.jackrabbit.name.IllegalNameException; +import org.apache.jackrabbit.name.MalformedPathException; +import org.apache.jackrabbit.core.NodeId; +import org.apache.jackrabbit.core.PropertyId; +import org.apache.jackrabbit.core.nodetype.NodeTypeDef; +import org.apache.jackrabbit.core.nodetype.compact.CompactNodeTypeDefWriter; +import org.apache.jackrabbit.core.nodetype.compact.CompactNodeTypeDefReader; +import org.apache.jackrabbit.core.nodetype.compact.ParseException; +import org.apache.jackrabbit.uuid.Constants; +import org.apache.jackrabbit.uuid.UUID; + +import java.util.ArrayList; +import java.util.List; +import java.io.StringWriter; +import java.io.IOException; +import java.io.StringReader; + +/** + * Base implementation for a record. + */ +public abstract class AbstractRecord implements Record { + + /** + * Indicator for a literal UUID. + */ + private static final byte UUID_LITERAL = 'L'; + + /** + * Indicator for a UUID index. + */ + private static final byte UUID_INDEX = 'I'; + + /** + * UUID index. + */ + private final ArrayList uuidIndex = new ArrayList(); + + /** + * Namespace resolver. + */ + protected final NamespaceResolver resolver; + + /** + * Create a new instance of this class. + */ + public AbstractRecord(NamespaceResolver resolver) { + this.resolver = resolver; + } + + /** + * {@inheritDoc} + */ + public void writeQName(QName name) throws JournalException { + try { + writeString(NameFormat.format(name, resolver)); + } catch (NoPrefixDeclaredException e) { + String msg = "Undeclared prefix error while writing name."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void writePathElement(Path.PathElement element) throws JournalException { + writeQName(element.getName()); + writeInt(element.getIndex()); + } + + /** + * {@inheritDoc} + */ + public void writePath(Path path) throws JournalException { + try { + writeString(PathFormat.format(path, resolver)); + } catch (NoPrefixDeclaredException e) { + String msg = "Undeclared prefix error while writing path."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void writeNodeId(NodeId nodeId) throws JournalException { + if (nodeId == null) { + writeByte(UUID_INDEX); + writeInt(-1); + } else { + int index = getOrCreateIndex(nodeId); + if (index != -1) { + writeByte(UUID_INDEX); + writeInt(index); + } else { + writeByte(UUID_LITERAL); + write(nodeId.getUUID().getRawBytes()); + } + } + } + + /** + * {@inheritDoc} + */ + public void writePropertyId(PropertyId propertyId) throws JournalException { + writeNodeId(propertyId.getParentId()); + writeQName(propertyId.getName()); + } + + /** + * {@inheritDoc} + */ + public void writeNodeTypeDef(NodeTypeDef ntd) throws JournalException { + try { + StringWriter sw = new StringWriter(); + CompactNodeTypeDefWriter writer = new CompactNodeTypeDefWriter(sw, resolver, true); + writer.write(ntd); + writer.close(); + + writeString(sw.toString()); + } catch (IOException e) { + String msg = "I/O error while writing node type definition."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public QName readQName() throws JournalException { + try { + return NameFormat.parse(readString(), resolver); + } catch (UnknownPrefixException e) { + String msg = "Unknown prefix error while reading name."; + throw new JournalException(msg, e); + } catch (IllegalNameException e) { + String msg = "Illegal name error while reading name."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public Path.PathElement readPathElement() throws JournalException { + try { + QName name = NameFormat.parse(readString(), resolver); + int index = readInt(); + if (index != 0) { + return Path.PathElement.create(name, index); + } else { + return Path.PathElement.create(name); + } + } catch (UnknownPrefixException e) { + String msg = "Unknown prefix error while reading path element."; + throw new JournalException(msg, e); + } catch (IllegalNameException e) { + String msg = "Illegal name error while reading path element."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public Path readPath() throws JournalException { + try { + return PathFormat.parse(readString(), resolver); + } catch (MalformedPathException e) { + String msg = "Malformed path error while reading path."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public NodeId readNodeId() throws JournalException { + byte uuidType = readByte(); + if (uuidType == UUID_INDEX) { + int index = readInt(); + if (index == -1) { + return null; + } else { + return (NodeId) uuidIndex.get(index); + } + } else if (uuidType == UUID_LITERAL) { + byte[] b = new byte[Constants.UUID_BYTE_LENGTH]; + readFully(b); + NodeId nodeId = new NodeId(new UUID(b)); + uuidIndex.add(nodeId); + return nodeId; + } else { + String msg = "Unknown UUID type found: " + uuidType; + throw new JournalException(msg); + } + } + + /** + * {@inheritDoc} + */ + public PropertyId readPropertyId() throws JournalException { + return new PropertyId(readNodeId(), readQName()); + } + + /** + * {@inheritDoc} + */ + public NodeTypeDef readNodeTypeDef() throws JournalException { + try { + StringReader sr = new StringReader(readString()); + + CompactNodeTypeDefReader reader = new CompactNodeTypeDefReader(sr, "(internal)"); + List ntds = reader.getNodeTypeDefs(); + if (ntds.size() != 1) { + throw new JournalException("Expected one node type definition: got " + ntds.size()); + } + return (NodeTypeDef) ntds.get(0); + } catch (ParseException e) { + String msg = "Parse error while reading node type definition."; + throw new JournalException(msg, e); + } + } + + /** + * Get a NodeId's existing cache index, creating a new entry if necesary. + * + * @param nodeId nodeId to lookup + * @return cache index of existing entry or -1 to indicate the entry was added + */ + private int getOrCreateIndex(NodeId nodeId) { + int index = uuidIndex.indexOf(nodeId); + if (index == -1) { + uuidIndex.add(nodeId); + } + return index; + } +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Default temporary record used for appending to some journal. + */ +public class AppendRecord extends AbstractRecord { + + /** + * Logger. + */ + private static Logger log = LoggerFactory.getLogger(AppendRecord.class); + + /** + * Default prefix for appended records in the file system. + */ + private static final String DEFAULT_PREFIX = "journal"; + + /** + * Default extension for appended records in the file system. + */ + private static final String DEFAULT_EXT = ".tmp"; + + /** + * Journal where record is being appended. + */ + private final AbstractJournal journal; + + /** + * Producer identifier. + */ + private final String producerId; + + /** + * This record's revision. + */ + private long revision; + + /** + * Underlying file. + */ + private File file; + + /** + * Underlying data output. + */ + private DataOutputStream dataOut; + + /** + * Create a new instance of this class. + * + * @param journal journal where record is being appended + * @param producerId producer identifier + */ + public AppendRecord(AbstractJournal journal, String producerId) { + super(journal.getResolver()); + + this.journal = journal; + this.producerId = producerId; + this.revision = 0L; + } + + /** + * {@inheritDoc} + */ + public String getJournalId() { + return journal.getId(); + } + + /** + * {@inheritDoc} + */ + public String getProducerId() { + return producerId; + } + + /** + * {@inheritDoc} + */ + public long getRevision() { + return revision; + } + + /** + * {@inheritDoc} + */ + public void writeByte(int n) throws JournalException { + open(); + + try { + dataOut.writeByte(n); + } catch (IOException e) { + String msg = "I/O error while writing byte."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void writeChar(char c) throws JournalException { + open(); + + try { + dataOut.writeChar(c); + } catch (IOException e) { + String msg = "I/O error while writing character."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void writeBoolean(boolean b) throws JournalException { + open(); + + try { + dataOut.writeBoolean(b); + } catch (IOException e) { + String msg = "I/O error while writing boolean."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void writeInt(int n) throws JournalException { + open(); + + try { + dataOut.writeInt(n); + } catch (IOException e) { + String msg = "I/O error while writing integer."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void writeString(String s) throws JournalException { + open(); + + try { + if (s == null) { + dataOut.writeBoolean(true); + } else { + dataOut.writeBoolean(false); + dataOut.writeUTF(s); + } + } catch (IOException e) { + String msg = "I/O error while writing string."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void write(byte[] b) throws JournalException { + open(); + + try { + dataOut.write(b); + } catch (IOException e) { + String msg = "I/O error while writing a byte array."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void update() throws JournalException { + boolean succeeded = false; + + try { + close(); + revision = journal.append(producerId, file); + succeeded = true; + } finally { + dispose(); + + journal.unlock(succeeded); + } + } + + /** + * {@inheritDoc} + */ + public void cancelUpdate() { + if (dataOut != null) { + dispose(); + + journal.unlock(false); + } + } + + /** + * Create temporary file and open data output on it. + * + * @throws JournalException + */ + private void open() throws JournalException { + if (file == null) { + try { + file = File.createTempFile(DEFAULT_PREFIX, DEFAULT_EXT); + dataOut = new DataOutputStream(new FileOutputStream(file)); + } catch (IOException e) { + String msg = "Unable to create temporary file."; + throw new JournalException(msg, e); + } + } + } + + /** + * Close this record, keeping the underlying file. + * + * @throws JournalException if an error occurs + */ + private void close() throws JournalException { + if (dataOut != null) { + try { + dataOut.close(); + } catch (IOException e) { + String msg = "I/O error while closing stream."; + throw new JournalException(msg, e); + } finally { + dataOut = null; + } + } + } + + /** + * Dispose this record, deleting the underlying file. + */ + private void dispose() { + if (dataOut != null) { + try { + dataOut.close(); + } catch (IOException e) { + String msg = "I/O error while closing stream."; + log.warn(msg, e); + } finally { + dataOut = null; + } + } + if (file != null) { + file.delete(); + file = null; + } + } + + /** + * Unsupported methods when appending. + */ + public byte readByte() throws JournalException { + throw unsupported(); + } + + public char readChar() throws JournalException { + throw unsupported(); + } + + public boolean readBoolean() throws JournalException { + throw unsupported(); + } + + public int readInt() throws JournalException { + throw unsupported(); + } + + public String readString() throws JournalException { + throw unsupported(); + } + + public void readFully(byte[] b) throws JournalException { + throw unsupported(); + } + + private JournalException unsupported() { + String msg = "Reading from an appended record is not supported."; + return new JournalException(msg); + } +}