jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dpfis...@apache.org
Subject svn commit: r471760 [1/3] - in /jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core: ./ cluster/ config/ lock/ nodetype/virtual/ observation/ state/ version/
Date Mon, 06 Nov 2006 15:22:31 GMT
Author: dpfister
Date: Mon Nov  6 07:22:29 2006
New Revision: 471760

URL: http://svn.apache.org/viewvc?view=rev&rev=471760
Log:
JCR-623 Clustering

Added:
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterContext.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterException.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterSession.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRevision.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ItemOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/Journal.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/JournalException.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/LockEventChannel.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/LockEventListener.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/NodeAddedOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/NodeDeletedOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/NodeModifiedOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/NodeOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/PropertyAddedOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/PropertyDeletedOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/PropertyModifiedOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/PropertyOperation.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/RecordProcessor.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/UpdateEventChannel.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/UpdateEventListener.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/config/ClusterConfig.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/config/JournalConfig.java
Modified:
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/config/config.dtd
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/lock/LockManagerImpl.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/nodetype/virtual/VirtualNodeTypeStateManager.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/observation/EventState.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/version/VersionManagerImpl.java

Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java?view=diff&rev=471760&r1=471759&r2=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java Mon Nov  6 07:22:29 2006
@@ -29,6 +29,7 @@
 import org.apache.jackrabbit.core.config.RepositoryConfig;
 import org.apache.jackrabbit.core.config.VersioningConfig;
 import org.apache.jackrabbit.core.config.WorkspaceConfig;
+import org.apache.jackrabbit.core.config.ClusterConfig;
 import org.apache.jackrabbit.core.fs.BasedFileSystem;
 import org.apache.jackrabbit.core.fs.FileSystem;
 import org.apache.jackrabbit.core.fs.FileSystemException;
@@ -39,16 +40,25 @@
 import org.apache.jackrabbit.core.nodetype.virtual.VirtualNodeTypeStateManager;
 import org.apache.jackrabbit.core.observation.DelegatingObservationDispatcher;
 import org.apache.jackrabbit.core.observation.ObservationDispatcher;
+import org.apache.jackrabbit.core.observation.EventStateCollection;
 import org.apache.jackrabbit.core.security.AuthContext;
 import org.apache.jackrabbit.core.state.ItemStateException;
 import org.apache.jackrabbit.core.persistence.PMContext;
 import org.apache.jackrabbit.core.persistence.PersistenceManager;
 import org.apache.jackrabbit.core.state.SharedItemStateManager;
+import org.apache.jackrabbit.core.state.ChangeLog;
 import org.apache.jackrabbit.core.version.VersionManager;
 import org.apache.jackrabbit.core.version.VersionManagerImpl;
+import org.apache.jackrabbit.core.cluster.ClusterNode;
+import org.apache.jackrabbit.core.cluster.ClusterException;
+import org.apache.jackrabbit.core.cluster.ClusterContext;
+import org.apache.jackrabbit.core.cluster.LockEventChannel;
+import org.apache.jackrabbit.core.cluster.UpdateEventChannel;
+import org.apache.jackrabbit.core.cluster.UpdateEventListener;
 import org.apache.jackrabbit.name.NoPrefixDeclaredException;
 import org.apache.jackrabbit.name.QName;
 import org.apache.jackrabbit.name.NameFormat;
+import org.apache.jackrabbit.name.NamespaceResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.InputSource;
@@ -82,6 +92,7 @@
 import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
