jackrabbit-commits mailing list archives

From: dpfis...@apache.org
Subject: svn commit: r509624 [1/3] - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core: ./ cluster/ config/ journal/ state/
Date: Tue, 20 Feb 2007 16:06:45 GMT
Author: dpfister
Date: Tue Feb 20 08:06:42 2007
New Revision: 509624

URL: http://svn.apache.org/viewvc?view=rev&rev=509624
Log:
JCR-749 Add mysql ddl for clustering (DatabaseJournal)
JCR-756 Concurrent add/remove child node operations in a cluster may corrupt repository
JCR-757 Allow multiple producers to feed/consume journal
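
For context, these three issues are addressed by replacing the journal's old begin/log/prepare/commit API with a producer/consumer model: a writer obtains a Record from a RecordProducer, fills it, and commits it with update(), while RecordConsumers registered with the Journal are fed foreign records during sync. A minimal sketch of the append path, using class and method names from this patch (the wrapper class itself is hypothetical, and error handling is reduced to a comment):

    import org.apache.jackrabbit.core.journal.Journal;
    import org.apache.jackrabbit.core.journal.JournalException;
    import org.apache.jackrabbit.core.journal.Record;

    /**
     * Hypothetical helper mirroring ClusterNode.remapped() below: append a
     * namespace operation to an already initialized Journal.
     */
    public class NamespaceRecordWriter {

        public static void append(Journal journal, String oldPrefix,
                                  String newPrefix, String uri) {
            Record record = null;
            boolean succeeded = false;
            try {
                // obtain (or lazily create) the producer for this identifier
                record = journal.getProducer("JR").append();
                record.writeString(null);      // null workspace: repository-wide entry
                record.writeChar('S');         // 'S' tags a namespace operation
                record.writeString(oldPrefix);
                record.writeString(newPrefix);
                record.writeString(uri);
                record.writeChar('\0');        // end-of-record marker
                record.update();               // append the record to the journal
                succeeded = true;
            } catch (JournalException e) {
                // a real caller logs the failure here
            } finally {
                if (!succeeded && record != null) {
                    record.cancelUpdate();     // discard the partially written record
                }
            }
        }
    }

After update() succeeds, ClusterNode stores record.getRevision() in its local revision file so that the same record is not consumed again on the next sync.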

Added:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseRecordIterator.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DefaultRecordProducer.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordIterator.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/JournalException.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/LockableFileRevision.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Record.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordConsumer.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordIterator.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordProducer.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/default.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/derby.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/h2.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/mysql.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/oracle.ddl
Removed:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRevision.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/Journal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/JournalException.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/Record.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordInput.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordOutput.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordProcessor.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/default.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/derby.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/h2.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/oracle.ddl
Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java?view=diff&rev=509624&r1=509623&r2=509624
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java Tue Feb 20 08:06:42 2007
@@ -282,7 +282,7 @@
         vMgr = createVersionManager(repConfig.getVersioningConfig(),
                 delegatingDispatcher);
         if (clusterNode != null) {
-            vMgr.setEventChannel(clusterNode);
+            vMgr.setEventChannel(clusterNode.createUpdateChannel(null));
         }
 
         // init virtual node type manager
@@ -657,7 +657,7 @@
      *
      * @return clustered node
      */
-    private ClusterNode createClusterNode() throws RepositoryException {
+    protected ClusterNode createClusterNode() throws RepositoryException {
         try {
             ClusterNode clusterNode = new ClusterNode();
             clusterNode.init(new ExternalEventListener());

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?view=diff&rev=509624&r1=509623&r2=509624
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Tue Feb 20 08:06:42 2007
@@ -20,25 +20,43 @@
 import org.slf4j.LoggerFactory;
 import org.apache.jackrabbit.core.config.ClusterConfig;
 import org.apache.jackrabbit.core.config.ConfigurationException;
+import org.apache.jackrabbit.core.config.JournalConfig;
 import org.apache.jackrabbit.core.NodeId;
+import org.apache.jackrabbit.core.journal.Journal;
+import org.apache.jackrabbit.core.journal.RecordConsumer;
+import org.apache.jackrabbit.core.journal.Record;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.jackrabbit.core.journal.FileRevision;
 import org.apache.jackrabbit.core.nodetype.InvalidNodeTypeDefException;
+import org.apache.jackrabbit.core.nodetype.NodeTypeDef;
 import org.apache.jackrabbit.core.observation.EventState;
 import org.apache.jackrabbit.core.observation.EventStateCollection;
 import org.apache.jackrabbit.core.state.ChangeLog;
+import org.apache.jackrabbit.core.state.ItemState;
+import org.apache.jackrabbit.core.state.NodeState;
+import org.apache.jackrabbit.core.state.PropertyState;
+import org.apache.jackrabbit.name.QName;
+import org.apache.jackrabbit.name.Path;
 import EDU.oswego.cs.dl.util.concurrent.Mutex;
 
 import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.Event;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
+import java.io.File;
 
 /**
  * Default clustered node implementation.
  */
-public class ClusterNode implements Runnable, UpdateEventChannel,
-        NamespaceEventChannel, NodeTypeEventChannel  {
+public class ClusterNode implements Runnable,
+        NamespaceEventChannel, NodeTypeEventChannel, RecordConsumer  {
 
     /**
      * System property specifying a node id to use.
@@ -46,11 +64,21 @@
     public static final String SYSTEM_PROPERTY_NODE_ID = "org.apache.jackrabbit.core.cluster.node_id";
 
     /**
+     * Revision counter parameter name.
+     */
+    private static final String REVISION_NAME = "revision";
+
+    /**
      * Used for padding short string representations.
      */
     private static final String SHORT_PADDING = "0000";
 
     /**
+     * Producer identifier.
+     */
+    private static final String PRODUCER_ID = "JR";
+
+    /**
      * Status constant.
      */
     private static final int NONE = 0;
@@ -81,9 +109,9 @@
     private String clusterNodeId;
 
     /**
-     * Synchronization delay, in seconds.
+     * Synchronization delay, in milliseconds.
      */
-    private int syncDelay;
+    private long syncDelay;
 
     /**
      * Journal used.
@@ -126,6 +154,31 @@
     private NodeTypeEventListener nodeTypeListener;
 
     /**
+     * Instance revision file.
+     */
+    private FileRevision instanceRevision;
+
+    /**
+     * Workspace name used when consuming records.
+     */
+    private String workspace;
+
+    /**
+     * Change log used when consuming records.
+     */
+    private ChangeLog changeLog;
+
+    /**
+     * List of recorded events; used when consuming records.
+     */
+    private List events;
+
+    /**
+     * Last used session for event sources.
+     */
+    private Session lastSession;
+
+    /**
      * Initialize this cluster node.
      *
      * @throws ClusterException if an error occurs
@@ -146,11 +199,23 @@
         clusterNodeId = getClusterNodeId(cc.getId());
         syncDelay = cc.getSyncDelay();
 
+        JournalConfig jc = cc.getJournalConfig();
+
+        String revisionName = jc.getParameters().getProperty(REVISION_NAME);
+        if (revisionName == null) {
+            String msg = "Revision not specified.";
+            throw new ClusterException(msg);
+        }
         try {
-            journal = (Journal) cc.getJournalConfig().newInstance();
-            journal.init(clusterNodeId, new SyncListener(), clusterContext.getNamespaceResovler());
+            instanceRevision = new FileRevision(new File(revisionName));
+
+            journal = (Journal) jc.newInstance();
+            journal.init(clusterNodeId, clusterContext.getNamespaceResovler());
+            journal.register(this);
         } catch (ConfigurationException e) {
             throw new ClusterException(e.getMessage(), e.getCause());
+        } catch (JournalException e) {
+            throw new ClusterException(e.getMessage(), e.getCause());
         }
     }
 
@@ -178,7 +243,7 @@
         for (;;) {
             synchronized (this) {
                 try {
-                    wait(syncDelay * 1000);
+                    wait(syncDelay);
                 } catch (InterruptedException e) {}
 
                 if (status == STOPPED) {
@@ -213,8 +278,11 @@
             String msg = "Interrupted while waiting for mutex.";
             throw new ClusterException(msg);
         }
+
         try {
             journal.sync();
+        } catch (JournalException e) {
+            throw new ClusterException(e.getMessage(), e.getCause());
         } finally {
             syncLock.release();
         }
@@ -233,72 +301,6 @@
     }
 
     /**
-     * Called when a node has been locked.
-     *
-     * @param workspace workspace name
-     * @param nodeId node id
-     * @param deep flag indicating whether lock is deep
-     * @param owner lock owner
-     */
-    private void locked(String workspace, NodeId nodeId, boolean deep, String owner) {
-        if (status != STARTED) {
-            log.info("not started: lock operation ignored.");
-            return;
-        }
-        boolean succeeded = false;
-
-        try {
-            journal.begin(workspace);
-            journal.log(nodeId, deep, owner);
-            journal.prepare();
-            journal.commit();
-            succeeded = true;
-        } catch (JournalException e) {
-            String msg = "Unable to create log entry: " + e.getMessage();
-            log.error(msg);
-        } catch (Throwable e) {
-            String msg = "Unexpected error while creating log entry.";
-            log.error(msg, e);
-        } finally {
-            if (!succeeded) {
-                journal.cancel();
-            }
-        }
-    }
-
-    /**
-     * Called when a node has been unlocked.
-     *
-     * @param workspace workspace name
-     * @param nodeId node id
-     */
-    private void unlocked(String workspace, NodeId nodeId) {
-        if (status != STARTED) {
-            log.info("not started: unlock operation ignored.");
-            return;
-        }
-        boolean succeeded = false;
-
-        try {
-            journal.begin(workspace);
-            journal.log(nodeId);
-            journal.prepare();
-            journal.commit();
-            succeeded = true;
-        } catch (JournalException e) {
-            String msg = "Unable to create log entry: " + e.getMessage();
-            log.error(msg);
-        } catch (Throwable e) {
-            String msg = "Unexpected error while creating log entry.";
-            log.error(msg, e);
-        } finally {
-            if (!succeeded) {
-                journal.cancel();
-            }
-        }
-    }
-
-    /**
      * Create an {@link UpdateEventChannel} for some workspace.
      *
      * @param workspace workspace name
@@ -349,108 +351,6 @@
         return s;
     }
 
-    //--------------------------------------------------< UpdateEventListener >
-
-    /**
-     * {@inheritDoc}
-     */
-    public void updateCreated() {
-        if (status != STARTED) {
-            log.info("not started: update create ignored.");
-            return;
-        }
-        try {
-            sync();
-        } catch (ClusterException e) {
-            String msg = "Unable to sync with journal: " + e.getMessage();
-            log.error(msg);
-        } catch (Throwable e) {
-            String msg = "Unexpected error while creating log entry.";
-            log.error(msg, e);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p/>
-     * Invoked when an update has been prepared inside versioning. Delegate
-     * to common method with <code>null</code> workspace.
-     */
-    public void updatePrepared(ChangeLog changes, EventStateCollection esc) {
-        updatePrepared(null, changes, esc);
-    }
-
-    /**
-     * Called when an a update operation has been prepared.
-     *
-     * @param workspace workspace to use when writing journal entry
-     * @param changes changes
-     * @param esc events as they will be delivered on success
-     */
-    private void updatePrepared(String workspace, ChangeLog changes, EventStateCollection esc) {
-        if (status != STARTED) {
-            log.info("not started: update prepare ignored.");
-            return;
-        }
-        boolean succeeded = false;
-
-        try {
-            journal.begin(workspace);
-            journal.log(changes, esc);
-            journal.prepare();
-            succeeded = true;
-        } catch (JournalException e) {
-            String msg = "Unable to create log entry: " + e.getMessage();
-            log.error(msg);
-        } catch (Throwable e) {
-            String msg = "Unexpected error while preparing log entry.";
-            log.error(msg, e);
-        } finally {
-            if (!succeeded) {
-                journal.cancel();
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void updateCommitted() {
-        if (status != STARTED) {
-            log.info("not started: update commit ignored.");
-            return;
-        }
-        try {
-            journal.commit();
-        } catch (JournalException e) {
-            String msg = "Unable to create log entry: " + e.getMessage();
-            log.error(msg);
-        } catch (Throwable e) {
-            String msg = "Unexpected error while committing log entry.";
-            log.error(msg, e);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void updateCancelled() {
-        if (status != STARTED) {
-            log.info("not started: update cancel ignored.");
-            return;
-        }
-        journal.cancel();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p/>
-     * Invoked to set the update event listener responsible for delivering versioning events.
-     */
-    public void setListener(UpdateEventListener listener) {
-        versionUpdateListener = listener;
-    }
-
     //-----------------------------------------------< NamespaceEventListener >
 
     /**
@@ -461,13 +361,16 @@
             log.info("not started: namespace operation ignored.");
             return;
         }
+        Record record = null;
         boolean succeeded = false;
 
         try {
-            journal.begin(null);
-            journal.log(oldPrefix, newPrefix, uri);
-            journal.prepare();
-            journal.commit();
+            record = journal.getProducer(PRODUCER_ID).append();
+            record.writeString(null);
+            write(record, oldPrefix, newPrefix, uri);
+            record.writeChar('\0');
+            record.update();
+            setRevision(record.getRevision());
             succeeded = true;
         } catch (JournalException e) {
             String msg = "Unable to create log entry: " + e.getMessage();
@@ -476,8 +379,8 @@
             String msg = "Unexpected error while creating log entry.";
             log.error(msg, e);
         } finally {
-            if (!succeeded) {
-                journal.cancel();
+            if (!succeeded && record != null) {
+                record.cancelUpdate();
             }
         }
     }
@@ -496,13 +399,16 @@
             log.info("not started: nodetype operation ignored.");
             return;
         }
+        Record record = null;
         boolean succeeded = false;
 
         try {
-            journal.begin(null);
-            journal.log(ntDefs);
-            journal.prepare();
-            journal.commit();
+            record = journal.getProducer(PRODUCER_ID).append();
+            record.writeString(null);
+            write(record, ntDefs);
+            record.writeChar('\0');
+            record.update();
+            setRevision(record.getRevision());
             succeeded = true;
         } catch (JournalException e) {
             String msg = "Unable to create log entry: " + e.getMessage();
@@ -511,8 +417,8 @@
             String msg = "Unexpected error while creating log entry.";
             log.error(msg, e);
         } finally {
-            if (!succeeded) {
-                journal.cancel();
+            if (!succeeded && record != null) {
+                record.cancelUpdate();
             }
         }
     }
@@ -524,7 +430,6 @@
         nodeTypeListener = listener;
     }
 
-
     /**
      * Workspace update channel.
      */
@@ -536,6 +441,11 @@
         private final String workspace;
 
         /**
+         * Record being appended.
+         */
+        private Record record;
+
+        /**
          * Create a new instance of this class.
          *
          * @param workspace workspace name
@@ -548,37 +458,117 @@
          * {@inheritDoc}
          */
         public void updateCreated() {
-            ClusterNode.this.updateCreated();
+            if (status != STARTED) {
+                log.info("not started: update create ignored.");
+                return;
+            }
+            if (record != null) {
+                String msg = "Record already created.";
+                log.warn(msg);
+                return;
+            }
+            try {
+                sync();
+                record = journal.getProducer(PRODUCER_ID).append();
+                //sync();
+            } catch (JournalException e) {
+                String msg = "Unable to create log entry.";
+                log.error(msg, e);
+            } catch (Throwable e) {
+                String msg = "Unexpected error while creating log entry.";
+                log.error(msg, e);
+            }
         }
 
         /**
          * {@inheritDoc}
          */
         public void updatePrepared(ChangeLog changes, EventStateCollection esc) {
-            ClusterNode.this.updatePrepared(workspace, changes, esc);
+            if (status != STARTED) {
+                log.info("not started: update prepare ignored.");
+                return;
+            }
+            if (record == null) {
+                String msg = "No record created.";
+                log.warn(msg);
+                return;
+            }
+
+            boolean succeeded = false;
+
+            try {
+                //record = journal.getProducer(PRODUCER_ID).append();
+                record.writeString(workspace);
+                write(record, changes, esc);
+                record.writeChar('\0');
+                succeeded = true;
+            } catch (JournalException e) {
+                String msg = "Unable to create log entry: " + e.getMessage();
+                log.error(msg);
+            } catch (Throwable e) {
+                String msg = "Unexpected error while preparing log entry.";
+                log.error(msg, e);
+            } finally {
+                if (!succeeded && record != null) {
+                    record.cancelUpdate();
+                    record = null;
+                }
+            }
         }
 
         /**
          * {@inheritDoc}
          */
         public void updateCommitted() {
-            ClusterNode.this.updateCommitted();
+            if (status != STARTED) {
+                log.info("not started: update commit ignored.");
+                return;
+            }
+            if (record == null) {
+                String msg = "No record prepared.";
+                log.warn(msg);
+                return;
+            }
+            try {
+                record.update();
+                setRevision(record.getRevision());
+                log.info("Appended revision: " + record.getRevision());
+            } catch (JournalException e) {
+                String msg = "Unable to commit log entry.";
+                log.error(msg, e);
+            } catch (Throwable e) {
+                String msg = "Unexpected error while committing log entry.";
+                log.error(msg, e);
+            } finally {
+                record = null;
+            }
         }
 
         /**
          * {@inheritDoc}
          */
         public void updateCancelled() {
-            ClusterNode.this.updateCancelled();
+            if (status != STARTED) {
+                log.info("not started: update cancel ignored.");
+                return;
+            }
+            if (record != null) {
+                record.cancelUpdate();
+                record = null;
+            }
         }
 
         /**
          * {@inheritDoc}
          */
         public void setListener(UpdateEventListener listener) {
-            wspUpdateListeners.remove(workspace);
-            if (listener != null) {
-                wspUpdateListeners.put(workspace, listener);
+            if (workspace == null) {
+                versionUpdateListener = listener;
+            } else {
+                wspUpdateListeners.remove(workspace);
+                if (listener != null) {
+                    wspUpdateListeners.put(workspace, listener);
+                }
             }
         }
     }
@@ -606,14 +596,63 @@
          * {@inheritDoc}
          */
         public void locked(NodeId nodeId, boolean deep, String owner) {
-            ClusterNode.this.locked(workspace, nodeId, deep, owner);
+            if (status != STARTED) {
+                log.info("not started: lock operation ignored.");
+                return;
+            }
+            Record record = null;
+            boolean succeeded = false;
+
+            try {
+                record = journal.getProducer(PRODUCER_ID).append();
+                record.writeString(workspace);
+                write(record, nodeId, deep, owner);
+                record.writeChar('\0');
+                record.update();
+                setRevision(record.getRevision());
+                succeeded = true;
+            } catch (JournalException e) {
+                String msg = "Unable to create log entry: " + e.getMessage();
+                log.error(msg);
+            } catch (Throwable e) {
+                String msg = "Unexpected error while creating log entry.";
+                log.error(msg, e);
+            } finally {
+                if (!succeeded && record != null) {
+                    record.cancelUpdate();
+                }
+            }
         }
 
         /**
          * {@inheritDoc}
          */
         public void unlocked(NodeId nodeId) {
-            ClusterNode.this.unlocked(workspace, nodeId);
+            if (status != STARTED) {
+                log.info("not started: unlock operation ignored.");
+                return;
+            }
+            Record record = null;
+            boolean succeeded = false;
+
+            try {
+                record = journal.getProducer(PRODUCER_ID).append();
+                record.writeString(workspace);
+                write(record, nodeId);
+                record.update();
+                setRevision(record.getRevision());
+                succeeded = true;
+            } catch (JournalException e) {
+                String msg = "Unable to create log entry: " + e.getMessage();
+                log.error(msg);
+            } catch (Throwable e) {
+                String msg = "Unexpected error while creating log entry.";
+                log.error(msg, e);
+            } finally {
+                if (!succeeded && record != null) {
+                    record.cancelUpdate();
+                }
+            }
         }
 
         /**
@@ -628,179 +667,464 @@
     }
 
     /**
-     * Sync listener on journal.
+     * Invoked when a record starts.
+     *
+     * @param workspace workspace, may be <code>null</code>
      */
-    class SyncListener implements RecordProcessor {
+    private void start(String workspace) {
+        this.workspace = workspace;
 
-        /**
-         * Workspace name.
-         */
-        private String workspace;
+        changeLog = new ChangeLog();
+        events = new ArrayList();
+    }
 
-        /**
-         * Change log.
-         */
-        private ChangeLog changeLog;
+    /**
+     * Process an update operation.
+     *
+     * @param operation operation to process
+     */
+    private void process(ItemOperation operation) {
+        operation.apply(changeLog);
+    }
 
-        /**
-         * List of recorded events.
-         */
-        private List events;
+    /**
+     * Process an event.
+     *
+     * @param event event
+     */
+    private void process(EventState event) {
+        events.add(event);
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void start(String workspace) {
-            this.workspace = workspace;
+    /**
+     * Process a lock operation.
+     *
+     * @param nodeId node id
+     * @param isDeep flag indicating whether lock is deep
+     * @param owner lock owner
+     */
+    private void process(NodeId nodeId, boolean isDeep, String owner) {
+        LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace);
+        if (listener == null) {
+            try {
+                clusterContext.lockEventsReady(workspace);
+            } catch (RepositoryException e) {
+                String msg = "Unable to make lock listener for workspace " +
+                        workspace + " online: " + e.getMessage();
+                log.warn(msg);
+            }
+            listener = (LockEventListener) wspLockListeners.get(workspace);
+            if (listener ==  null) {
+                String msg = "Lock channel unavailable for workspace: " + workspace;
+                log.error(msg);
+                return;
+            }
+        }
+        try {
+            listener.externalLock(nodeId, isDeep, owner);
+        } catch (RepositoryException e) {
+            String msg = "Unable to deliver lock event: " + e.getMessage();
+            log.error(msg);
+        }
+    }
 
-            changeLog = new ChangeLog();
-            events = new ArrayList();
+    /**
+     * Process an unlock operation.
+     *
+     * @param nodeId node id
+     */
+    private void process(NodeId nodeId) {
+        LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace);
+        if (listener == null) {
+            try {
+                clusterContext.lockEventsReady(workspace);
+            } catch (RepositoryException e) {
+                String msg = "Unable to make lock listener for workspace " +
+                        workspace + " online: " + e.getMessage();
+                log.warn(msg);
+            }
+            listener = (LockEventListener) wspLockListeners.get(workspace);
+            if (listener ==  null) {
+                String msg = "Lock channel unavailable for workspace: " + workspace;
+                log.error(msg);
+                return;
+            }
+        }
+        try {
+            listener.externalUnlock(nodeId);
+        } catch (RepositoryException e) {
+            String msg = "Unable to deliver lock event: " + e.getMessage();
+            log.error(msg);
         }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void process(ItemOperation operation) {
-            operation.apply(changeLog);
+    /**
+     * Process a namespace operation.
+     *
+     * @param oldPrefix old prefix. if <code>null</code> this is a fresh mapping
+     * @param newPrefix new prefix. if <code>null</code> this is an unmap operation
+     * @param uri uri to map prefix to
+     */
+    private void process(String oldPrefix, String newPrefix, String uri) {
+        if (namespaceListener == null) {
+            String msg = "Namespace listener unavailable.";
+            log.error(msg);
+            return;
+        }
+        try {
+            namespaceListener.externalRemap(oldPrefix, newPrefix, uri);
+        } catch (RepositoryException e) {
+            String msg = "Unable to deliver namespace operation: " + e.getMessage();
+            log.error(msg);
         }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void process(EventState event) {
-            events.add(event);
+    /**
+     * Process one or more node type registrations.
+     *
+     * @param ntDefs node type definition
+     */
+    private void process(Collection ntDefs) {
+        if (nodeTypeListener == null) {
+            String msg = "NodeType listener unavailable.";
+            log.error(msg);
+            return;
         }
+        try {
+            nodeTypeListener.externalRegistered(ntDefs);
+        } catch (InvalidNodeTypeDefException e) {
+            String msg = "Unable to deliver node type operation: " + e.getMessage();
+            log.error(msg);
+        } catch (RepositoryException e) {
+            String msg = "Unable to deliver node type operation: " + e.getMessage();
+            log.error(msg);
+        }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void process(NodeId nodeId, boolean isDeep, String owner) {
-            LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace);
+    /**
+     * Invoked when a record ends.
+     */
+    private void end() {
+        UpdateEventListener listener = null;
+        if (workspace != null) {
+            listener = (UpdateEventListener) wspUpdateListeners.get(workspace);
             if (listener == null) {
                 try {
-                    clusterContext.lockEventsReady(workspace);
+                    clusterContext.updateEventsReady(workspace);
                 } catch (RepositoryException e) {
-                    String msg = "Unable to make lock listener for workspace " +
+                    String msg = "Error making update listener for workspace " +
                             workspace + " online: " + e.getMessage();
                     log.warn(msg);
                 }
-                listener = (LockEventListener) wspLockListeners.get(workspace);
+                listener = (UpdateEventListener) wspUpdateListeners.get(workspace);
                 if (listener ==  null) {
-                    String msg = "Lock channel unavailable for workspace: " + workspace;
+                    String msg = "Update listener unavailable for workspace: " + workspace;
                     log.error(msg);
                     return;
                 }
             }
-            try {
-                listener.externalLock(nodeId, isDeep, owner);
-            } catch (RepositoryException e) {
-                String msg = "Unable to deliver lock event: " + e.getMessage();
+        } else {
+            if (versionUpdateListener != null) {
+                listener = versionUpdateListener;
+            } else {
+                String msg = "Version update listener unavailable.";
                 log.error(msg);
+                return;
             }
         }
+        try {
+            listener.externalUpdate(changeLog, events);
+        } catch (RepositoryException e) {
+            String msg = "Unable to deliver update events: " + e.getMessage();
+            log.error(msg);
+        }
+    }
+
+    //-------------------------------------------------------< RecordConsumer >
+
+    /**
+     * {@inheritDoc}
+     */
+    public String getId() {
+        return PRODUCER_ID;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public long getRevision() {
+        try {
+            return instanceRevision.get();
+        } catch (JournalException e) {
+            log.warn("Unable to return current revision.", e);
+            return Long.MAX_VALUE;
+        }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void process(NodeId nodeId) {
-            LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace);
-            if (listener == null) {
-                try {
-                    clusterContext.lockEventsReady(workspace);
-                } catch (RepositoryException e) {
-                    String msg = "Unable to make lock listener for workspace " +
-                            workspace + " online: " + e.getMessage();
-                    log.warn(msg);
+    /**
+     * {@inheritDoc}
+     */
+    public void consume(Record record) {
+        log.info("Processing revision: " + record.getRevision());
+
+        String workspace = null;
+
+        try {
+            workspace = record.readString();
+            start(workspace);
+
+            for (;;) {
+                char c = record.readChar();
+                if (c == '\0') {
+                    break;
                 }
-                listener = (LockEventListener) wspLockListeners.get(workspace);
-                if (listener ==  null) {
-                    String msg = "Lock channel unavailable for workspace: " + workspace;
-                    log.error(msg);
-                    return;
+                if (c == 'N') {
+                    NodeOperation operation = NodeOperation.create(record.readByte());
+                    operation.setId(record.readNodeId());
+                    process(operation);
+                } else if (c == 'P') {
+                    PropertyOperation operation = PropertyOperation.create(record.readByte());
+                    operation.setId(record.readPropertyId());
+                    process(operation);
+                } else if (c == 'E') {
+                    int type = record.readByte();
+                    NodeId parentId = record.readNodeId();
+                    Path parentPath = record.readPath();
+                    NodeId childId = record.readNodeId();
+                    Path.PathElement childRelPath = record.readPathElement();
+                    QName ntName = record.readQName();
+
+                    Set mixins = new HashSet();
+                    int mixinCount = record.readInt();
+                    for (int i = 0; i < mixinCount; i++) {
+                        mixins.add(record.readQName());
+                    }
+                    String userId = record.readString();
+                    process(createEventState(type, parentId, parentPath, childId,
+                            childRelPath, ntName, mixins, userId));
+                } else if (c == 'L') {
+                    NodeId nodeId = record.readNodeId();
+                    boolean isLock = record.readBoolean();
+                    if (isLock) {
+                        boolean isDeep = record.readBoolean();
+                        String owner = record.readString();
+                        process(nodeId, isDeep, owner);
+                    } else {
+                        process(nodeId);
+                    }
+                } else if (c == 'S') {
+                    String oldPrefix = record.readString();
+                    String newPrefix = record.readString();
+                    String uri = record.readString();
+                    process(oldPrefix, newPrefix, uri);
+                } else if (c == 'T') {
+                    int size = record.readInt();
+                    HashSet ntDefs = new HashSet();
+                    for (int i = 0; i < size; i++) {
+                        ntDefs.add(record.readNodeTypeDef());
+                    }
+                    process(ntDefs);
+                } else {
+                    throw new IllegalArgumentException("Unknown entry type: " + c);
                 }
             }
-            try {
-                listener.externalUnlock(nodeId);
-            } catch (RepositoryException e) {
-                String msg = "Unable to deliver lock event: " + e.getMessage();
-                log.error(msg);
-            }
+            end();
+
+        } catch (JournalException e) {
+            String msg = "Unable to read revision '" + record.getRevision() + "'.";
+            log.error(msg, e);
+        } catch (IllegalArgumentException e) {
+            String msg = "Error while processing revision " +
+                    record.getRevision() + ": " + e.getMessage();
+            log.error(msg);
         }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void process(String oldPrefix, String newPrefix, String uri) {
-            if (namespaceListener == null) {
-                String msg = "Namespace listener unavailable.";
-                log.error(msg);
-                return;
-            }
-            try {
-                namespaceListener.externalRemap(oldPrefix, newPrefix, uri);
-            } catch (RepositoryException e) {
-                String msg = "Unable to deliver namespace operation: " + e.getMessage();
-                log.error(msg);
-            }
+    /**
+     * {@inheritDoc}
+     */
+    public void setRevision(long revision) {
+        try {
+            instanceRevision.set(revision);
+        } catch (JournalException e) {
+            log.warn("Unable to set current revision to " + revision + ".", e);
         }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void process(Collection ntDefs) {
-            if (nodeTypeListener == null) {
-                String msg = "NodeType listener unavailable.";
-                log.error(msg);
-                return;
-            }
-            try {
-                nodeTypeListener.externalRegistered(ntDefs);
-            } catch (InvalidNodeTypeDefException e) {
-                String msg = "Unable to deliver node type operation: " + e.getMessage();
-                log.error(msg);
-            } catch (RepositoryException e) {
-                String msg = "Unable to deliver node type operation: " + e.getMessage();
-                log.error(msg);
-            }
+    /**
+     * Create an event state.
+     *
+     * @param type event type
+     * @param parentId parent id
+     * @param parentPath parent path
+     * @param childId child id
+     * @param childRelPath child relative path
+     * @param ntName node type name
+     * @param userId user id
+     * @return event
+     */
+    private EventState createEventState(int type, NodeId parentId, Path parentPath,
+                                        NodeId childId, Path.PathElement childRelPath,
+                                        QName ntName, Set mixins, String userId) {
+        switch (type) {
+            case Event.NODE_ADDED:
+                return EventState.childNodeAdded(parentId, parentPath, childId, childRelPath,
+                        ntName, mixins, getOrCreateSession(userId), true);
+            case Event.NODE_REMOVED:
+                return EventState.childNodeRemoved(parentId, parentPath, childId, childRelPath,
+                        ntName, mixins, getOrCreateSession(userId), true);
+            case Event.PROPERTY_ADDED:
+                return EventState.propertyAdded(parentId, parentPath, childRelPath,
+                        ntName, mixins, getOrCreateSession(userId), true);
+            case Event.PROPERTY_CHANGED:
+                return EventState.propertyChanged(parentId, parentPath, childRelPath,
+                        ntName, mixins, getOrCreateSession(userId), true);
+            case Event.PROPERTY_REMOVED:
+                return EventState.propertyRemoved(parentId, parentPath, childRelPath,
+                        ntName, mixins, getOrCreateSession(userId), true);
+            default:
+                String msg = "Unexpected event type: " + type;
+                throw new IllegalArgumentException(msg);
         }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void end() {
-            UpdateEventListener listener = null;
-            if (workspace != null) {
-                listener = (UpdateEventListener) wspUpdateListeners.get(workspace);
-                if (listener == null) {
-                    try {
-                        clusterContext.updateEventsReady(workspace);
-                    } catch (RepositoryException e) {
-                        String msg = "Error making update listener for workspace " +
-                                workspace + " online: " + e.getMessage();
-                        log.warn(msg);
-                    }
-                    listener = (UpdateEventListener) wspUpdateListeners.get(workspace);
-                    if (listener ==  null) {
-                        String msg = "Update listener unavailable for workspace: " + workspace;
-                        log.error(msg);
-                        return;
-                    }
-                }
+    /**
+     * Return a session matching a certain user id.
+     *
+     * @param userId user id
+     * @return session
+     */
+    private Session getOrCreateSession(String userId) {
+        if (lastSession == null || !lastSession.getUserID().equals(userId)) {
+            lastSession = new ClusterSession(userId);
+        }
+        return lastSession;
+    }
+
+    //-----------------------------------------------< Record writing methods >
+
+    private static void write(Record record, ChangeLog changeLog, EventStateCollection esc)
+            throws JournalException {
+
+        Iterator addedStates = changeLog.addedStates();
+        while (addedStates.hasNext()) {
+            ItemState state = (ItemState) addedStates.next();
+            if (state.isNode()) {
+                write(record, NodeAddedOperation.create((NodeState) state));
             } else {
-                if (versionUpdateListener != null) {
-                    listener = versionUpdateListener;
-                } else {
-                    String msg = "Version update listener unavailable.";
-                    log.error(msg);
-                    return;
-                }
+                write(record, PropertyAddedOperation.create((PropertyState) state));
             }
-            try {
-                listener.externalUpdate(changeLog, events);
-            } catch (RepositoryException e) {
-                String msg = "Unable to deliver update events: " + e.getMessage();
-                log.error(msg);
+        }
+        Iterator modifiedStates = changeLog.modifiedStates();
+        while (modifiedStates.hasNext()) {
+            ItemState state = (ItemState) modifiedStates.next();
+            if (state.isNode()) {
+                write(record, NodeModifiedOperation.create((NodeState) state));
+            } else {
+                write(record, PropertyModifiedOperation.create((PropertyState) state));
             }
+        }
+        Iterator deletedStates = changeLog.deletedStates();
+        while (deletedStates.hasNext()) {
+            ItemState state = (ItemState) deletedStates.next();
+            if (state.isNode()) {
+                write(record, NodeDeletedOperation.create((NodeState) state));
+            } else {
+                write(record, PropertyDeletedOperation.create((PropertyState) state));
+            }
+        }
+
+        Iterator events = esc.getEvents().iterator();
+        while (events.hasNext()) {
+            EventState event = (EventState) events.next();
+            write(record, event);
+        }
+    }
+
+    private static void write(Record record, String oldPrefix, String newPrefix, String uri)
+            throws JournalException {
+
+        record.writeChar('S');
+        record.writeString(oldPrefix);
+        record.writeString(newPrefix);
+        record.writeString(uri);
+    }
+
+    private static void write(Record record, NodeId nodeId, boolean isDeep, String owner)
+            throws JournalException {
+
+        write(record, nodeId, true, isDeep, owner);
+    }
+
+    private static void write(Record record, NodeId nodeId)
+            throws JournalException {
+
+        write(record, nodeId, false, false, null);
+    }
+
+    private static void write(Record record, Collection ntDefs)
+            throws JournalException {
+
+        record.writeChar('T');
+        record.writeInt(ntDefs.size());
+
+        Iterator iter = ntDefs.iterator();
+        while (iter.hasNext()) {
+            record.writeNodeTypeDef((NodeTypeDef) iter.next());
+        }
+    }
+
+    private static void write(Record record, PropertyOperation operation)
+            throws JournalException {
+
+        record.writeChar('P');
+        record.writeByte(operation.getOperationType());
+        record.writePropertyId(operation.getId());
+    }
+
+    private static void write(Record record, NodeOperation operation)
+            throws JournalException {
+
+        record.writeChar('N');
+        record.writeByte(operation.getOperationType());
+        record.writeNodeId(operation.getId());
+    }
+
+    /**
+     * Write an event state to a record.
+     *
+     * @param event event to write
+     */
+    private static void write(Record record, EventState event)
+            throws JournalException {
+
+        record.writeChar('E');
+        record.writeByte(event.getType());
+        record.writeNodeId(event.getParentId());
+        record.writePath(event.getParentPath());
+        record.writeNodeId(event.getChildId());
+        record.writePathElement(event.getChildRelPath());
+        record.writeQName(event.getNodeType());
+
+        Set mixins = event.getMixinNames();
+        record.writeInt(mixins.size());
+        Iterator iter = mixins.iterator();
+        while (iter.hasNext()) {
+            record.writeQName((QName) iter.next());
+        }
+        record.writeString(event.getUserId());
+    }
+
+    private static void write(Record record, NodeId nodeId, boolean isLock,
+                              boolean isDeep, String owner)
+            throws JournalException {
+
+        record.writeChar('L');
+        record.writeNodeId(nodeId);
+        record.writeBoolean(isLock);
+        if (isLock) {
+            record.writeBoolean(isDeep);
+            record.writeString(owner);
         }
     }
 }
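
On the read side, ClusterNode now implements RecordConsumer: during synchronization the journal hands every record written by another cluster node to the consumer registered under the matching producer identifier, and the consumer remembers the last revision it has processed (ClusterNode persists it in the FileRevision file named by the journal's "revision" parameter). A minimal standalone sketch that keeps the revision in memory instead, assuming only the interfaces added by this commit (the class and its log output are illustrative, and only one consumer per identifier may be registered with a journal):

    import org.apache.jackrabbit.core.journal.Record;
    import org.apache.jackrabbit.core.journal.RecordConsumer;

    /**
     * Hypothetical consumer that only reports the revisions it sees.
     */
    public class LoggingConsumer implements RecordConsumer {

        private long revision;        // ClusterNode persists this via FileRevision

        public String getId() {
            return "JR";              // identifier of the producer to consume
        }

        public long getRevision() {
            return revision;          // last revision seen by this consumer
        }

        public void consume(Record record) {
            System.out.println("Consuming revision " + record.getRevision());
        }

        public void setRevision(long rev) {
            revision = rev;           // remember where synchronization stopped
        }
    }

Such a consumer would be attached with journal.register(consumer) before journal.sync() is called, which is exactly what ClusterNode.init() and ClusterNode.sync() do above.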

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java?view=diff&rev=509624&r1=509623&r2=509624
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java Tue Feb 20 08:06:42 2007
@@ -30,7 +30,7 @@
     /**
      * Sync delay.
      */
-    private final int syncDelay;
+    private final long syncDelay;
 
     /**
      * Journal configuration.
@@ -41,9 +41,10 @@
      * Creates a new cluster configuration.
      *
      * @param id custom cluster node id
+     * @param syncDelay syncDelay, in milliseconds
      * @param jc journal configuration
      */
-    public ClusterConfig(String id, int syncDelay, JournalConfig jc) {
+    public ClusterConfig(String id, long syncDelay, JournalConfig jc) {
         this.id = id;
         this.syncDelay = syncDelay;
         this.jc = jc;
@@ -63,7 +64,7 @@
      *
      * @return syncDelay
      */
-    public int getSyncDelay() {
+    public long getSyncDelay() {
         return syncDelay;
     }
 

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java?view=diff&rev=509624&r1=509623&r2=509624
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java Tue Feb 20 08:06:42 2007
@@ -115,6 +115,9 @@
     /** Name of the clustered configuration attribute. */
     public static final String CLUSTERED_ATTRIBUTE = "clustered";
 
+    /** Default synchronization delay, in milliseconds. */
+    public static final String DEFAULT_SYNC_DELAY = "5000";
+
     /**
      * Creates a new configuration parser with the given parser variables.
      *
@@ -478,8 +481,8 @@
                 Element element = (Element) child;
 
                 String id = getAttribute(element, ID_ATTRIBUTE, null);
-                int syncDelay = Integer.parseInt(
-                        getAttribute(element, SYNC_DELAY_ATTRIBUTE, "5"));
+                long syncDelay = Long.parseLong(
+                        getAttribute(element, SYNC_DELAY_ATTRIBUTE, DEFAULT_SYNC_DELAY));
 
                 JournalConfig jc = parseJournalConfig(element);
                 return new ClusterConfig(id, syncDelay, jc);
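
One behavioural note on the two configuration changes above: the syncDelay attribute is now parsed as a long value in milliseconds (default "5000"), and ClusterNode.run() waits on that raw value instead of multiplying seconds by 1000. An existing repository.xml that spelled out the old default, syncDelay="5", would therefore now sync every five milliseconds unless updated. A tiny illustration of the unit change (values are illustrative):

    public class SyncDelayExample {
        public static void main(String[] args) {
            // old semantics: attribute in seconds, multiplied before waiting
            int oldAttribute = 5;                       // syncDelay="5" (old default)
            long oldWaitMillis = oldAttribute * 1000L;  // 5000 ms between syncs

            // new semantics: attribute already in milliseconds, used as-is
            long newAttribute = Long.parseLong("5000"); // syncDelay="5000" (new default)
            long newWaitMillis = newAttribute;          // still 5000 ms between syncs

            System.out.println(oldWaitMillis == newWaitMillis);  // prints "true"
        }
    }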

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java?view=auto&rev=509624
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java Tue Feb 20 08:06:42 2007
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.journal;
+
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.io.File;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.jackrabbit.name.NamespaceResolver;
+
+/**
+ * Base implementation for a journal.
+ */
+public abstract class AbstractJournal implements Journal {
+
+    /**
+     * Logger.
+     */
+    private static Logger log = LoggerFactory.getLogger(AbstractJournal.class);
+
+    /**
+     * Journal id.
+     */
+    private String id;
+
+    /**
+     * Namespace resolver.
+     */
+    private NamespaceResolver resolver;
+
+    /**
+     * Map of registered consumers.
+     */
+    private final Map consumers = new HashMap();
+
+    /**
+     * Map of registered producers.
+     */
+    private final Map producers = new HashMap();
+
+    /**
+     * Read-write lock: {@link #sync} acquires the read lock, {@link #lockAndSync} the write lock.
+     */
+    private final ReadWriteLock rwLock = new WriterPreferenceReadWriteLock();
+
+    /**
+     * {@inheritDoc}
+     */
+    public void init(String id, NamespaceResolver resolver) throws JournalException {
+        this.id = id;
+        this.resolver = resolver;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void register(RecordConsumer consumer) throws JournalException {
+        synchronized (consumers) {
+            String consumerId = consumer.getId();
+            if (consumers.containsKey(consumerId)) {
+                String msg = "Record consumer with identifier '" +
+                        consumerId + "' already registered.";
+                throw new JournalException(msg);
+            }
+            consumers.put(consumerId, consumer);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean unregister(RecordConsumer consumer) {
+        synchronized (consumers) {
+            String consumerId = consumer.getId();
+            return consumers.remove(consumerId) != null;
+        }
+    }
+
+    /**
+     * Return the consumer given its identifier.
+     *
+     * @param identifier identifier
+     * @return consumer associated with identifier;
+     *         <code>null</code> if no consumer is associated with identifier
+     */
+    public RecordConsumer getConsumer(String identifier) {
+        synchronized (consumers) {
+            return (RecordConsumer) consumers.get(identifier);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public RecordProducer getProducer(String identifier) {
+        synchronized (producers) {
+            RecordProducer producer = (RecordProducer) producers.get(identifier);
+            if (producer == null) {
+                producer = createProducer(identifier);
+                producers.put(identifier, producer);
+            }
+            return producer;
+        }
+    }
+
+    /**
+     * Create the record producer for a given identifier. May be overridden
+     * by subclasses.
+     *
+     * @param identifier producer identifier
+     */
+    protected RecordProducer createProducer(String identifier) {
+        return new DefaultRecordProducer(this, identifier);
+    }
+
+    /**
+     * Return the minimal revision of all registered consumers.
+     */
+    private long getMinimalRevision() {
+        long minimalRevision = Long.MAX_VALUE;
+
+        synchronized (consumers) {
+            Iterator iter = consumers.values().iterator();
+            while (iter.hasNext()) {
+                RecordConsumer consumer = (RecordConsumer) iter.next();
+                if (consumer.getRevision() < minimalRevision) {
+                    minimalRevision = consumer.getRevision();
+                }
+            }
+        }
+        return minimalRevision;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void sync() throws JournalException {
+        try {
+            rwLock.readLock().acquire();
+        } catch (InterruptedException e) {
+            String msg = "Unable to acquire read lock.";
+            throw new JournalException(msg, e);
+        }
+        try {
+            doSync(getMinimalRevision());
+        } finally {
+            rwLock.readLock().release();
+        }
+    }
+
+    /**
+     * Synchronize contents from journal. May be overridden by subclasses.
+     *
+     * @param startRevision start point (exclusive)
+     * @throws JournalException if an error occurs
+     */
+    protected void doSync(long startRevision) throws JournalException {
+        RecordIterator iterator = getRecords(startRevision);
+        long stopRevision = Long.MIN_VALUE;
+
+        try {
+            while (iterator.hasNext()) {
+                Record record = iterator.nextRecord();
+                if (record.getJournalId().equals(id)) {
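+                    // records created by this journal reflect changes that were already applied locally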
+                    log.info("Record with revision '" + record.getRevision() +
+                            "' created by this journal, skipped.");
+                } else {
+                    RecordConsumer consumer = getConsumer(record.getProducerId());
+                    if (consumer != null) {
+                        consumer.consume(record);
+                    }
+                }
+                stopRevision = record.getRevision();
+            }
+        } finally {
+            iterator.close();
+        }
+
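+        // move all consumers to the last revision seen, so the next sync starts after it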
+        if (stopRevision > 0) {
+            Iterator iter = consumers.values().iterator();
+            while (iter.hasNext()) {
+                RecordConsumer consumer = (RecordConsumer) iter.next();
+                consumer.setRevision(stopRevision);
+            }
+            log.info("Synchronized to revision: " + stopRevision);
+        }
+    }
+
+    /**
+     * Return an iterator over all records after the specified revision.
+     *
+     * @param startRevision start point (exclusive)
+     * @throws JournalException if an error occurs
+     */
+    protected abstract RecordIterator getRecords(long startRevision)
+            throws JournalException;
+
+    /**
+     * Lock the journal revision, disallowing changes from other sources until
+     * {@link #unlock(boolean)} has been called, and synchronize to the latest change.
+     *
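+     * <p>
+     * A sketch of the intended pairing, assuming the caller appends a record
+     * while holding the lock (in this changeset the unlock typically happens
+     * inside {@link AppendRecord#update()} or {@link AppendRecord#cancelUpdate()}):
+     * <pre>
+     *     journal.lockAndSync();
+     *     boolean succeeded = false;
+     *     try {
+     *         // ... write and append the record ...
+     *         succeeded = true;
+     *     } finally {
+     *         journal.unlock(succeeded);
+     *     }
+     * </pre>
+     *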
+     * @throws JournalException if an error occurs
+     */
+    public void lockAndSync() throws JournalException {
+        try {
+            rwLock.writeLock().acquire();
+        } catch (InterruptedException e) {
+            String msg = "Unable to acquire write lock.";
+            throw new JournalException(msg, e);
+        }
+
+        boolean succeeded = false;
+
+        try {
+            // lock
+            doLock();
+            try {
+                // and sync
+                doSync(getMinimalRevision());
+                succeeded = true;
+            } finally {
+                if (!succeeded) {
+                    doUnlock(false);
+                }
+            }
+        } finally {
+            if (!succeeded) {
+                rwLock.writeLock().release();
+            }
+        }
+    }
+
+    /**
+     * Unlock the journal revision.
+     *
+     * @param successful flag indicating whether the update process was
+     *                   successful
+     */
+    public void unlock(boolean successful) {
+        doUnlock(successful);
+
+        rwLock.writeLock().release();
+    }
+
+    /**
+     * Lock the journal revision. Subclass responsibility.
+     *
+     * @throws JournalException if an error occurs
+     */
+    protected abstract void doLock() throws JournalException;
+
+    /**
+     * Append a record backed by a file. Subclass responsibility.
+     *
+     * @param producerId producer identifier
+     * @return the new record's revision
+     *
+     * @throws JournalException if an error occurs
+     */
+    protected abstract long append(String producerId, File file)
+            throws JournalException;
+
+    /**
+     * Unlock the journal revision. Subclass responsibility.
+     *
+     * @param successful flag indicating whether the update process was
+     *                   successful
+     */
+    protected abstract void doUnlock(boolean successful);
+
+    /**
+     * Return this journal's identifier.
+     *
+     * @return journal identifier
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Return this journal's namespace resolver.
+     *
+     * @return namespace resolver
+     */
+    public NamespaceResolver getResolver() {
+        return resolver;
+    }
+}
\ No newline at end of file

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java?view=auto&rev=509624
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractRecord.java Tue Feb 20 08:06:42 2007
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.journal;
+
+import org.apache.jackrabbit.name.NamespaceResolver;
+import org.apache.jackrabbit.name.QName;
+import org.apache.jackrabbit.name.NameFormat;
+import org.apache.jackrabbit.name.NoPrefixDeclaredException;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.PathFormat;
+import org.apache.jackrabbit.name.UnknownPrefixException;
+import org.apache.jackrabbit.name.IllegalNameException;
+import org.apache.jackrabbit.name.MalformedPathException;
+import org.apache.jackrabbit.core.NodeId;
+import org.apache.jackrabbit.core.PropertyId;
+import org.apache.jackrabbit.core.nodetype.NodeTypeDef;
+import org.apache.jackrabbit.core.nodetype.compact.CompactNodeTypeDefWriter;
+import org.apache.jackrabbit.core.nodetype.compact.CompactNodeTypeDefReader;
+import org.apache.jackrabbit.core.nodetype.compact.ParseException;
+import org.apache.jackrabbit.uuid.Constants;
+import org.apache.jackrabbit.uuid.UUID;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.io.StringWriter;
+import java.io.IOException;
+import java.io.StringReader;
+
+/**
+ * Base implementation for a record.
+ */
+public abstract class AbstractRecord implements Record {
+
+    /**
+     * Indicator for a literal UUID.
+     */
+    private static final byte UUID_LITERAL = 'L';
+
+    /**
+     * Indicator for a UUID index.
+     */
+    private static final byte UUID_INDEX = 'I';
+
+    /**
+     * UUID index.
+     */
+    private final ArrayList uuidIndex = new ArrayList();
+
+    /**
+     * Namespace resolver.
+     */
+    protected final NamespaceResolver resolver;
+
+    /**
+     * Create a new instance of this class.
+     */
+    public AbstractRecord(NamespaceResolver resolver) {
+        this.resolver = resolver;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writeQName(QName name) throws JournalException {
+        try {
+            writeString(NameFormat.format(name, resolver));
+        } catch (NoPrefixDeclaredException e) {
+            String msg = "Undeclared prefix error while writing name.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writePathElement(Path.PathElement element) throws JournalException {
+        writeQName(element.getName());
+        writeInt(element.getIndex());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writePath(Path path) throws JournalException {
+        try {
+            writeString(PathFormat.format(path, resolver));
+        } catch (NoPrefixDeclaredException e) {
+            String msg = "Undeclared prefix error while writing path.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writeNodeId(NodeId nodeId) throws JournalException {
+        if (nodeId == null) {
+            writeByte(UUID_INDEX);
+            writeInt(-1);
+        } else {
+            int index = getOrCreateIndex(nodeId);
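+            // getOrCreateIndex returns -1 on the first occurrence (and caches the id);
+            // repeats are written as an int index instead of the full raw UUID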
+            if (index != -1) {
+                writeByte(UUID_INDEX);
+                writeInt(index);
+            } else {
+                writeByte(UUID_LITERAL);
+                write(nodeId.getUUID().getRawBytes());
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writePropertyId(PropertyId propertyId) throws JournalException {
+        writeNodeId(propertyId.getParentId());
+        writeQName(propertyId.getName());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writeNodeTypeDef(NodeTypeDef ntd) throws JournalException {
+        try {
+            StringWriter sw = new StringWriter();
+            CompactNodeTypeDefWriter writer = new CompactNodeTypeDefWriter(sw, resolver, true);
+            writer.write(ntd);
+            writer.close();
+
+            writeString(sw.toString());
+        } catch (IOException e) {
+            String msg = "I/O error while writing node type definition.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public QName readQName() throws JournalException {
+        try {
+            return NameFormat.parse(readString(), resolver);
+        } catch (UnknownPrefixException e) {
+            String msg = "Unknown prefix error while reading name.";
+            throw new JournalException(msg, e);
+        } catch (IllegalNameException e) {
+            String msg = "Illegal name error while reading name.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Path.PathElement readPathElement() throws JournalException {
+        try {
+            QName name = NameFormat.parse(readString(), resolver);
+            int index = readInt();
+            if (index != 0) {
+                return Path.PathElement.create(name, index);
+            } else {
+                return Path.PathElement.create(name);
+            }
+        } catch (UnknownPrefixException e) {
+            String msg = "Unknown prefix error while reading path element.";
+            throw new JournalException(msg, e);
+        } catch (IllegalNameException e) {
+            String msg = "Illegal name error while reading path element.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Path readPath() throws JournalException {
+        try {
+            return PathFormat.parse(readString(), resolver);
+        } catch (MalformedPathException e) {
+            String msg = "Malformed path error while reading path.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public NodeId readNodeId() throws JournalException {
+        byte uuidType = readByte();
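+        // a type marker distinguishes an index reference from a literal UUID;
+        // literals are added to the index so later references can resolve against it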
+        if (uuidType == UUID_INDEX) {
+            int index = readInt();
+            if (index == -1) {
+                return null;
+            } else {
+                return (NodeId) uuidIndex.get(index);
+            }
+        } else if (uuidType == UUID_LITERAL) {
+            byte[] b = new byte[Constants.UUID_BYTE_LENGTH];
+            readFully(b);
+            NodeId nodeId = new NodeId(new UUID(b));
+            uuidIndex.add(nodeId);
+            return nodeId;
+        } else {
+            String msg = "Unknown UUID type found: " + uuidType;
+            throw new JournalException(msg);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public PropertyId readPropertyId() throws JournalException {
+        return new PropertyId(readNodeId(), readQName());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public NodeTypeDef readNodeTypeDef() throws JournalException {
+        try {
+            StringReader sr = new StringReader(readString());
+
+            CompactNodeTypeDefReader reader = new CompactNodeTypeDefReader(sr, "(internal)");
+            List ntds = reader.getNodeTypeDefs();
+            if (ntds.size() != 1) {
+                throw new JournalException("Expected one node type definition: got " + ntds.size());
+            }
+            return (NodeTypeDef) ntds.get(0);
+        } catch (ParseException e) {
+            String msg = "Parse error while reading node type definition.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * Get a <code>NodeId</code>'s existing cache index, creating a new entry if necessary.
+     *
+     * @param nodeId nodeId to lookup
+     * @return cache index of existing entry or <code>-1</code> to indicate the entry was added
+     */
+    private int getOrCreateIndex(NodeId nodeId) {
+        int index = uuidIndex.indexOf(nodeId);
+        if (index == -1) {
+            uuidIndex.add(nodeId);
+        }
+        return index;
+    }
+}
\ No newline at end of file

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java?view=auto&rev=509624
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java Tue Feb 20 08:06:42 2007
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.journal;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Default temporary record used for appending to some journal.
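+ * Contents are buffered in a temporary file until {@link #update()} hands them
+ * over to the journal.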
+ */
+public class AppendRecord extends AbstractRecord {
+
+    /**
+     * Logger.
+     */
+    private static Logger log = LoggerFactory.getLogger(AppendRecord.class);
+
+    /**
+     * Default prefix for appended records in the file system.
+     */
+    private static final String DEFAULT_PREFIX = "journal";
+
+    /**
+     * Default extension for appended records in the file system.
+     */
+    private static final String DEFAULT_EXT = ".tmp";
+
+    /**
+     * Journal where record is being appended.
+     */
+    private final AbstractJournal journal;
+
+    /**
+     * Producer identifier.
+     */
+    private final String producerId;
+
+    /**
+     * This record's revision.
+     */
+    private long revision;
+
+    /**
+     * Underlying file.
+     */
+    private File file;
+
+    /**
+     * Underlying data output.
+     */
+    private DataOutputStream dataOut;
+
+    /**
+     * Create a new instance of this class.
+     *
+     * @param journal journal where record is being appended
+     * @param producerId producer identifier
+     */
+    public AppendRecord(AbstractJournal journal, String producerId) {
+        super(journal.getResolver());
+
+        this.journal = journal;
+        this.producerId = producerId;
+        this.revision = 0L;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String getJournalId() {
+        return journal.getId();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String getProducerId() {
+        return producerId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public long getRevision() {
+        return revision;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writeByte(int n) throws JournalException {
+        open();
+
+        try {
+            dataOut.writeByte(n);
+        } catch (IOException e) {
+            String msg = "I/O error while writing byte.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writeChar(char c) throws JournalException {
+        open();
+
+        try {
+            dataOut.writeChar(c);
+        } catch (IOException e) {
+            String msg = "I/O error while writing character.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writeBoolean(boolean b) throws JournalException {
+        open();
+
+        try {
+            dataOut.writeBoolean(b);
+        } catch (IOException e) {
+            String msg = "I/O error while writing boolean.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writeInt(int n) throws JournalException {
+        open();
+
+        try {
+            dataOut.writeInt(n);
+        } catch (IOException e) {
+            String msg = "I/O error while writing integer.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void writeString(String s) throws JournalException {
+        open();
+
+        try {
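+            // a leading boolean acts as a null marker: true means null,
+            // false is followed by the writeUTF-encoded value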
+            if (s == null) {
+                dataOut.writeBoolean(true);
+            } else {
+                dataOut.writeBoolean(false);
+                dataOut.writeUTF(s);
+            }
+        } catch (IOException e) {
+            String msg = "I/O error while writing string.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void write(byte[] b) throws JournalException {
+        open();
+
+        try {
+            dataOut.write(b);
+        } catch (IOException e) {
+            String msg = "I/O error while writing a byte array.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void update() throws JournalException {
+        boolean succeeded = false;
+
+        try {
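+            // close the temporary file and hand it to the journal, which assigns the new revision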
+            close();
+            revision = journal.append(producerId, file);
+            succeeded = true;
+        } finally {
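+            // always delete the temporary file and release the journal lock,
+            // passing along whether the append went through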
+            dispose();
+
+            journal.unlock(succeeded);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void cancelUpdate() {
+        if (dataOut != null) {
+            dispose();
+            
+            journal.unlock(false);
+        }
+    }
+
+    /**
+     * Create temporary file and open data output on it.
+     *
+     * @throws JournalException if the temporary file cannot be created
+     */
+    private void open() throws JournalException {
+        if (file == null) {
+            try {
+                file = File.createTempFile(DEFAULT_PREFIX, DEFAULT_EXT);
+                dataOut = new DataOutputStream(new FileOutputStream(file));
+            } catch (IOException e) {
+                String msg = "Unable to create temporary file.";
+                throw new JournalException(msg, e);
+            }
+        }
+    }
+
+    /**
+     * Close this record, keeping the underlying file.
+     *
+     * @throws JournalException if an error occurs
+     */
+    private void close() throws JournalException {
+        if (dataOut != null) {
+            try {
+                dataOut.close();
+            } catch (IOException e) {
+                String msg = "I/O error while closing stream.";
+                throw new JournalException(msg, e);
+            } finally {
+                dataOut = null;
+            }
+        }
+    }
+
+    /**
+     * Dispose this record, deleting the underlying file.
+     */
+    private void dispose() {
+        if (dataOut != null) {
+            try {
+                dataOut.close();
+            } catch (IOException e) {
+                String msg = "I/O error while closing stream.";
+                log.warn(msg, e);
+            } finally {
+                dataOut = null;
+            }
+        }
+        if (file != null) {
+            file.delete();
+            file = null;
+        }
+    }
+
+    /**
+     * Unsupported methods when appending.
+     */
+    public byte readByte() throws JournalException {
+        throw unsupported();
+    }
+
+    public char readChar() throws JournalException {
+        throw unsupported();
+    }
+
+    public boolean readBoolean() throws JournalException {
+        throw unsupported();
+    }
+
+    public int readInt() throws JournalException {
+        throw unsupported();
+    }
+
+    public String readString() throws JournalException {
+        throw unsupported();
+    }
+
+    public void readFully(byte[] b) throws JournalException {
+        throw unsupported();
+    }
+
+    private JournalException unsupported() {
+        String msg = "Reading from an appended record is not supported.";
+        return new JournalException(msg);
+    }
+}