+import java.util.List;
 
 /**
  * A <code>RepositoryImpl</code> ...
@@ -183,6 +194,11 @@
     private FileLock repLock;
 
     /**
+     * Clustered node used, <code>null</code> if clustering is not configured.
+     */
+    private ClusterNode clusterNode;
+
+    /**
      * Shutdown lock for guaranteeing that no new sessions are started during
      * repository shutdown and that a repository shutdown is not initiated
      * during a login. Each session login acquires a read lock while the
@@ -240,6 +256,13 @@
             wspInfos.put(config.getName(), info);
         }
 
+        // initialize optional clustering
+        // put here before setting up any other external event source that a cluster node
+        // will be interested in
+        if (repConfig.getClusterConfig() != null) {
+            clusterNode = createClusterNode();
+        }
+
         // init version manager
         vMgr = createVersionManager(repConfig.getVersioningConfig(),
                 delegatingDispatcher);
@@ -278,10 +301,22 @@
         // after the workspace is initialized we pass a system session to
         // the virtual node type manager
 
-        // todo FIXME it seems odd that the *global* virtual node type manager
+        // todo FIXME it seems odd that the *global* virtual node type manager
         // is using a session that is bound to a single specific workspace
         virtNTMgr.setSession(getSystemSession(repConfig.getDefaultWorkspaceName()));
 
+        // now start cluster node as last step
+        if (clusterNode != null) {
+            try {
+                clusterNode.start();
+            } catch (ClusterException e) {
+                String msg = "Unable to start clustered node, forcing shutdown...";
+                log.error(msg, e);
+                shutdown();
+                throw new RepositoryException(msg, e);
+            }
+        }
+
         log.info("Repository started");
     }
 
@@ -295,6 +330,8 @@
     protected VersionManager createVersionManager(VersioningConfig vConfig,
                                                   DelegatingObservationDispatcher delegatingDispatcher)
             throws RepositoryException {
+
+
         FileSystem fs = vConfig.getFileSystemConfig().createFileSystem();
         PersistenceManager pm = createPersistenceManager(vConfig.getHomeDir(),
                 fs,
@@ -302,8 +339,13 @@
                 rootNodeId,
                 nsReg,
                 ntReg);
-        return new VersionManagerImpl(pm, fs, ntReg, delegatingDispatcher,
+
+        VersionManagerImpl vMgr = new VersionManagerImpl(pm, fs, ntReg, delegatingDispatcher,
                 VERSION_STORAGE_NODE_ID, SYSTEM_ROOT_NODE_ID);
+        if (clusterNode != null) {
+            vMgr.setEventChannel(clusterNode.createUpdateChannel());
+        }
+        return vMgr;
     }
 
     /**
@@ -582,6 +624,21 @@
         return systemSearchMgr;
     }
 
+    /**
+     * Creates the cluster node.
+     *
+     * @return clustered node
+     */
+    private ClusterNode createClusterNode() throws RepositoryException {
+        try {
+            ClusterNode clusterNode = new ClusterNode();
+            clusterNode.init(new ExternalEventListener());
+            return clusterNode;
+        } catch (Exception e) {
+            throw new RepositoryException(e);
+        }
+    }
+
     NamespaceRegistryImpl getNamespaceRegistry() {
         // check sanity of this instance
         sanityCheck();
@@ -878,6 +935,11 @@
     private synchronized void doShutdown() {
         log.info("Shutting down repository...");
 
+        // stop optional cluster node
+        if (clusterNode != null) {
+            clusterNode.stop();
+        }
+
         // close active user sessions
         // (copy sessions to array to avoid ConcurrentModificationException;
         // manually copy entries rather than calling ReferenceMap#toArray() in
@@ -1288,7 +1350,7 @@
      * representing the same named workspace, i.e. the same physical
      * storage.
      */
-    protected class WorkspaceInfo {
+    protected class WorkspaceInfo implements UpdateEventListener {
 
         /**
          * workspace configuration (passed in constructor)
@@ -1352,6 +1414,16 @@
         private final Mutex xaLock = new Mutex();
 
         /**
+         * Update event channel, used in clustered environment.
+         */
+        private UpdateEventChannel updateChannel;
+
+        /**
+         * Lock event channel, used in clustered environment.
+         */
+        private LockEventChannel lockChannel;
+
+        /**
          * Creates a new <code>WorkspaceInfo</code> based on the given
          * <code>config</code>.
          *
@@ -1535,6 +1607,10 @@
                 // 'chicken & egg' bootstrap problems
                 if (lockMgr == null) {
                     lockMgr = new LockManagerImpl(getSystemSession(), fs);
+                    if (clusterNode != null) {
+                        lockChannel = clusterNode.createLockChannel(getName());
+                        lockMgr.setEventChannel(lockChannel);
+                    }
                 }
                 return lockMgr;
             }
@@ -1636,6 +1712,11 @@
                     } catch (Exception e) {
                         log.error("Unable to add vmgr: " + e.toString(), e);
                     }
+                    if (clusterNode != null) {
+                        updateChannel = clusterNode.createUpdateChannel(getName());
+                        itemStateMgr.setEventChannel(updateChannel);
+                        updateChannel.setListener(this);
+                    }
                 } catch (ItemStateException ise) {
                     String msg = "failed to instantiate shared item state manager";
                     log.debug(msg);
@@ -1709,6 +1790,14 @@
 
                 log.info("shutting down workspace '" + getName() + "'...");
 
+                // inform cluster node about disposal
+                if (updateChannel != null) {
+                    updateChannel.setListener(null);
+                }
+                if (lockChannel != null) {
+                    lockChannel.setListener(null);
+                }
+
                 // deregister the observation factory of that workspace
                 delegatingDispatcher.removeDispatcher(dispatcher);
 
@@ -1792,6 +1881,25 @@
         void lockRelease() {
             xaLock.release();
         }
+
+        //----------------------------------------------< UpdateEventListener >
+
+        /**
+         * {@inheritDoc}
+         */
+        public void externalUpdate(ChangeLog external, List events) throws RepositoryException {
+            try {
+                EventStateCollection esc = new EventStateCollection(
+                        getObservationDispatcher(), null, null);
+                esc.addAll(events);
+
+                getItemStateProvider().externalUpdate(external, esc);
+            } catch (IllegalStateException e) {
+                String msg = "Unable to deliver events: " + e.getMessage();
+                throw new RepositoryException(msg);
+            }
+        }
+
     }
 
     /**
@@ -1877,4 +1985,39 @@
         }
     }
 
+    /**
+     * Cluster context passed to a <code>ClusterNode</code>.
+     */
+    class ExternalEventListener implements ClusterContext {
+
+        /**
+         * {@inheritDoc}
+         */
+        public ClusterConfig getClusterConfig() {
+            return getConfig().getClusterConfig();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public NamespaceResolver getNamespaceResovler() {
+            return getNamespaceRegistry();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public void updateEventsReady(String workspace) throws RepositoryException {
+            // trigger the initialization of the given workspace
+            getWorkspaceInfo(workspace);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public void lockEventsReady(String workspace) throws RepositoryException {
+            // trigger the initialization of the given workspace's lock manager
+            getWorkspaceInfo(workspace).getLockManager();
+        }
+    }
 }

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterContext.java?view=auto&rev=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterContext.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterContext.java Mon Nov  6 07:22:29 2006
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.apache.jackrabbit.core.config.ClusterConfig;
+import org.apache.jackrabbit.name.NamespaceResolver;
+
+import javax.jcr.RepositoryException;
+
+/**
+ * Initial interface passed to a <code>ClusterNode</code>.
+ */
+public interface ClusterContext {
+
+    /**
+     * Return the cluster configuration.
+     *
+     * @return cluster configuration
+     */
+    public ClusterConfig getClusterConfig();
+
+    /**
+     * Return a namespace resolver to map prefixes to URIs and vice-versa.
+     *
+     * @return namespace resolver
+     */
+    public NamespaceResolver getNamespaceResovler();
+
+    /**
+     * Notifies the cluster context that some workspace update events are available
+     * and that it should start up a listener to receive them.
+     *
+     * @param workspace workspace name
+     * @throws RepositoryException if the context is unable to provide the listener
+     */
+    public void updateEventsReady(String workspace) throws RepositoryException;
+
+    /**
+     * Notifies the cluster context that some workspace lock events are available
+     * and that it should start up a listener to receive them.
+     *
+     * @param workspace workspace name
+     * @throws RepositoryException if the context is unable to provide the listener
+     */
+    public void lockEventsReady(String workspace) throws RepositoryException;
+}

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterException.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterException.java?view=auto&rev=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterException.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterException.java Mon Nov  6 07:22:29 2006
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.apache.jackrabbit.BaseException;
+
+/**
+ * The <code>ClusterException</code> signals an error within a cluster operation.
+ */
+public class ClusterException extends BaseException {
+
+    /**
+     * Constructs a new instance of this class with the specified detail
+     * message.
+     *
+     * @param message the detail message. The detail message is saved for
+     *                later retrieval by the {@link #getMessage()} method.
+     */
+    public ClusterException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new instance of this class with the specified detail
+     * message and root cause.
+     *
+     * @param message   the detail message. The detail message is saved for
+     *                  later retrieval by the {@link #getMessage()} method.
+     * @param rootCause root failure cause
+     */
+    public ClusterException(String message, Throwable rootCause) {
+        super(message, rootCause);
+    }
+}

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?view=auto&rev=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Mon Nov  6 07:22:29 2006
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.jackrabbit.core.config.ClusterConfig;
+import org.apache.jackrabbit.core.config.ConfigurationException;
+import org.apache.jackrabbit.core.NodeId;
+import org.apache.jackrabbit.core.observation.EventState;
+import org.apache.jackrabbit.core.observation.EventStateCollection;
+import org.apache.jackrabbit.core.state.ChangeLog;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.QName;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+
+import javax.jcr.observation.Event;
+import javax.jcr.Session;
+import javax.jcr.RepositoryException;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Default clustered node implementation.
+ */
+public class ClusterNode implements Runnable, UpdateEventChannel  {
+
+    /**
+     * System property specifying a node id to use.
+     */
+    public static final String SYSTEM_PROPERTY_NODE_ID = "org.apache.jackrabbit.core.cluster.node_id";
+
+    /**
+     * Used for padding short string representations.
+     */
+    private static final String SHORT_PADDING = "0000";
+
+    /**
+     * Logger.
+     */
+    private static Logger log = LoggerFactory.getLogger(ClusterNode.class);
+
+    /**
+     * Cluster context.
+     */
+    private ClusterContext clusterContext;
+
+    /**
+     * Cluster node id.
+     */
+    private String clusterNodeId;
+
+    /**
+     * Synchronization delay, in seconds.
+     */
+    private int syncDelay;
+
+    /**
+     * Journal used.
+     */
+    private Journal journal;
+
+    /**
+     * Mutex used when syncing.
+     */
+    private final Mutex syncLock = new Mutex();
+
+    /**
+     * Flag indicating whether this cluster node is stopped.
+     */
+    private boolean stopped;
+
+    /**
+     * Map of available lock listeners, indexed by workspace name.
+     */
+    private final Map wspLockListeners = new HashMap();
+
+    /**
+     * Map of available update listeners, indexed by workspace name.
+     */
+    private final Map wspUpdateListeners = new HashMap();
+
+    /**
+     * Versioning update listener.
+     */
+    private UpdateEventListener versionUpdateListener;
+
+    /**
+     * Initialize this cluster node.
+     *
+     * @throws ClusterException if an error occurs
+     */
+    public void init(ClusterContext clusterContext) throws ClusterException {
+        this.clusterContext = clusterContext;
+
+        init();
+    }
+
+    /**
+     * Initialize this cluster node (overridable).
+     *
+     * @throws ClusterException if an error occurs
+     */
+    protected void init() throws ClusterException {
+        ClusterConfig cc = clusterContext.getClusterConfig();
+        clusterNodeId = getClusterNodeId(cc.getId());
+        syncDelay = cc.getSyncDelay();
+
+        try {
+            journal = (Journal) cc.getJournalConfig().newInstance();
+            journal.init(clusterNodeId, new SyncListener(), clusterContext.getNamespaceResovler());
+        } catch (ConfigurationException e) {
+            throw new ClusterException(e.getMessage(), e.getCause());
+        }
+    }
+
+    /**
+     * Starts this cluster node.
+     *
+     * @throws ClusterException if an error occurs
+     */
+    public synchronized void start() throws ClusterException {
+        sync();
+
+        Thread t = new Thread(this, "ClusterNode-" + clusterNodeId);
+        t.setDaemon(true);
+        t.start();
+    }
+
+    /**
+     * Run loop that will sync this node after some delay.
+     */
+    public void run() {
+        for (;;) {
+            synchronized (this) {
+                try {
+                    wait(syncDelay * 1000);
+                } catch (InterruptedException e) {}
+
+                if (stopped) {
+                    return;
+                }
+            }
+            try {
+                sync();
+            } catch (ClusterException e) {
+                String msg = "Periodic sync of journal failed: " + e.getMessage();
+                log.error(msg);
+            }
+        }
+    }
+
+    /**
+     * Synchronize contents from journal.
+     *
+     * @throws ClusterException if an error occurs
+     */
+    public void sync() throws ClusterException {
+        try {
+            syncLock.acquire();
+        } catch (InterruptedException e) {
+            String msg = "Interrupted while waiting for mutex.";
+            throw new ClusterException(msg);
+        }
+        try {
+            journal.sync();
+        } finally {
+            syncLock.release();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public synchronized void stop() {
+        stopped = true;
+
+        notifyAll();
+    }
+
+    /**
+     * Called when a node has been locked.
+     *
+     * @param workspace workspace name
+     * @param nodeId node id
+     * @param deep flag indicating whether lock is deep
+     * @param owner lock owner
+     */
+    private void locked(String workspace, NodeId nodeId, boolean deep, String owner) {
+        try {
+            journal.begin(workspace);
+            journal.log(nodeId, deep, owner);
+            journal.prepare();
+            journal.commit();
+        } catch (JournalException e) {
+            String msg = "Unable to create log entry: " + e.getMessage();
+            log.error(msg);
+        } catch (Throwable e) {
+            String msg = "Unexpected error while creating log entry.";
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * Called when a node has been unlocked.
+     *
+     * @param workspace workspace name
+     * @param nodeId node id
+     */
+    private void unlocked(String workspace, NodeId nodeId) {
+        try {
+            journal.begin(workspace);
+            journal.log(nodeId);
+            journal.prepare();
+            journal.commit();
+        } catch (JournalException e) {
+            String msg = "Unable to create log entry: " + e.getMessage();
+            log.error(msg);
+        } catch (Throwable e) {
+            String msg = "Unexpected error while creating log entry.";
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * Create an {@link UpdateEventChannel} for versioning operations.
+     *
+     * @return update event channel
+     */
+    public UpdateEventChannel createUpdateChannel() {
+        return this;
+    }
+
+    /**
+     * Create an {@link UpdateEventChannel} for some workspace.
+     *
+     * @param workspace workspace name
+     * @return update event channel
+     */
+    public UpdateEventChannel createUpdateChannel(String workspace) {
+        return new WorkspaceUpdateChannel(workspace);
+    }
+
+    /**
+     * Create a {@link LockEventChannel} for some workspace.
+     *
+     * @param workspace workspace name
+     * @return lock event channel
+     */
+    public LockEventChannel createLockChannel(String workspace) {
+        return new WorkspaceLockChannel(workspace);
+    }
+
+    /**
+     * Return the instance id to be used for this node in the cluster.
+     *
+     * @param id configured id, <code>null</code> to fall back to the system property or a random id
+     */
+    private String getClusterNodeId(String id) {
+        if (id == null) {
+            id = System.getProperty(SYSTEM_PROPERTY_NODE_ID);
+            if (id == null) {
+                id = toHexString((short) (Math.random() * (Short.MAX_VALUE - Short.MIN_VALUE)));
+            }
+        }
+        return id;
+    }
+
+    /**
+     * Return a zero-padded short string representation.
+     *
+     * @param n short
+     * @return string representation
+     */
+    private static String toHexString(short n) {
+        String s = Integer.toHexString(n);
+        int padlen = SHORT_PADDING.length() - s.length();
+        if (padlen < 0) {
+            s = s.substring(-padlen);
+        } else if (padlen > 0) {
+            s = SHORT_PADDING.substring(0, padlen) + s;
+        }
+        return s;
+    }
+
+    //---------------------------------------------------< UpdateEventChannel >
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Invoked when an update has been created inside versioning. Delegate
+     * to common method with <code>null</code> workspace.
+     */
+    public void updateCreated(ChangeLog changes, EventStateCollection esc) {
+        updateCreated(null, changes, esc);
+    }
+
+    /**
+     * Called when an update operation has been created.
+     *
+     * @param workspace workspace to use when writing journal entry
+     * @param changes changes
+     * @param esc events as they will be delivered on success
+     */
+    private void updateCreated(String workspace, ChangeLog changes, EventStateCollection esc) {
+        try {
+            journal.begin(workspace);
+            journal.log(changes, esc);
+        } catch (JournalException e) {
+            String msg = "Unable to create log entry: " + e.getMessage();
+            log.error(msg);
+        } catch (Throwable e) {
+            String msg = "Unexpected error while creating log entry.";
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void updatePrepared() {
+        try {
+            journal.prepare();
+        } catch (JournalException e) {
+            String msg = "Unable to create log entry: " + e.getMessage();
+            log.error(msg);
+        } catch (Throwable e) {
+            String msg = "Unexpected error while creating log entry.";
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void updateCommitted() {
+        try {
+            journal.commit();
+        } catch (JournalException e) {
+            String msg = "Unable to create log entry: " + e.getMessage();
+            log.error(msg);
+        } catch (Throwable e) {
+            String msg = "Unexpected error while creating log entry.";
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void updateCancelled() {
+        try {
+            journal.cancel();
+        } catch (JournalException e) {
+            String msg = "Unable to create log entry: " + e.getMessage();
+            log.error(msg);
+        } catch (Throwable e) {
+            String msg = "Unexpected error while creating log entry.";
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Invoked to set the update event listener responsible for delivering versioning events.
+     * The listener is stored as is; passing <code>null</code> clears it.
+     *
+     * @param listener versioning update listener, may be <code>null</code>
+     */
+    public void setListener(UpdateEventListener listener) {
+        versionUpdateListener = listener;
+    }
+
+    /**
+     * Workspace update channel. Every notification is forwarded to the
+     * enclosing cluster node; the creation notification additionally carries
+     * the name of the workspace this channel was created for.
+     */
+    class WorkspaceUpdateChannel implements UpdateEventChannel {
+
+        /**
+         * Workspace name.
+         */
+        private final String workspace;
+
+        /**
+         * Create a new instance of this class.
+         *
+         * @param workspace workspace name
+         */
+        public WorkspaceUpdateChannel(String workspace) {
+            this.workspace = workspace;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Forwards to the cluster node, adding this channel's workspace name.
+         */
+        public void updateCreated(ChangeLog changes, EventStateCollection esc) {
+            ClusterNode.this.updateCreated(workspace, changes, esc);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Forwards to the cluster node.
+         */
+        public void updatePrepared() {
+            ClusterNode.this.updatePrepared();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Forwards to the cluster node.
+         */
+        public void updateCommitted() {
+            ClusterNode.this.updateCommitted();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Forwards to the cluster node.
+         */
+        public void updateCancelled() {
+            ClusterNode.this.updateCancelled();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Registers the listener under this channel's workspace name,
+         * replacing any listener registered before. A <code>null</code>
+         * listener only removes the existing registration.
+         */
+        public void setListener(UpdateEventListener listener) {
+            // drop the previous registration first, then add the new one if any
+            wspUpdateListeners.remove(workspace);
+            if (listener != null) {
+                wspUpdateListeners.put(workspace, listener);
+            }
+        }
+    }
+
+    /**
+     * Workspace lock channel. Lock and unlock notifications are forwarded to
+     * the enclosing cluster node together with the name of the workspace this
+     * channel was created for.
+     */
+    class WorkspaceLockChannel implements LockEventChannel {
+
+        /**
+         * Workspace name.
+         */
+        private final String workspace;
+
+        /**
+         * Create a new instance of this class.
+         *
+         * @param workspace workspace name
+         */
+        public WorkspaceLockChannel(String workspace) {
+            this.workspace = workspace;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Forwards to the cluster node, adding this channel's workspace name.
+         */
+        public void locked(NodeId nodeId, boolean deep, String owner) {
+            ClusterNode.this.locked(workspace, nodeId, deep, owner);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Forwards to the cluster node, adding this channel's workspace name.
+         */
+        public void unlocked(NodeId nodeId) {
+            ClusterNode.this.unlocked(workspace, nodeId);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Registers the listener under this channel's workspace name,
+         * replacing any listener registered before. A <code>null</code>
+         * listener only removes the existing registration.
+         */
+        public void setListener(LockEventListener listener) {
+            // drop the previous registration first, then add the new one if any
+            wspLockListeners.remove(workspace);
+            if (listener != null) {
+                wspLockListeners.put(workspace, listener);
+            }
+        }
+    }
+
+    /**
+     * Sync listener on journal.
+     */
+    class SyncListener implements RecordProcessor {
+
+        /**
+         * Workspace name.
+         */
+        private String workspace;
+
+        /**
+         * Change log.
+         */
+        private ChangeLog changeLog;
+
+        /**
+         * List of recorded events.
+         */
+        private List events;
+
+        /**
+         * Last used session for event sources.
+         */
+        private Session lastSession;
+
+        /**
+         * {@inheritDoc}
+         */
+        public void start(String workspace) {
+            this.workspace = workspace;
+
+            changeLog = new ChangeLog();
+            events = new ArrayList();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public void process(ItemOperation operation) {
+            operation.apply(changeLog);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public void process(int type, NodeId parentId, Path parentPath, NodeId childId,
+                            Path.PathElement childRelPath, QName ntName, Set mixins, String userId) {
+
+            EventState event = null;
+
+            switch (type) {
+                case Event.NODE_ADDED:
+                    event = EventState.childNodeAdded(parentId, parentPath, childId, childRelPath,
+                            ntName, mixins, getOrCreateSession(userId));
+                    break;
+                case Event.NODE_REMOVED:
+                    event = EventState.childNodeRemoved(parentId, parentPath, childId, childRelPath,
+                            ntName, mixins, getOrCreateSession(userId));
+                    break;
+                case Event.PROPERTY_ADDED:
+                    event = EventState.propertyAdded(parentId, parentPath, childRelPath,
+                            ntName, mixins, getOrCreateSession(userId));
+                    break;
+                case Event.PROPERTY_CHANGED:
+                    event = EventState.propertyChanged(parentId, parentPath, childRelPath,
+                            ntName, mixins, getOrCreateSession(userId));
+                    break;
+                case Event.PROPERTY_REMOVED:
+                    event = EventState.propertyRemoved(parentId, parentPath, childRelPath,
+                            ntName, mixins, getOrCreateSession(userId));
+                    break;
+                default:
+                    String msg = "Unexpected event type: " + type;
+                    log.warn(msg);
+            }
+            events.add(event);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public void process(NodeId nodeId, boolean isDeep, String owner) {
+            //todo should be aggregated
+            LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace);
+            if (listener == null) {
+                try {
+                    clusterContext.lockEventsReady(workspace);
+                } catch (RepositoryException e) {
+                    String msg = "Unable to make lock listener for workspace " +
+                            workspace + " online: " + e.getMessage();
+                    log.warn(msg);
+                }
+                listener = (LockEventListener) wspLockListeners.get(workspace);
+                if (listener ==  null) {
+                    String msg = "Lock channel unavailable for workspace: " + workspace;
+                    log.error(msg);
+                    return;
+                }
+            }
+            try {
+                listener.externalLock(nodeId, isDeep, owner);
+            } catch (RepositoryException e) {
+                String msg = "Unable to deliver lock event: " + e.getMessage();
+                log.error(msg);
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public void process(NodeId nodeId) {
+            //todo should be aggregated
+            LockEventListener listener = (LockEventListener) wspLockListeners.get(workspace);
+            if (listener == null) {
+                try {
+                    clusterContext.lockEventsReady(workspace);
+                } catch (RepositoryException e) {
+                    String msg = "Error making lock listener for workspace " +
+                            workspace + " online: " + e.getMessage();
+                    log.warn(msg);
+                }
+                listener = (LockEventListener) wspLockListeners.get(workspace);
+                if (listener ==  null) {
+                    String msg = "Lock listener unavailable for workspace: " + workspace;
+                    log.error(msg);
+                    return;
+                }
+            }
+            try {
+                listener.externalUnlock(nodeId);
+            } catch (RepositoryException e) {
+                String msg = "Unable to deliver unlock event: " + e.getMessage();
+                log.error(msg);
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public void end() {
+            UpdateEventListener listener = null;
+            if (workspace != null) {
+                listener = (UpdateEventListener) wspUpdateListeners.get(workspace);
+                if (listener == null) {
+                    try {
+                        clusterContext.updateEventsReady(workspace);
+                    } catch (RepositoryException e) {
+                        String msg = "Error making update listener for workspace " +
+                                workspace + " online: " + e.getMessage();
+                        log.warn(msg);
+                    }
+                    listener = (UpdateEventListener) wspUpdateListeners.get(workspace);
+                    if (listener ==  null) {
+                        String msg = "Update listener unavailable for workspace: " + workspace;
+                        log.error(msg);
+                        return;
+                    }
+                }
+            } else {
+                if (versionUpdateListener != null) {
+                    listener = versionUpdateListener;
+                } else {
+                    String msg = "Version update listener unavailable.";
+                    log.error(msg);
+                }
+            }
+            try {
+                listener.externalUpdate(changeLog, events);
+            } catch (RepositoryException e) {
+                String msg = "Unable to deliver update events: " + e.getMessage();
+                log.error(msg);
+            }
+        }
+
+        /**
+         * Return a session matching a certain user id.
+         *
+         * @param userId user id
+         * @return session
+         */
+        private Session getOrCreateSession(String userId) {
+            if (lastSession == null || !lastSession.getUserID().equals(userId)) {
+                lastSession = new ClusterSession(userId);
+            }
+            return lastSession;
+        }
+    }
+}

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterSession.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterSession.java?view=auto&rev=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterSession.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterSession.java Mon Nov  6 07:22:29 2006
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.xml.sax.ContentHandler;
+
+import javax.jcr.Session;
+import javax.jcr.Repository;
+import javax.jcr.Workspace;
+import javax.jcr.Credentials;
+import javax.jcr.Node;
+import javax.jcr.Item;
+import javax.jcr.ValueFactory;
+import javax.jcr.UnsupportedRepositoryOperationException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Represents the session that has made some changes on another node in the cluster. The only method
+ * returning real information is {@link #getUserID()}. All other methods are stubs: they either return a
+ * fixed value (<code>null</code>, an empty array or <code>true</code>), do nothing, or throw an
+ * {@link UnsupportedRepositoryOperationException}.
+ */
+class ClusterSession implements Session {
+
+    /**
+     * User id to represent.
+     */
+    private final String userId;
+
+    /**
+     * Create a new instance of this class.
+     *
+     * @param userId user id
+     */
+    public ClusterSession(String userId) {
+        this.userId = userId;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Returns the user id passed to the constructor.
+     */
+    public String getUserID() {
+        return userId;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Always returns <code>null</code>.
+     */
+    public Repository getRepository() {
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Always returns <code>null</code>.
+     */
+    public Object getAttribute(String s) {
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Always returns an empty array.
+     */
+    public String[] getAttributeNames() {
+        return new String[0];
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Always returns <code>null</code>.
+     */
+    public Workspace getWorkspace() {
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Session impersonate(Credentials credentials) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Node getRootNode() throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Node getNodeByUUID(String s) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Item getItem(String s) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean itemExists(String s) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void move(String s, String s1) throws UnsupportedRepositoryOperationException {
+
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void save() throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void refresh(boolean b) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean hasPendingChanges() throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public ValueFactory getValueFactory() throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void checkPermission(String s, String s1) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public ContentHandler getImportContentHandler(String s, int i) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void importXML(String s, InputStream inputStream, int i) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void exportSystemView(String s, ContentHandler contentHandler, boolean b, boolean b1)
+            throws UnsupportedRepositoryOperationException {
+
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void exportSystemView(String s, OutputStream outputStream, boolean b, boolean b1)
+            throws UnsupportedRepositoryOperationException {
+
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void exportDocumentView(String s, ContentHandler contentHandler, boolean b, boolean b1)
+            throws UnsupportedRepositoryOperationException {
+
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void exportDocumentView(String s, OutputStream outputStream, boolean b, boolean b1)
+            throws UnsupportedRepositoryOperationException {
+
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void setNamespacePrefix(String s, String s1) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String[] getNamespacePrefixes() throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String getNamespaceURI(String s) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String getNamespacePrefix(String s) throws UnsupportedRepositoryOperationException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Does nothing.
+     */
+    public void logout() {
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Always returns <code>true</code>, even after {@link #logout()}.
+     */
+    public boolean isLive() {
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Does nothing; the token is not stored.
+     */
+    public void addLockToken(String s) {
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Always returns an empty array.
+     */
+    public String[] getLockTokens() {
+        return new String[0];
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Does nothing.
+     */
+    public void removeLockToken(String s) {
+    }
+}

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java?view=auto&rev=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java Mon Nov  6 07:22:29 2006
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.apache.jackrabbit.core.NodeId;
+import org.apache.jackrabbit.core.state.ChangeLog;
+import org.apache.jackrabbit.core.state.ItemState;
+import org.apache.jackrabbit.core.state.NodeState;
+import org.apache.jackrabbit.core.state.PropertyState;
+import org.apache.jackrabbit.core.observation.EventState;
+import org.apache.jackrabbit.core.observation.EventStateCollection;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.QName;
+import org.apache.jackrabbit.name.NamespaceResolver;
+import org.apache.jackrabbit.name.NoPrefixDeclaredException;
+import org.apache.jackrabbit.name.NameException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
+
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+
+/**
+ * File-based journal implementation.
+ */
+public class FileJournal implements Journal {
+
+    /**
+     * Global revision counter name, located in the journal directory.
+     */
+    private static final String REVISION_NAME = "revision";
+
+    /**
+     * Logger.
+     */
+    private static Logger log = LoggerFactory.getLogger(FileJournal.class);
+
+    /**
+     * Journal id.
+     */
+    private String id;
+
+    /**
+     * Namespace resolver used to map prefixes to URIs and vice-versa.
+     */
+    private NamespaceResolver resolver;
+
+    /**
+     * Callback.
+     */
+    private RecordProcessor processor;
+
+    /**
+     * Directory name, bean property.
+     */
+    private String directory;
+
+    /**
+     * Revision file name, bean property.
+     */
+    private String revision;
+
+    /**
+     * Journal root directory.
+     */
+    private File root;
+
+    /**
+     * Instance counter.
+     */
+    private FileRevision instanceRevision;
+
+    /**
+     * Global journal counter.
+     */
+    private FileRevision globalRevision;
+
+    /**
+     * Mutex used when writing journal.
+     */
+    private final Mutex writeMutex = new Mutex();
+
+    /**
+     * Current temporary journal log.
+     */
+    private File tempLog;
+
+    /**
+     * Current file record output.
+     */
+    private FileRecordOutput out;
+
+    /**
+     * Current file record.
+     */
+    private FileRecord record;
+
+    /**
+     * Bean getter for journal directory.
+     *
+     * @return directory name as set through {@link #setDirectory(String)},
+     *         or <code>null</code> if it has not been set
+     */
+    public String getDirectory() {
+        return directory;
+    }
+
+    /**
+     * Bean setter for journal directory. The value is only stored here.
+     *
+     * @param directory directory used for journaling
+     */
+    public void setDirectory(String directory) {
+        this.directory = directory;
+    }
+
+    /**
+     * Bean getter for revision file.
+     *
+     * @return revision file name as set through {@link #setRevision(String)},
+     *         or <code>null</code> if it has not been set
+     */
+    public String getRevision() {
+        // return the revision property, not the journal directory
+        return revision;
+    }
+
+    /**
+     * Bean setter for revision file. The value is only stored here.
+     *
+     * @param revision name of the file holding this instance's revision counter
+     */
+    public void setRevision(String revision) {
+        this.revision = revision;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void init(String id, RecordProcessor processor, NamespaceResolver resolver) throws JournalException {
+        this.id = id;
+        this.resolver = resolver;
+        this.processor = processor;
+
+        if (directory == null) {
+            String msg = "Directory not specified.";
+            throw new JournalException(msg);
+        }
+        if (revision == null) {
+            String msg = "Revision not specified.";
+            throw new JournalException(msg);
+        }
+        root = new File(directory);
+        if (!root.exists() || !root.isDirectory()) {
+            String msg = "Directory specified does either not exist or is not a directory: " + directory;
+            throw new JournalException(msg);
+        }
+        instanceRevision = new FileRevision(new File(revision));
+        globalRevision = new FileRevision(new File(root, REVISION_NAME));
+
+        log.info("FileJournal initialized at path: " + directory);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void sync() throws JournalException {
+        final long instanceValue = instanceRevision.get();
+        final long globalValue = globalRevision.get();
+
+        File[] files = root.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                if (name.endsWith(FileRecord.EXTENSION)) {
+                    int sep = name.indexOf('.');
+                    if (sep > 0) {
+                        try {
+                            long counter = Long.parseLong(name.substring(0, sep), 16);
+                            return counter > instanceValue && counter <= globalValue;
+                        } catch (NumberFormatException e) {
+                            String msg = "Skipping bogusly named journal file '" + name + "': " + e.getMessage();
+                            log.warn(msg);
+                        }
+                    }
+                }
+                return false;
+            }
+        });
+        Arrays.sort(files, new Comparator() {
+            public int compare(Object o1, Object o2) {
+                File f1 = (File) o1;
+                File f2 = (File) o2;
+                return f1.getName().compareTo(f2.getName());
+            }
+        });
+        if (files.length > 0) {
+            for (int i = 0; i < files.length; i++) {
+                try {
+                    FileRecord record = new FileRecord(files[i]);
+                    if (!record.getJournalId().equals(id)) {
+                        process(record);
+                    } else {
+                        log.info("Log entry matches journal id, skipped: " + files[i]);
+                    }
+                    instanceRevision.set(record.getCounter());
+                } catch (IllegalArgumentException e) {
+                    String msg = "Skipping bogusly named journal file '" + files[i] + ": " + e.getMessage();
+                    log.warn(msg);
+                }
+            }
+            log.info("Sync finished, instance revision is: " + FileRecord.toHexString(instanceRevision.get()));
+        }
+    }
+
+    /**
+     * Process a record. Reads the record file entry by entry and forwards
+     * each entry to the record processor passed to <code>init</code>: the
+     * workspace name goes to <code>start</code>, every following entry to
+     * the matching <code>process</code> overload, and <code>end</code> is
+     * called once the terminating NUL character has been read. The input
+     * is closed in any case.
+     *
+     * @param record record to process
+     * @throws JournalException if the file cannot be read or contains an
+     *                          unknown entry type
+     */
+    void process(FileRecord record) throws JournalException {
+        File file = record.getFile();
+
+        log.info("Processing: " + file);
+
+        FileRecordInput in = null;
+        String workspace = null;
+
+        try {
+            in = new FileRecordInput(new FileInputStream(file), resolver);
+
+            // begin() writes an empty string when there is no workspace
+            workspace = in.readString();
+            if (workspace.equals("")) {
+                workspace = null;
+            }
+            processor.start(workspace);
+
+            for (;;) {
+                // every entry starts with a one-character type marker
+                char c = in.readChar();
+                if (c == '\0') {
+                    // end of record
+                    break;
+                }
+                if (c == 'N') {
+                    // node operation: operation type, node id
+                    NodeOperation operation = NodeOperation.create(in.readByte());
+                    operation.setId(in.readNodeId());
+                    processor.process(operation);
+                } else if (c == 'P') {
+                    // property operation: operation type, property id
+                    PropertyOperation operation = PropertyOperation.create(in.readByte());
+                    operation.setId(in.readPropertyId());
+                    processor.process(operation);
+                } else if (c == 'E') {
+                    // event: fields are read in the order they appear in the file
+                    int type = in.readByte();
+                    NodeId parentId = in.readNodeId();
+                    Path parentPath = in.readPath();
+                    NodeId childId = in.readNodeId();
+                    Path.PathElement childRelPath = in.readPathElement();
+                    QName ntName = in.readQName();
+
+                    // mixins are stored as a count followed by that many names
+                    Set mixins = new HashSet();
+                    int mixinCount = in.readInt();
+                    for (int i = 0; i < mixinCount; i++) {
+                        mixins.add(in.readQName());
+                    }
+                    String userId = in.readString();
+                    processor.process(type, parentId, parentPath, childId,
+                            childRelPath, ntName, mixins, userId);
+                } else if (c == 'L') {
+                    // lock: node id, deep flag, owner
+                    NodeId nodeId = in.readNodeId();
+                    boolean isDeep = in.readBoolean();
+                    String owner = in.readString();
+
+                    processor.process(nodeId, isDeep, owner);
+                } else if (c == 'U') {
+                    // unlock: node id
+                    NodeId nodeId = in.readNodeId();
+                    processor.process(nodeId);
+                } else {
+                    // converted into a JournalException by the catch clause below
+                    throw new IllegalArgumentException("Unknown entry type: " + c);
+                }
+            }
+            processor.end();
+
+        } catch (NameException e) {
+            String msg = "Unable to read journal entry " + file + ": " + e.getMessage();
+            throw new JournalException(msg);
+        } catch (IOException e) {
+            String msg = "Unable to read journal entry " + file + ": " + e.getMessage();
+            throw new JournalException(msg);
+        } catch (IllegalArgumentException e) {
+            String msg = "Error while processing journal file " + file + ": " + e.getMessage();
+            throw new JournalException(msg);
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    // a failed close is only reported, it does not fail the processing
+                    String msg = "I/O error while closing " + file + ": " + e.getMessage();
+                    log.warn(msg);
+                }
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Acquires the write mutex, synchronizes with the journal and creates a
+     * temporary log file that starts with the workspace name (an empty
+     * string if <code>workspace</code> is <code>null</code>). The mutex is
+     * still held when this method returns normally; it is released here
+     * only if the method fails.
+     *
+     * @param workspace workspace name, may be <code>null</code>
+     * @throws JournalException if the thread is interrupted while waiting
+     *                          for the mutex, if synchronizing fails or if
+     *                          the temporary log cannot be created
+     */
+    public void begin(String workspace) throws JournalException {
+        try {
+            writeMutex.acquire();
+        } catch (InterruptedException e) {
+            // restore the interrupt status so that callers can still observe it
+            Thread.currentThread().interrupt();
+            String msg = "Interrupted while waiting for write lock.";
+            throw new JournalException(msg);
+        }
+
+        boolean succeeded = false;
+
+        try {
+            sync();
+
+            tempLog = File.createTempFile("journal", ".tmp", root);
+            out = new FileRecordOutput(new FileOutputStream(tempLog), resolver);
+            out.writeString(workspace != null ? workspace : "");
+
+            succeeded = true;
+        } catch (IOException e) {
+            String msg = "Unable to create journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg);
+        } finally {
+            // also covers a JournalException thrown by sync()
+            if (!succeeded) {
+                writeMutex.release();
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Writes one operation per added, modified and deleted item state, in
+     * that order, followed by one entry for every event in the collection.
+     */
+    public void log(ChangeLog changeLog, EventStateCollection esc) throws JournalException {
+        for (Iterator added = changeLog.addedStates(); added.hasNext();) {
+            ItemState item = (ItemState) added.next();
+            if (!item.isNode()) {
+                log(PropertyAddedOperation.create((PropertyState) item));
+            } else {
+                log(NodeAddedOperation.create((NodeState) item));
+            }
+        }
+
+        for (Iterator modified = changeLog.modifiedStates(); modified.hasNext();) {
+            ItemState item = (ItemState) modified.next();
+            if (!item.isNode()) {
+                log(PropertyModifiedOperation.create((PropertyState) item));
+            } else {
+                log(NodeModifiedOperation.create((NodeState) item));
+            }
+        }
+
+        for (Iterator deleted = changeLog.deletedStates(); deleted.hasNext();) {
+            ItemState item = (ItemState) deleted.next();
+            if (!item.isNode()) {
+                log(PropertyDeletedOperation.create((PropertyState) item));
+            } else {
+                log(NodeDeletedOperation.create((NodeState) item));
+            }
+        }
+
+        for (Iterator it = esc.getEvents().iterator(); it.hasNext();) {
+            log((EventState) it.next());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Writes a lock entry: the marker <code>'L'</code> followed by node id,
+     * deep flag and owner.
+     */
+    public void log(NodeId nodeId, boolean isDeep, String owner) throws JournalException {
+        try {
+            out.writeChar('L');
+            out.writeNodeId(nodeId);
+            out.writeBoolean(isDeep);
+            out.writeString(owner);
+        } catch (IOException ioe) {
+            throw new JournalException(
+                    "Unable to write to journal log " + tempLog + ": " + ioe.getMessage());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void log(NodeId nodeId) throws JournalException {
+        try {
+            // Record layout: 'U' marker followed by the node id.
+            out.writeChar('U');
+            out.writeNodeId(nodeId);
+        } catch (IOException ioe) {
+            throw new JournalException("Unable to write to journal log " + tempLog + ": " + ioe.getMessage());
+        }
+    }
+
+    /**
+     * Append a property operation record to the temporary journal log.
+     *
+     * @param operation property operation
+     * @throws JournalException if the record cannot be written
+     */
+    protected void log(PropertyOperation operation) throws JournalException {
+        try {
+            // Record layout: 'P' marker, operation type, property id.
+            out.writeChar('P');
+            out.writeByte(operation.getOperationType());
+            out.writePropertyId(operation.getId());
+        } catch (NoPrefixDeclaredException npde) {
+            throw new JournalException("Unable to write to journal log " + tempLog + ": " + npde.getMessage());
+        } catch (IOException ioe) {
+            throw new JournalException("Unable to write to journal log " + tempLog + ": " + ioe.getMessage());
+        }
+    }
+
+    /**
+     * Append a node operation record ('N' marker, operation type, node id) to the temporary journal log.
+     *
+     * @param operation node operation
+     * @throws JournalException if the record cannot be written
+     */
+    protected void log(NodeOperation operation) throws JournalException {
+        try {
+            out.writeChar('N');
+            out.writeByte(operation.getOperationType());
+            out.writeNodeId(operation.getId());
+        } catch (IOException ioe) {
+            throw new JournalException("Unable to write to journal log " + tempLog + ": " + ioe.getMessage());
+        }
+    }
+
+    /**
+     * Append an event record to the temporary journal log.
+     *
+     * @param event event to log
+     * @throws JournalException if the record cannot be written
+     */
+    protected void log(EventState event) throws JournalException {
+        try {
+            // Fixed part: 'E' marker, type, parent id and path, child id and relative path, node type.
+            out.writeChar('E');
+            out.writeByte(event.getType());
+            out.writeNodeId(event.getParentId());
+            out.writePath(event.getParentPath());
+            out.writeNodeId(event.getChildId());
+            out.writePathElement(event.getChildRelPath());
+            out.writeQName(event.getNodeType());
+
+            // Mixin names are written as a count followed by that many names, then the user id.
+            Set mixinNames = event.getMixinNames();
+            out.writeInt(mixinNames.size());
+            for (Iterator names = mixinNames.iterator(); names.hasNext();) {
+                out.writeQName((QName) names.next());
+            }
+            out.writeString(event.getUserId());
+        } catch (NoPrefixDeclaredException npde) {
+            throw new JournalException("Unable to write to journal log " + tempLog + ": " + npde.getMessage());
+        } catch (IOException ioe) {
+            throw new JournalException("Unable to write to journal log " + tempLog + ": " + ioe.getMessage());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void prepare() throws JournalException {
+        globalRevision.lock(false);
+        // The flag below makes the finally block undo the lock if sync() or record creation fails.
+        boolean prepared = false;
+
+        try {
+            sync();
+            record = new FileRecord(root, globalRevision.get() + 1, id);
+            // Success: the lock stays held; commit() and cancel() release it in their finally blocks.
+            prepared = true;
+        } finally {
+            if (!prepared) {
+                globalRevision.unlock();
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc} The output is closed even when the final write fails, so the stream is not leaked.
+     */
+    public void commit() throws JournalException {
+        try {
+            try {
+                out.writeChar('\0'); // written last, right before closing (presumably the end marker)
+            } finally {
+                out.close(); // must also run when writeChar throws
+            }
+            if (!tempLog.renameTo(record.getFile())) {
+                throw new JournalException("Unable to rename " + tempLog + " to " + record.getFile());
+            }
+            globalRevision.set(record.getCounter()); // only advanced once the rename succeeded
+        } catch (IOException e) {
+            throw new JournalException("Unable to close journal log " + tempLog + ": " + e.getMessage());
+        } finally {
+            globalRevision.unlock();
+            writeMutex.release();
+        }
+    }
+
+    /** {@inheritDoc} The temporary log is deleted even if closing the output fails. */
+    public void cancel() throws JournalException {
+        try {
+            try {
+                out.close();
+            } catch (IOException e) {
+                // Best effort: a failed close is only reported, cancellation continues.
+                log.warn("Unable to close journal log " + tempLog + ": " + e.getMessage());
+            }
+            tempLog.delete();
+        } finally {
+            globalRevision.unlock();
+            writeMutex.release();
+        }
+    }
+}

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java?view=auto&rev=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java Mon Nov  6 07:22:29 2006
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.io.File;
+
+/**
+ * Represents a file-based record. The file name carries the record's meta data in the form
+ * <code>&lt;hex counter&gt;.&lt;journal id&gt;.log</code>.
+ */
+class FileRecord {
+
+    /**
+     * File record extension.
+     */
+    static final String EXTENSION = ".log";
+
+    /**
+     * Indicator for a literal UUID.
+     */
+    static final byte UUID_LITERAL = 0x00;
+
+    /**
+     * Indicator for a UUID index.
+     */
+    static final byte UUID_INDEX = 0x01;
+
+    /**
+     * Used for padding long string representations.
+     */
+    private static final String LONG_PADDING = "0000000000000000";
+
+    /**
+     * Underlying file.
+     */
+    private final File file;
+
+    /**
+     * Counter.
+     */
+    private final long counter;
+
+    /**
+     * Journal id.
+     */
+    private final String journalId;
+
+    /**
+     * Creates a new file record from an existing file. Retrieves meta data by parsing the file's name.
+     *
+     * @param file file to use as record
+     * @throws IllegalArgumentException if the name has fewer than two <code>.</code> separators
+     *         or does not start with a hexadecimal counter
+     */
+    public FileRecord(File file) throws IllegalArgumentException {
+        this.file = file;
+        // Expected name layout: <hex counter>.<journal id>.<extension>
+        String name = file.getName();
+
+        int sep1 = name.indexOf('.');
+        if (sep1 == -1) {
+            throw new IllegalArgumentException("Missing first . separator.");
+        }
+        try {
+            counter = Long.parseLong(name.substring(0, sep1), 16);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Unable to decompose long: " + e.getMessage());
+        }
+        int sep2 = name.lastIndexOf('.');
+        if (sep2 == sep1) { // a dot exists, so lastIndexOf is never -1; equal positions mean only one dot
+            throw new IllegalArgumentException("Missing second . separator.");
+        }
+        journalId = name.substring(sep1 + 1, sep2);
+    }
+
+    /**
+     * Creates a new file record from a counter and instance ID.
+     *
+     * @param parent parent directory
+     * @param counter counter to use
+     * @param journalId journal id to use
+     */
+    public FileRecord(File parent, long counter, String journalId) {
+        String name = toHexString(counter) + '.' + journalId + EXTENSION;
+
+        this.file = new File(parent, name);
+        this.counter = counter;
+        this.journalId = journalId;
+    }
+
+    /**
+     * Return the journal counter associated with this record.
+     *
+     * @return counter
+     */
+    public long getCounter() {
+        return counter;
+    }
+
+    /**
+     * Return the id of the journal that created this record.
+     *
+     * @return journal id
+     */
+    public String getJournalId() {
+        return journalId;
+    }
+
+    /**
+     * Return this record's file.
+     *
+     * @return file
+     */
+    public File getFile() {
+        return file;
+    }
+
+    /**
+     * Return a zero-padded, 16 digit hexadecimal string representation of a long.
+     *
+     * @param l value to format
+     * @return hexadecimal string, left-padded with zeros to 16 characters
+     */
+    public static String toHexString(long l) {
+        String s = Long.toHexString(l);
+        int padlen = LONG_PADDING.length() - s.length();
+        if (padlen > 0) {
+            s = LONG_PADDING.substring(0, padlen) + s;
+        }
+        return s;
+    }
+}
\ No newline at end of file

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java?view=auto&rev=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java Mon Nov  6 07:22:29 2006
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.apache.jackrabbit.core.NodeId;
+import org.apache.jackrabbit.core.PropertyId;
+import org.apache.jackrabbit.name.NamespaceResolver;
+import org.apache.jackrabbit.name.QName;
+import org.apache.jackrabbit.name.NameFormat;
+import org.apache.jackrabbit.name.IllegalNameException;
+import org.apache.jackrabbit.name.UnknownPrefixException;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.PathFormat;
+import org.apache.jackrabbit.name.MalformedPathException;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+import java.util.ArrayList;
+
+/**
+ * Defines methods to read members out of a file record.
+ */
+class FileRecordInput {
+
+    /**
+     * Underlying input stream.
+     */
+    private final DataInputStream in;
+
+    /**
+     * Namespace resolver used when parsing names and paths.
+     */
+    private final NamespaceResolver resolver;
+
+    /**
+     * UUID index: node ids read as literals so far, in reading order.
+     */
+    private final ArrayList uuidIndex = new ArrayList();
+
+    /**
+     * Flag indicating whether this input is closed.
+     */
+    private boolean closed;
+
+    /**
+     * Open an existing file record.
+     *
+     * @param in       underlying input stream
+     * @param resolver namespace resolver
+     */
+    public FileRecordInput(InputStream in, NamespaceResolver resolver) {
+        this.in = new DataInputStream(in);
+        this.resolver = resolver;
+    }
+
+    /**
+     * Read a byte from the underlying stream.
+     *
+     * @return byte
+     * @throws IOException if an I/O error occurs
+     */
+    public byte readByte() throws IOException {
+        checkOpen();
+
+        return in.readByte();
+    }
+
+    /**
+     * Read a character from the underlying stream.
+     *
+     * @return character
+     * @throws IOException if an I/O error occurs
+     */
+    public char readChar() throws IOException {
+        checkOpen();
+
+        return in.readChar();
+    }
+
+    /**
+     * Read a boolean from the underlying stream.
+     *
+     * @return boolean
+     * @throws IOException if an I/O error occurs
+     */
+    public boolean readBoolean() throws IOException {
+        checkOpen();
+
+        return in.readBoolean();
+    }
+
+    /**
+     * Read an integer from the underlying stream.
+     *
+     * @return integer
+     * @throws IOException if an I/O error occurs
+     */
+    public int readInt() throws IOException {
+        checkOpen();
+
+        return in.readInt();
+    }
+
+    /**
+     * Read a string from the underlying stream, using <code>DataInputStream.readUTF()</code>.
+     *
+     * @return string
+     * @throws IOException if an I/O error occurs
+     */
+    public String readString() throws IOException {
+        checkOpen();
+
+        return in.readUTF();
+    }
+
+    /**
+     * Read a <code>QName</code>, stored as a string and parsed with the namespace resolver.
+     *
+     * @return name
+     * @throws IOException if an I/O error occurs
+     * @throws IllegalNameException if the name retrieved is illegal
+     * @throws UnknownPrefixException if the prefix is unknown
+     */
+    public QName readQName() throws IOException, IllegalNameException, UnknownPrefixException {
+        checkOpen();
+
+        return NameFormat.parse(readString(), resolver);
+    }
+
+    /**
+     * Read a <code>PathElement</code>: a name followed by an integer index, where 0 stands for "no index".
+     *
+     * @return path element
+     * @throws IOException if an I/O error occurs
+     * @throws IllegalNameException if the name retrieved is illegal
+     * @throws UnknownPrefixException if the prefix is unknown
+     */
+    public Path.PathElement readPathElement() throws IOException, IllegalNameException, UnknownPrefixException {
+        checkOpen();
+
+        QName name = NameFormat.parse(readString(), resolver);
+        int index = readInt();
+        if (index != 0) {
+            return Path.PathElement.create(name, index);
+        } else {
+            return Path.PathElement.create(name);
+        }
+    }
+
+    /**
+     * Read a <code>Path</code>, stored as a string and parsed with the namespace resolver.
+     *
+     * @return path
+     * @throws IOException if an I/O error occurs
+     * @throws MalformedPathException if the path is malformed
+     */
+    public Path readPath() throws IOException, MalformedPathException {
+        checkOpen();
+
+        return PathFormat.parse(readString(), resolver);
+    }
+
+    /**
+     * Read a <code>NodeId</code>: either a literal, which is remembered, or an index into the literals read so far.
+     *
+     * @return node id, or <code>null</code> if the index <code>-1</code> was stored
+     * @throws IOException if an I/O error occurs, the indicator is unknown or the index is out of range
+     */
+    public NodeId readNodeId() throws IOException {
+        checkOpen();
+
+        byte b = readByte();
+        if (b == FileRecord.UUID_INDEX) {
+            int index = readInt();
+            if (index == -1) {
+                return null;
+            } else if (index < 0 || index >= uuidIndex.size()) {
+                throw new IOException("UUID index out of range: " + index);
+            }
+            return (NodeId) uuidIndex.get(index);
+        } else if (b == FileRecord.UUID_LITERAL) {
+            NodeId nodeId = NodeId.valueOf(readString());
+            uuidIndex.add(nodeId);
+            return nodeId;
+        } else {
+            throw new IOException("UUID indicator unknown: " + b);
+        }
+    }
+
+    /**
+     * Read a <code>PropertyId</code>: the parent node id followed by the property name.
+     *
+     * @return property id
+     * @throws IOException if an I/O error occurs
+     * @throws IllegalNameException if the name retrieved is illegal
+     * @throws UnknownPrefixException if the prefix is unknown
+     */
+    public PropertyId readPropertyId() throws IOException, IllegalNameException, UnknownPrefixException  {
+        checkOpen();
+
+        return new PropertyId(readNodeId(), readQName());
+    }
+
+    /**
+     * Close this input. The input is marked closed even if closing the stream fails.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    public void close() throws IOException {
+        checkOpen();
+
+        try {
+            in.close();
+        } finally {
+            closed = true;
+        }
+    }
+
+    /**
+     * Check that this input is open, throw otherwise.
+     *
+     * @throws IllegalStateException if input is closed.
+     */
+    private void checkOpen() throws IllegalStateException {
+        if (closed) {
+            throw new IllegalStateException("Input closed.");
+        }
+    }
+
+}

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java?view=auto&rev=471760
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java Mon Nov  6 07:22:29 2006
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.apache.jackrabbit.core.NodeId;
+import org.apache.jackrabbit.core.PropertyId;
+import org.apache.jackrabbit.name.NamespaceResolver;
+import org.apache.jackrabbit.name.QName;
+import org.apache.jackrabbit.name.NameFormat;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.PathFormat;
+import org.apache.jackrabbit.name.NoPrefixDeclaredException;
+
+import java.io.IOException;
+import java.io.DataOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+/**
+ * Defines methods to write members to a file record.
+ */
+class FileRecordOutput {
+
+    /**
+     * Underlying output stream.
+     */
+    private final DataOutputStream out;
+
+    /**
+     * Namespace resolver used when formatting names and paths.
+     */
+    private final NamespaceResolver resolver;
+
+    /**
+     * UUID index: node ids already written as literals, in writing order.
+     */
+    private final ArrayList uuidIndex = new ArrayList();
+
+    /**
+     * Flag indicating whether this output is closed.
+     */
+    private boolean closed;
+
+    /**
+     * Create a new file record.
+     *
+     * @param out      output stream to write to
+     * @param resolver namespace resolver
+     */
+    public FileRecordOutput(OutputStream out, NamespaceResolver resolver) {
+        this.out = new DataOutputStream(out);
+        this.resolver = resolver;
+    }
+
+    /**
+     * Write a byte to the underlying stream.
+     *
+     * @param n byte
+     * @throws IOException if an I/O error occurs
+     */
+    public void writeByte(int n) throws IOException {
+        checkOpen();
+
+        out.writeByte(n);
+    }
+
+    /**
+     * Write a character to the underlying stream.
+     *
+     * @param c character
+     * @throws IOException if an I/O error occurs
+     */
+    public void writeChar(char c) throws IOException {
+        checkOpen();
+
+        out.writeChar(c);
+    }
+
+    /**
+     * Write a boolean to the underlying stream.
+     *
+     * @param b boolean
+     * @throws IOException if an I/O error occurs
+     */
+    public void writeBoolean(boolean b) throws IOException {
+        checkOpen();
+
+        out.writeBoolean(b);
+    }
+
+    /**
+     * Write an integer to the underlying stream.
+     *
+     * @param n integer
+     * @throws IOException if an I/O error occurs
+     */
+    public void writeInt(int n) throws IOException {
+        checkOpen();
+
+        out.writeInt(n);
+    }
+
+    /**
+     * Write a string to the underlying stream, using <code>DataOutputStream.writeUTF()</code>.
+     *
+     * @param s string
+     * @throws IOException if an I/O error occurs
+     */
+    public void writeString(String s) throws IOException {
+        checkOpen();
+
+        out.writeUTF(s);
+    }
+
+    /**
+     * Write a <code>QName</code>, formatted as a string with the namespace resolver.
+     *
+     * @param name name
+     * @throws IOException if an I/O error occurs
+     * @throws NoPrefixDeclaredException if the prefix is not declared
+     */
+    public void writeQName(QName name) throws IOException, NoPrefixDeclaredException {
+        checkOpen();
+
+        writeString(NameFormat.format(name, resolver));
+    }
+
+    /**
+     * Write a <code>PathElement</code>: its name followed by its integer index.
+     *
+     * @param element path element
+     * @throws IOException if an I/O error occurs
+     * @throws NoPrefixDeclaredException if the prefix is not declared
+     */
+    public void writePathElement(Path.PathElement element) throws IOException, NoPrefixDeclaredException {
+        checkOpen();
+
+        writeQName(element.getName());
+        writeInt(element.getIndex());
+    }
+
+    /**
+     * Write a <code>Path</code>, formatted as a string with the namespace resolver.
+     *
+     * @param path path
+     * @throws IOException if an I/O error occurs
+     * @throws NoPrefixDeclaredException if the prefix is not declared
+     */
+    public void writePath(Path path) throws IOException, NoPrefixDeclaredException {
+        checkOpen();
+
+        writeString(PathFormat.format(path, resolver));
+    }
+
+    /**
+     * Write a <code>NodeId</code>. Since the same node ids are likely to appear multiple times,
+     * only the first occurrence is written literally, while all later ones reference that
+     * entry's index. A <code>null</code> id is written as index <code>-1</code>.
+     *
+     * @param nodeId node id, may be <code>null</code>
+     * @throws IOException if an I/O error occurs
+     */
+    public void writeNodeId(NodeId nodeId) throws IOException {
+        checkOpen();
+
+        if (nodeId == null) {
+            writeByte(FileRecord.UUID_INDEX);
+            writeInt(-1);
+            return;
+        }
+        int position = getOrCreateIndex(nodeId);
+        if (position == -1) {
+            writeByte(FileRecord.UUID_LITERAL);
+            writeString(nodeId.toString());
+        } else {
+            writeByte(FileRecord.UUID_INDEX);
+            writeInt(position);
+        }
+    }
+
+    /**
+     * Write a <code>PropertyId</code>: the parent node id followed by the property name.
+     *
+     * @param propertyId property id
+     * @throws IOException if an I/O error occurs
+     * @throws NoPrefixDeclaredException if the prefix is not declared
+     */
+    public void writePropertyId(PropertyId propertyId) throws IOException, NoPrefixDeclaredException {
+        checkOpen();
+
+        writeNodeId(propertyId.getParentId());
+        writeQName(propertyId.getName());
+    }
+
+    /**
+     * Close this output. The output is marked closed even if closing the stream fails.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    public void close() throws IOException {
+        checkOpen();
+
+        try {
+            out.close();
+        } finally {
+            closed = true;
+        }
+    }
+
+    /**
+     * Get a <code>NodeId</code>'s existing cache index, creating a new entry if necessary.
+     *
+     * @param nodeId nodeId to lookup
+     * @return cache index of existing entry or <code>-1</code> to indicate the entry was added
+     */
+    private int getOrCreateIndex(NodeId nodeId) {
+        final int known = uuidIndex.indexOf(nodeId);
+        if (known < 0) {
+            uuidIndex.add(nodeId);
+        }
+        return known;
+    }
+
+    /**
+     * Check that this output is open, throw otherwise.
+     *
+     * @throws IllegalStateException if output is closed.
+     */
+    private void checkOpen() throws IllegalStateException {
+        if (closed) {
+            throw new IllegalStateException("Output closed.");
+        }
+    }
+}



Mime
View raw message