jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dpfis...@apache.org
Subject svn commit: r495239 [1/2] - /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/
Date Thu, 11 Jan 2007 14:40:01 GMT
Author: dpfister
Date: Thu Jan 11 06:40:00 2007
New Revision: 495239

URL: http://svn.apache.org/viewvc?view=rev&rev=495239
Log:
JCR-702: Allow database as backend for clustering
JCR-703: Add signature and major/minor version to the journal files used for clustering 

Added:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/Record.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordInput.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordOutput.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/default.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/derby.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/h2.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/oracle.ddl
Removed:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRevision.java

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java?view=auto&rev=495239
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java Thu Jan 11 06:40:00 2007
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.apache.jackrabbit.core.NodeId;
+import org.apache.jackrabbit.core.nodetype.NodeTypeDef;
+import org.apache.jackrabbit.core.nodetype.compact.ParseException;
+import org.apache.jackrabbit.core.state.ChangeLog;
+import org.apache.jackrabbit.core.state.ItemState;
+import org.apache.jackrabbit.core.state.NodeState;
+import org.apache.jackrabbit.core.state.PropertyState;
+import org.apache.jackrabbit.core.observation.EventState;
+import org.apache.jackrabbit.core.observation.EventStateCollection;
+import org.apache.jackrabbit.name.Path;
+import org.apache.jackrabbit.name.QName;
+import org.apache.jackrabbit.name.NamespaceResolver;
+import org.apache.jackrabbit.name.NoPrefixDeclaredException;
+import org.apache.jackrabbit.name.NameException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.io.FileOutputStream;
+import java.io.DataOutputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collection;
+
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+
+import javax.jcr.observation.Event;
+import javax.jcr.Session;
+
+/**
+ * Base journal implementation, providing common functionality.
+ * <p/>
+ * It manages the following bean properties :
+ * <ul>
+ * <li><code>revision</code>: the filename where the parent cluster node's revision
+ * file should be written to; this is a required property with no default value</li>
+ * </ul>
+ */
+public abstract class AbstractJournal implements Journal {
+
+    /**
+     * Logger.
+     */
+    private static Logger log = LoggerFactory.getLogger(AbstractJournal.class);
+
+    /**
+     * Journal id, as passed to {@link #init}.
+     */
+    protected String id;
+
+    /**
+     * Namespace resolver used to map prefixes to URIs and vice-versa. It is
+     * handed to the record output created in {@link #begin}.
+     */
+    protected NamespaceResolver resolver;
+
+    /**
+     * Record processor receiving the entries decoded in {@link #process}.
+     */
+    private RecordProcessor processor;
+
+    /**
+     * Mutex used when writing journal. Acquired in {@link #begin}; released
+     * in {@link #commit} or {@link #cancel}, or when {@link #begin} or
+     * {@link #prepare} fail.
+     */
+    private final Mutex writeMutex = new Mutex();
+
+    /**
+     * Revision file name, bean property. {@link #init} fails if it has not
+     * been set.
+     */
+    private String revision;
+
+    /**
+     * Current temporary journal log, created in {@link #begin} and deleted
+     * in {@link #commit} or {@link #cancel}.
+     */
+    private File tempLog;
+
+    /**
+     * Current record output, writing to the temporary journal log. Reset to
+     * <code>null</code> in {@link #commit} and {@link #cancel}.
+     */
+    private RecordOutput out;
+
+    /**
+     * Last used session for event sources, cached by
+     * {@link #getOrCreateSession}.
+     */
+    private Session lastSession;
+
+    /**
+     * Revision returned by {@link #lockRevision} in {@link #prepare}; passed
+     * to {@link #append} in {@link #commit}.
+     */
+    private long nextRevision;
+
+    /**
+     * Instance counter, file-based. Created in {@link #init} from the
+     * revision file name and accessed through {@link #getLocalRevision} and
+     * {@link #setLocalRevision}.
+     */
+    private FileRevision instanceRevision;
+
+    /**
+     * Bean getter for revision file.
+     * @return name of the revision file, or <code>null</code> if not set
+     */
+    public String getRevision() {
+        return revision;
+    }
+
+    /**
+     * Bean setter for revision file.
+     * @param revision name of the file used to store this instance's revision
+     */
+    public void setRevision(String revision) {
+        this.revision = revision;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Stores the given arguments and opens the file-based instance revision.
+     * The <code>revision</code> bean property must have been set beforehand.
+     *
+     * @throws JournalException if no revision file name has been configured
+     */
+    public void init(String id, RecordProcessor processor, NamespaceResolver resolver)
+            throws JournalException {
+
+        this.id = id;
+        this.processor = processor;
+        this.resolver = resolver;
+
+        if (revision == null) {
+            throw new JournalException("Revision not specified.");
+        }
+
+        File revisionFile = new File(revision);
+        instanceRevision = new FileRevision(revisionFile);
+    }
+
+    /**
+     * Process a record. Reads the workspace name, then decodes entries one by
+     * one and forwards each of them to the record processor until the
+     * terminating <code>'\0'</code> marker is read.
+     * <p/>
+     * Every entry starts with a type character:
+     * <ul>
+     * <li><code>N</code>: node operation (operation type byte, node id)</li>
+     * <li><code>P</code>: property operation (operation type byte, property id)</li>
+     * <li><code>E</code>: event (type, parent id and path, child id and
+     * relative path, node type name, mixin names, user id)</li>
+     * <li><code>L</code>: lock (deep flag, owner) or unlock of a node</li>
+     * <li><code>S</code>: old prefix, new prefix and URI of a namespace</li>
+     * <li><code>T</code>: a collection of node type definitions</li>
+     * </ul>
+     *
+     * @param revision revision
+     * @param in record data
+     * @throws JournalException if the record cannot be read or contains an
+     *                          unknown entry type; the underlying exception
+     *                          is available as cause
+     */
+    protected void process(long revision, RecordInput in) throws JournalException {
+        log.info("Processing revision: " + revision);
+
+        String workspace = null;
+
+        try {
+            workspace = in.readString();
+            processor.start(workspace);
+
+            for (;;) {
+                char c = in.readChar();
+                if (c == '\0') {
+                    // end-of-record marker, written by commit()
+                    break;
+                }
+                if (c == 'N') {
+                    NodeOperation operation = NodeOperation.create(in.readByte());
+                    operation.setId(in.readNodeId());
+                    processor.process(operation);
+                } else if (c == 'P') {
+                    PropertyOperation operation = PropertyOperation.create(in.readByte());
+                    operation.setId(in.readPropertyId());
+                    processor.process(operation);
+                } else if (c == 'E') {
+                    // fields are read in exactly the order log(EventState) writes them
+                    int type = in.readByte();
+                    NodeId parentId = in.readNodeId();
+                    Path parentPath = in.readPath();
+                    NodeId childId = in.readNodeId();
+                    Path.PathElement childRelPath = in.readPathElement();
+                    QName ntName = in.readQName();
+
+                    Set mixins = new HashSet();
+                    int mixinCount = in.readInt();
+                    for (int i = 0; i < mixinCount; i++) {
+                        mixins.add(in.readQName());
+                    }
+                    String userId = in.readString();
+                    processor.process(createEventState(type, parentId, parentPath, childId,
+                            childRelPath, ntName, mixins, userId));
+                } else if (c == 'L') {
+                    NodeId nodeId = in.readNodeId();
+                    boolean isLock = in.readBoolean();
+                    if (isLock) {
+                        // deep flag and owner are only present for lock entries
+                        boolean isDeep = in.readBoolean();
+                        String owner = in.readString();
+                        processor.process(nodeId, isDeep, owner);
+                    } else {
+                        processor.process(nodeId);
+                    }
+                } else if (c == 'S') {
+                    String oldPrefix = in.readString();
+                    String newPrefix = in.readString();
+                    String uri = in.readString();
+                    processor.process(oldPrefix, newPrefix, uri);
+                } else if (c == 'T') {
+                    int size = in.readInt();
+                    HashSet ntDefs = new HashSet();
+                    for (int i = 0; i < size; i++) {
+                        ntDefs.add(in.readNodeTypeDef());
+                    }
+                    processor.process(ntDefs);
+                } else {
+                    throw new IllegalArgumentException("Unknown entry type: " + c);
+                }
+            }
+            processor.end();
+
+        } catch (NameException e) {
+            String msg = "Unable to read revision '" + revision + "'.";
+            throw new JournalException(msg, e);
+        } catch (ParseException e) {
+            String msg = "Unable to read revision '" + revision + "'.";
+            throw new JournalException(msg, e);
+        } catch (IOException e) {
+            String msg = "Unable to read revision '" + revision + "'.";
+            throw new JournalException(msg, e);
+        } catch (IllegalArgumentException e) {
+            // keep the original exception as cause so its stack trace is not lost
+            String msg = "Error while processing revision " +
+                    revision + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Acquires the write mutex, synchronizes with the journal and opens a
+     * fresh temporary log that starts with the workspace name. On success
+     * the mutex stays held until {@link #commit} or {@link #cancel} is
+     * called. On failure the temporary log is discarded and the mutex is
+     * released before the exception is propagated, so that a subsequent
+     * {@link #cancel} is a no-op.
+     *
+     * @throws JournalException if the calling thread is interrupted while
+     *                          waiting for the mutex, if synchronizing fails
+     *                          or if the temporary log cannot be created
+     */
+    public void begin(String workspace) throws JournalException {
+        try {
+            writeMutex.acquire();
+        } catch (InterruptedException e) {
+            // restore the interrupt status for callers further up the stack
+            Thread.currentThread().interrupt();
+            String msg = "Interrupted while waiting for write lock.";
+            throw new JournalException(msg, e);
+        }
+
+        boolean succeeded = false;
+
+        try {
+            sync();
+
+            tempLog = File.createTempFile("journal", ".tmp");
+
+            out = new RecordOutput(new DataOutputStream(
+                    new FileOutputStream(tempLog)), resolver);
+            out.writeString(workspace);
+
+            succeeded = true;
+        } catch (IOException e) {
+            String msg = "Unable to create journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        } finally {
+            if (!succeeded) {
+                try {
+                    // do not leave a half-initialized log behind: a later
+                    // cancel() would otherwise release the mutex a second time
+                    if (out != null) {
+                        try {
+                            out.close();
+                        } catch (IOException e) {
+                            String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage();
+                            log.warn(msg, e);
+                        }
+                        out = null;
+                    }
+                    if (tempLog != null) {
+                        tempLog.delete();
+                    }
+                } finally {
+                    writeMutex.release();
+                }
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Writes one operation entry for every added, modified and deleted item
+     * state of the change log, in that order, followed by one entry for
+     * every event of the event state collection.
+     */
+    public void log(ChangeLog changeLog, EventStateCollection esc) throws JournalException {
+        for (Iterator added = changeLog.addedStates(); added.hasNext();) {
+            ItemState item = (ItemState) added.next();
+            if (item.isNode()) {
+                log(NodeAddedOperation.create((NodeState) item));
+            } else {
+                log(PropertyAddedOperation.create((PropertyState) item));
+            }
+        }
+
+        for (Iterator modified = changeLog.modifiedStates(); modified.hasNext();) {
+            ItemState item = (ItemState) modified.next();
+            if (item.isNode()) {
+                log(NodeModifiedOperation.create((NodeState) item));
+            } else {
+                log(PropertyModifiedOperation.create((PropertyState) item));
+            }
+        }
+
+        for (Iterator deleted = changeLog.deletedStates(); deleted.hasNext();) {
+            ItemState item = (ItemState) deleted.next();
+            if (item.isNode()) {
+                log(NodeDeletedOperation.create((NodeState) item));
+            } else {
+                log(PropertyDeletedOperation.create((PropertyState) item));
+            }
+        }
+
+        for (Iterator events = esc.getEvents().iterator(); events.hasNext();) {
+            log((EventState) events.next());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Writes an <code>S</code> entry consisting of the old prefix, the new
+     * prefix and the URI.
+     *
+     * @throws JournalException if writing to the temporary log fails; the
+     *                          I/O error is available as cause
+     */
+    public void log(String oldPrefix, String newPrefix, String uri) throws JournalException {
+        try {
+            out.writeChar('S');
+            out.writeString(oldPrefix);
+            out.writeString(newPrefix);
+            out.writeString(uri);
+        } catch (IOException e) {
+            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Records a lock operation by delegating to
+     * {@link #log(NodeId, boolean, boolean, String)} with <code>isLock</code>
+     * set to <code>true</code>.
+     */
+    public void log(NodeId nodeId, boolean isDeep, String owner) throws JournalException {
+        log(nodeId, true, isDeep, owner);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Records an unlock operation by delegating to
+     * {@link #log(NodeId, boolean, boolean, String)} with <code>isLock</code>
+     * set to <code>false</code>; the deep flag and owner are not written in
+     * that case.
+     */
+    public void log(NodeId nodeId) throws JournalException {
+        log(nodeId, false, false, null);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Writes a <code>T</code> entry consisting of the number of node type
+     * definitions followed by the definitions themselves. The elements of
+     * the collection are cast to {@link NodeTypeDef}.
+     *
+     * @throws JournalException if writing to the temporary log fails; the
+     *                          I/O error is available as cause
+     */
+    public void log(Collection ntDefs) throws JournalException {
+        try {
+            out.writeChar('T');
+            out.writeInt(ntDefs.size());
+
+            Iterator iter = ntDefs.iterator();
+            while (iter.hasNext()) {
+                out.writeNodeTypeDef((NodeTypeDef) iter.next());
+            }
+        } catch (IOException e) {
+            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * Log a property operation. Writes a <code>P</code> entry consisting of
+     * the operation type and the property id.
+     *
+     * @param operation property operation
+     * @throws JournalException if the property id cannot be written to the
+     *                          temporary log; the underlying exception is
+     *                          available as cause
+     */
+    protected void log(PropertyOperation operation) throws JournalException {
+        try {
+            out.writeChar('P');
+            out.writeByte(operation.getOperationType());
+            out.writePropertyId(operation.getId());
+        } catch (NoPrefixDeclaredException e) {
+            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        } catch (IOException e) {
+            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * Log a node operation. Writes an <code>N</code> entry consisting of the
+     * operation type and the node id.
+     *
+     * @param operation node operation
+     * @throws JournalException if writing to the temporary log fails; the
+     *                          I/O error is available as cause
+     */
+    protected void log(NodeOperation operation) throws JournalException {
+        try {
+            out.writeChar('N');
+            out.writeByte(operation.getOperationType());
+            out.writeNodeId(operation.getId());
+        } catch (IOException e) {
+            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * Log an event. Writes an <code>E</code> entry consisting of the event
+     * type, parent id and path, child id and relative path, node type name,
+     * the mixin names (preceded by their count) and the user id. The fields
+     * are read back in the same order by {@link #process}.
+     *
+     * @param event event to log
+     * @throws JournalException if the event cannot be written to the
+     *                          temporary log; the underlying exception is
+     *                          available as cause
+     */
+    protected void log(EventState event) throws JournalException {
+        try {
+            out.writeChar('E');
+            out.writeByte(event.getType());
+            out.writeNodeId(event.getParentId());
+            out.writePath(event.getParentPath());
+            out.writeNodeId(event.getChildId());
+            out.writePathElement(event.getChildRelPath());
+            out.writeQName(event.getNodeType());
+
+            Set mixins = event.getMixinNames();
+            out.writeInt(mixins.size());
+            Iterator iter = mixins.iterator();
+            while (iter.hasNext()) {
+                out.writeQName((QName) iter.next());
+            }
+            out.writeString(event.getUserId());
+        } catch (NoPrefixDeclaredException e) {
+            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        } catch (IOException e) {
+            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * Log either a lock or an unlock operation. Writes an <code>L</code>
+     * entry consisting of the node id and the lock flag; the deep flag and
+     * the owner are only written for a lock.
+     *
+     * @param nodeId node id
+     * @param isLock <code>true</code> if this is a lock;
+     *               <code>false</code> if this is an unlock
+     * @param isDeep flag indicating whether lock is deep; ignored for an unlock
+     * @param owner lock owner; ignored for an unlock
+     * @throws JournalException if writing to the temporary log fails; the
+     *                          I/O error is available as cause
+     */
+    protected void log(NodeId nodeId, boolean isLock, boolean isDeep, String owner)
+            throws JournalException {
+
+        try {
+            out.writeChar('L');
+            out.writeNodeId(nodeId);
+            out.writeBoolean(isLock);
+            if (isLock) {
+                out.writeBoolean(isDeep);
+                out.writeString(owner);
+            }
+        } catch (IOException e) {
+            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * Lock the global revision, disallowing changes from other sources until
+     * {@link #unlockRevision} has been called. Invoked from {@link #prepare}.
+     *
+     * @return current global revision, passed to {@link #append} when changes
+     *         are committed
+     * @throws JournalException if an error occurs
+     */
+    protected abstract long lockRevision() throws JournalException;
+
+    /**
+     * Unlock the global revision. An additional flag indicates whether the
+     * append operation was successful. Invoked from {@link #commit} with the
+     * outcome of the append, and with <code>false</code> from
+     * {@link #cancel} and from a failing {@link #prepare}.
+     *
+     * @param successful whether the append operation was successful
+     */
+    protected abstract void unlockRevision(boolean successful);
+
+    /**
+     * {@inheritDoc}
+     */
+    public void prepare() throws JournalException {
+        nextRevision = lockRevision();
+
+        boolean succeeded = false;
+
+        try {
+            sync();
+
+            succeeded = true;
+        } finally {
+            if (!succeeded) {
+                unlockRevision(false);
+                writeMutex.release();
+            }
+        }
+    }
+
+    /**
+     * Append the given record to the journal. On exit, the global and local revision
+     * should have been updated as well. Invoked from {@link #commit} with the
+     * temporary log file, which is deleted once this method has returned.
+     *
+     * @param revision record revision, as returned by {@link #lockRevision()}
+     * @param record file containing the record to append
+     * @throws JournalException if an error occurs
+     */
+    protected abstract void append(long revision, File record) throws JournalException;
+
+    /**
+     * Returns the current local revision, as stored in the file-based
+     * instance revision created in {@link #init}.
+     *
+     * @return current local revision
+     * @throws JournalException if an error occurs
+     */
+    protected long getLocalRevision() throws JournalException {
+        return instanceRevision.get();
+    }
+
+    /**
+     * Sets the current local revision by storing it in the file-based
+     * instance revision created in {@link #init}.
+     *
+     * @param revision revision
+     * @throws JournalException if an error occurs
+     */
+    protected void setLocalRevision(long revision) throws JournalException {
+        instanceRevision.set(revision);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Terminates the temporary log with a <code>'\0'</code> marker, closes
+     * it and appends it to the journal under the revision obtained in
+     * {@link #prepare}. Whether or not this succeeds, the temporary log is
+     * deleted, the global revision is unlocked and the write mutex is
+     * released.
+     *
+     * @throws JournalException if the temporary log cannot be closed (the
+     *                          I/O error is available as cause) or if
+     *                          appending fails
+     */
+    public void commit() throws JournalException {
+        boolean succeeded = false;
+
+        try {
+            out.writeChar('\0');
+            out.close();
+
+            append(nextRevision, tempLog);
+
+            succeeded = true;
+
+        } catch (IOException e) {
+            String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        } finally {
+            out = null;
+            tempLog.delete();
+            unlockRevision(succeeded);
+            writeMutex.release();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Does nothing if no update is in progress. Otherwise the temporary log
+     * is closed and deleted, the global revision is unlocked and the write
+     * mutex is released. A failure to close the log is only logged; the
+     * temporary file is deleted and the locks are released regardless.
+     * <p/>
+     * NOTE(review): {@link #unlockRevision} is also invoked when this method
+     * is called after {@link #begin} without a preceding {@link #prepare} -
+     * confirm that subclasses tolerate an unlock without a matching lock.
+     */
+    public void cancel() {
+        if (out != null) {
+            try {
+                out.close();
+            } catch (IOException e) {
+                String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage();
+                log.warn(msg, e);
+            } finally {
+                out = null;
+                // delete here rather than after close(), so that a failing
+                // close() does not leave the temporary file behind
+                tempLog.delete();
+                unlockRevision(false);
+                writeMutex.release();
+            }
+        }
+    }
+
+    /**
+     * Create an event state for one of the five event types defined in
+     * {@link Event}. The session attached to the event is obtained from
+     * {@link #getOrCreateSession}.
+     *
+     * @param type event type
+     * @param parentId parent id
+     * @param parentPath parent path
+     * @param childId child id; only used for node events
+     * @param childRelPath child relative path
+     * @param ntName node type name
+     * @param mixins mixin names
+     * @param userId user id
+     * @return event
+     * @throws IllegalArgumentException if the event type is unknown
+     */
+    protected EventState createEventState(int type, NodeId parentId, Path parentPath,
+                                          NodeId childId, Path.PathElement childRelPath,
+                                          QName ntName, Set mixins, String userId) {
+        if (type == Event.NODE_ADDED) {
+            return EventState.childNodeAdded(parentId, parentPath, childId, childRelPath,
+                    ntName, mixins, getOrCreateSession(userId));
+        }
+        if (type == Event.NODE_REMOVED) {
+            return EventState.childNodeRemoved(parentId, parentPath, childId, childRelPath,
+                    ntName, mixins, getOrCreateSession(userId));
+        }
+        if (type == Event.PROPERTY_ADDED) {
+            return EventState.propertyAdded(parentId, parentPath, childRelPath,
+                    ntName, mixins, getOrCreateSession(userId));
+        }
+        if (type == Event.PROPERTY_CHANGED) {
+            return EventState.propertyChanged(parentId, parentPath, childRelPath,
+                    ntName, mixins, getOrCreateSession(userId));
+        }
+        if (type == Event.PROPERTY_REMOVED) {
+            return EventState.propertyRemoved(parentId, parentPath, childRelPath,
+                    ntName, mixins, getOrCreateSession(userId));
+        }
+        throw new IllegalArgumentException("Unexpected event type: " + type);
+    }
+
+
+    /**
+     * Return a session matching a certain user id. The most recently
+     * returned session is reused as long as its user id equals the one
+     * requested; otherwise a new {@link ClusterSession} replaces it.
+     *
+     * @param userId user id
+     * @return session
+     */
+    protected Session getOrCreateSession(String userId) {
+        if (lastSession != null && lastSession.getUserID().equals(userId)) {
+            return lastSession;
+        }
+        lastSession = new ClusterSession(userId);
+        return lastSession;
+    }
+}

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?view=diff&rev=495239&r1=495238&r2=495239
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Thu Jan 11 06:40:00 2007
@@ -320,10 +320,9 @@
 
     /**
      * Return the instance id to be used for this node in the cluster.
-     *
      * @param id configured id, <code>null</code> to take random id
      */
-    private String getClusterNodeId(String id) {
+    private String getClusterNodeId(String id) throws ClusterException {
         if (id == null) {
             id = System.getProperty(SYSTEM_PROPERTY_NODE_ID);
             if (id == null) {

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java?view=auto&rev=495239
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java Thu Jan 11 06:40:00 2007
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import org.apache.jackrabbit.name.NamespaceResolver;
+import org.apache.jackrabbit.util.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.DatabaseMetaData;
+import java.sql.Statement;
+
+/**
+ * Database-based journal implementation. Stores records inside a database.
+ * <p/>
+ * It is configured through the following properties:
+ * <ul>
+ * <li><code>revision</code>: the filename where the parent cluster node's revision
+ * file should be written to; this is a required property with no default value</li>
+ * <li><code>driver</code>: the JDBC driver class name to use; this is a required
+ * property with no default value</li>
+ * <li><code>url</code>: the JDBC connection url; this is a required property with
+ * no default value </li>
+ * <li><code>schema</code>: the schema to be used; if not specified, this is the
+ * second field inside the JDBC connection url, delimited by colons</li>
+ * <li><code>schemaObjectPrefix</code>: the schema object prefix to be used;
+ * defaults to an empty string</li>
+ * <li><code>user</code>: username to specify when connecting</li>
+ * <li><code>password</code>: password to specify when connecting</li>
+ * <li><code>schema</code>: </li>
+ * </ul>
+ * This implementation maintains a database table, containing exactly one record with the
+ * last available revision.
+ */
+public class DatabaseJournal extends AbstractJournal {
+
+    /**
+     * Placeholder text for the schema object prefix.
+     * NOTE(review): presumably replaced by the configured prefix when a DDL
+     * script is executed - confirm against checkSchema().
+     */
+    private static final String SCHEMA_OBJECT_PREFIX_VARIABLE =
+            "${schemaObjectPrefix}";
+
+    /**
+     * Default DDL script name.
+     */
+    private static final String DEFAULT_DDL_NAME = "default.ddl";
+
+    /**
+     * Logger.
+     */
+    private static Logger log = LoggerFactory.getLogger(DatabaseJournal.class);
+
+    /**
+     * Driver name, bean property. Required; loaded with
+     * <code>Class.forName</code> in {@link #init}.
+     */
+    private String driver;
+
+    /**
+     * Connection URL, bean property. Required.
+     */
+    private String url;
+
+    /**
+     * Schema name, bean property. If not set, {@link #init} derives it from
+     * the connection URL.
+     */
+    private String schema;
+
+    /**
+     * Schema object prefix, bean property. If not set, {@link #init} uses an
+     * empty string.
+     */
+    protected String schemaObjectPrefix;
+
+    /**
+     * User name, bean property.
+     */
+    private String user;
+
+    /**
+     * Password, bean property.
+     */
+    private String password;
+
+    /**
+     * JDBC Connection used. Opened in {@link #init} with auto-commit
+     * disabled.
+     */
+    private Connection con;
+
+    /**
+     * Statement returning all revisions within a range.
+     */
+    private PreparedStatement selectRevisionsStmt;
+
+    /**
+     * Statement updating the global revision.
+     */
+    private PreparedStatement updateGlobalStmt;
+
+    /**
+     * Statement returning the global revision.
+     */
+    private PreparedStatement selectGlobalStmt;
+
+    /**
+     * Statement appending a new record.
+     */
+    private PreparedStatement insertRevisionStmt;
+
+    /**
+     * Bean getter for driver.
+     * @return JDBC driver class name
+     */
+    public String getDriver() {
+        return driver;
+    }
+
+    /**
+     * Bean setter for driver.
+     * @param driver JDBC driver class name
+     */
+    public void setDriver(String driver) {
+        this.driver = driver;
+    }
+
+    /**
+     * Bean getter for url.
+     * @return JDBC connection url
+     */
+    public String getUrl() {
+        return url;
+    }
+
+    /**
+     * Bean setter for url.
+     * @param url JDBC connection url
+     */
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    /**
+     * Bean getter for schema.
+     * @return schema
+     */
+    public String getSchema() {
+        return schema;
+    }
+
+    /**
+     * Bean setter for schema.
+     * @param schema schema
+     */
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    /**
+     * Bean getter for schema object prefix.
+     * @return schema object prefix
+     */
+    public String getSchemaObjectPrefix() {
+        return schemaObjectPrefix;
+    }
+
+    /**
+     * Bean setter for schema object prefix. The prefix is stored in upper
+     * case; the conversion uses a fixed locale so that the result does not
+     * depend on the default locale of the JVM.
+     *
+     * @param schemaObjectPrefix schema object prefix; <code>null</code>
+     *        leaves the prefix unset
+     */
+    public void setSchemaObjectPrefix(String schemaObjectPrefix) {
+        if (schemaObjectPrefix == null) {
+            this.schemaObjectPrefix = null;
+        } else {
+            this.schemaObjectPrefix =
+                    schemaObjectPrefix.toUpperCase(java.util.Locale.ENGLISH);
+        }
+    }
+
+    /**
+     * Bean getter for user.
+     * @return user name passed to the driver when connecting
+     */
+    public String getUser() {
+        return user;
+    }
+
+    /**
+     * Bean setter for user.
+     * @param user user name passed to the driver when connecting
+     */
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    /**
+     * Bean getter for password.
+     * @return password passed to the driver when connecting
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Bean setter for password.
+     * @param password password passed to the driver when connecting
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * After the base class has been initialized, this validates the
+     * <code>driver</code> and <code>url</code> properties, derives the schema
+     * from the URL if none was configured, defaults the schema object prefix
+     * to an empty string, opens the JDBC connection with auto-commit
+     * disabled, checks the schema and prepares the statements. If any step
+     * after opening the connection fails, the connection is closed again.
+     *
+     * @throws JournalException if a required property is missing, the schema
+     *                          cannot be derived from the URL or the
+     *                          connection cannot be initialized
+     */
+    public void init(String id, RecordProcessor processor, NamespaceResolver resolver)
+            throws JournalException {
+
+        super.init(id, processor, resolver);
+
+        if (driver == null) {
+            String msg = "Driver not specified.";
+            throw new JournalException(msg);
+        }
+        if (url == null) {
+            String msg = "Connection URL not specified.";
+            throw new JournalException(msg);
+        }
+        try {
+            if (schema == null) {
+                schema = getSchemaFromURL(url);
+            }
+            if (schemaObjectPrefix == null) {
+                schemaObjectPrefix = "";
+            }
+        } catch (IllegalArgumentException e) {
+            String msg = "Unable to derive schema from URL: " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+        try {
+            Class.forName(driver);
+            con = DriverManager.getConnection(url, user, password);
+            con.setAutoCommit(false);
+
+            checkSchema();
+            prepareStatements();
+        } catch (Exception e) {
+            // do not leak a connection that was opened before the failure
+            if (con != null) {
+                try {
+                    con.close();
+                } catch (SQLException e2) {
+                    log.warn("Unable to close connection: " + e2.getMessage());
+                }
+                con = null;
+            }
+            String msg = "Unable to initialize connection.";
+            throw new JournalException(msg, e);
+        }
+        log.info("DatabaseJournal initialized at URL: " + url);
+    }
+
+    /**
+     * Derives a schema name from a JDBC connection URL: the URL is treated as
+     * a colon-delimited string and the second field, i.e. the text between the
+     * first and the second colon, is returned.
+     *
+     * @param url JDBC connection URL
+     * @return schema
+     * @throws IllegalArgumentException if the URL contains fewer than two colons
+     */
+    private String getSchemaFromURL(String url) throws IllegalArgumentException {
+        int firstColon = url.indexOf(':');
+        int secondColon = -1;
+        if (firstColon != -1) {
+            secondColon = url.indexOf(':', firstColon + 1);
+        }
+        if (secondColon == -1) {
+            throw new IllegalArgumentException(url);
+        }
+        return url.substring(firstColon + 1, secondColon);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Selects all journal rows with a revision greater than the local revision.
+     * Rows created by this journal (creator equals the journal id) are skipped,
+     * all others are processed; in both cases the local revision is advanced
+     * to the row's revision afterwards.
+     *
+     * @throws JournalException if the rows cannot be read or a record cannot
+     *         be processed
+     */
+    public void sync() throws JournalException {
+        long oldRevision = getLocalRevision();
+        ResultSet rs = null;
+
+        try {
+            selectRevisionsStmt.clearParameters();
+            selectRevisionsStmt.clearWarnings();
+            selectRevisionsStmt.setLong(1, oldRevision);
+            selectRevisionsStmt.execute();
+
+            rs = selectRevisionsStmt.getResultSet();
+            while (rs.next()) {
+                long revision = rs.getLong(1);
+                String creator = rs.getString(2);
+                if (!creator.equals(id)) {
+                    DataInputStream in = new DataInputStream(rs.getBinaryStream(3));
+                    try {
+                        process(revision, in);
+                    } catch (IllegalArgumentException e) {
+                        String msg = "Error while processing revision " +
+                                revision + ": " + e.getMessage();
+                        // preserve the cause instead of only its message
+                        throw new JournalException(msg, e);
+                    } finally {
+                        close(in);
+                    }
+                } else {
+                    log.info("Log entry matches journal id, skipped: " + revision);
+                }
+                setLocalRevision(revision);
+            }
+        } catch (SQLException e) {
+            String msg = "Unable to iterate over modified records.";
+            throw new JournalException(msg, e);
+        } finally {
+            close(rs);
+        }
+
+        long currentRevision = getLocalRevision();
+        if (oldRevision < currentRevision) {
+            log.info("Sync finished, instance revision is: " + currentRevision);
+        }
+    }
+
+    /**
+     * Processes a single record: wraps the raw data stream into a record
+     * input, delegates to the record-input based <code>process</code> and
+     * closes the record input afterwards, whether or not processing succeeded.
+     *
+     * @param revision revision
+     * @param dataIn data input
+     * @throws JournalException if an error occurs
+     */
+    private void process(long revision, DataInputStream dataIn) throws JournalException {
+        RecordInput recordIn = new RecordInput(dataIn, resolver);
+        try {
+            process(revision, recordIn);
+        } finally {
+            recordIn.close();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Increments the global revision counter and reads back the new value.
+     * The connection runs with auto-commit disabled, so the update stays
+     * uncommitted until a later commit or rollback. NOTE(review): presumably
+     * this is what keeps other cluster nodes out until then; the exact locking
+     * behaviour depends on the database - confirm.
+     * If anything fails, the connection is rolled back.
+     *
+     * @return the incremented global revision
+     * @throws JournalException if the counter cannot be updated or read
+     */
+    protected long lockRevision() throws JournalException {
+        ResultSet rs = null;
+        boolean succeeded = false;
+
+        try {
+            updateGlobalStmt.clearParameters();
+            updateGlobalStmt.clearWarnings();
+            updateGlobalStmt.execute();
+
+            selectGlobalStmt.clearParameters();
+            selectGlobalStmt.clearWarnings();
+            selectGlobalStmt.execute();
+
+            rs = selectGlobalStmt.getResultSet();
+            if (!rs.next()) {
+                 throw new JournalException("No revision available.");
+            }
+            long globalRevision = rs.getLong(1);
+            succeeded = true;
+            return globalRevision;
+
+        } catch (SQLException e) {
+            String msg = "Unable to lock global revision table: " + e.getMessage();
+            // preserve the cause instead of only its message
+            throw new JournalException(msg, e);
+        } finally {
+            close(rs);
+            if (!succeeded) {
+                rollback(con);
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Rolls back the connection if the operation was not successful;
+     * does nothing otherwise.
+     */
+    protected void unlockRevision(boolean successful) {
+        if (successful) {
+            return;
+        }
+        rollback(con);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Inserts the contents of the record file as a new journal row, commits
+     * the connection and then advances the local revision.
+     *
+     * @throws JournalException if the record file is too large to be passed
+     *         to JDBC, cannot be read, or the row cannot be inserted
+     */
+    protected void append(long revision, File record) throws JournalException {
+        long length = record.length();
+        if (length > Integer.MAX_VALUE) {
+            // setBinaryStream takes an int length; a cast would silently truncate
+            String msg = "Unable to append revision: "  + revision +
+                    ": record too large: " + length;
+            throw new JournalException(msg);
+        }
+        try {
+            InputStream in = new BufferedInputStream(new FileInputStream(record));
+
+            try {
+                insertRevisionStmt.clearParameters();
+                insertRevisionStmt.clearWarnings();
+                insertRevisionStmt.setLong(1, revision);
+                insertRevisionStmt.setString(2, id);
+                insertRevisionStmt.setBinaryStream(3, in, (int) length);
+                insertRevisionStmt.execute();
+
+                con.commit();
+
+                setLocalRevision(revision);
+            } finally {
+                in.close();
+            }
+        } catch (IOException e) {
+            String msg = "Unable to open journal log " + record + ": " + e.getMessage();
+            // preserve the cause instead of only its message
+            throw new JournalException(msg, e);
+        } catch (SQLException e) {
+            String msg = "Unable to append revision: "  + revision + ": " + e.getMessage();
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Closes the prepared statements and then the connection. Safe to call
+     * when initialization never got as far as opening a connection. Errors
+     * are logged as warnings and not propagated.
+     */
+    public void close() {
+        // release statements explicitly rather than relying on the driver
+        close(selectRevisionsStmt);
+        close(updateGlobalStmt);
+        close(selectGlobalStmt);
+        close(insertRevisionStmt);
+
+        if (con == null) {
+            return;
+        }
+        try {
+            con.close();
+        } catch (SQLException e) {
+            String msg = "Error while closing connection: " + e.getMessage();
+            log.warn(msg);
+        }
+    }
+
+    /**
+     * Closes an input stream; a failure is logged as a warning and
+     * not propagated.
+     *
+     * @param in input stream, may be <code>null</code>.
+     */
+    private void close(InputStream in) {
+        if (in == null) {
+            return;
+        }
+        try {
+            in.close();
+        } catch (IOException e) {
+            log.warn("Error while closing input stream: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Closes a statement; a failure is logged as a warning and
+     * not propagated.
+     *
+     * @param stmt statement, may be <code>null</code>.
+     */
+    private void close(Statement stmt) {
+        if (stmt == null) {
+            return;
+        }
+        try {
+            stmt.close();
+        } catch (SQLException e) {
+            log.warn("Error while closing statement: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Closes a result set; a failure is logged as a warning and
+     * not propagated.
+     *
+     * @param rs resultset, may be <code>null</code>.
+     */
+    private void close(ResultSet rs) {
+        if (rs == null) {
+            return;
+        }
+        try {
+            rs.close();
+        } catch (SQLException e) {
+            log.warn("Error while closing result set: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Rolls back the given connection; a failure is logged as a warning
+     * and not propagated.
+     *
+     * @param con connection.
+     */
+    private void rollback(Connection con) {
+        try {
+            con.rollback();
+        } catch (SQLException e) {
+            log.warn("Error while rolling back connection: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Checks if the required schema objects exist and creates them if they
+     * don't exist yet. Existence is decided by looking up the journal table
+     * in the database meta data, using the identifier case the database
+     * reports to store. If the table is missing, the schema-specific DDL
+     * resource (falling back to the default DDL) is executed line by line;
+     * lines that are empty or start with <code>#</code> are skipped.
+     *
+     * @throws Exception if an error occurs
+     */
+    private void checkSchema() throws Exception {
+        DatabaseMetaData metaData = con.getMetaData();
+        String tableName = schemaObjectPrefix + "JOURNAL";
+        if (metaData.storesLowerCaseIdentifiers()) {
+            tableName = tableName.toLowerCase();
+        } else if (metaData.storesUpperCaseIdentifiers()) {
+            tableName = tableName.toUpperCase();
+        }
+
+        ResultSet rs = metaData.getTables(null, null, tableName, null);
+        boolean schemaExists;
+        try {
+            schemaExists = rs.next();
+        } finally {
+            rs.close();
+        }
+
+        if (!schemaExists) {
+            // read ddl from resources
+            InputStream in = DatabaseJournal.class.getResourceAsStream(schema + ".ddl");
+            if (in == null) {
+                String msg = "No schema-specific DDL found: '" + schema + ".ddl" +
+                        "', falling back to '" + DEFAULT_DDL_NAME + "'.";
+                log.info(msg);
+                in = DatabaseJournal.class.getResourceAsStream(DEFAULT_DDL_NAME);
+                if (in == null) {
+                    msg = "Unable to load '" + DEFAULT_DDL_NAME + "'.";
+                    throw new JournalException(msg);
+                }
+            }
+            Statement stmt = null;
+            try {
+                // created inside the try block so that the DDL stream is
+                // closed even if the statement cannot be created
+                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+                stmt = con.createStatement();
+
+                String sql = reader.readLine();
+                while (sql != null) {
+                    // Skip comments and empty lines
+                    if (!sql.startsWith("#") && sql.length() > 0) {
+                        // replace prefix variable
+                        sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix);
+                        // execute sql stmt
+                        stmt.executeUpdate(sql);
+                    }
+                    // read next sql stmt
+                    sql = reader.readLine();
+                }
+                // commit the changes
+                con.commit();
+            } finally {
+                close(in);
+                close(stmt);
+            }
+        }
+    }
+
+    /**
+     * Builds the SQL text for the four journal operations (select newer
+     * revisions, increment global revision, read global revision, insert
+     * revision) against the prefixed table names and prepares them on the
+     * connection.
+     *
+     * @throws SQLException if an error occurs
+     */
+    private void prepareStatements() throws SQLException {
+        String journalTable = schemaObjectPrefix + "JOURNAL";
+        String globalRevisionTable = schemaObjectPrefix + "GLOBAL_REVISION";
+
+        String selectRevisionsSQL =
+                "select REVISION_ID, REVISION_CREATOR, REVISION_DATA " +
+                "from " + journalTable + " " +
+                "where REVISION_ID > ?";
+        selectRevisionsStmt = con.prepareStatement(selectRevisionsSQL);
+
+        String updateGlobalSQL =
+                "update " + globalRevisionTable + " " +
+                "set revision_id = revision_id + 1";
+        updateGlobalStmt = con.prepareStatement(updateGlobalSQL);
+
+        String selectGlobalSQL =
+                "select revision_id " +
+                "from " + globalRevisionTable;
+        selectGlobalStmt = con.prepareStatement(selectGlobalSQL);
+
+        String insertRevisionSQL =
+                "insert into " + journalTable +
+                "(REVISION_ID, REVISION_CREATOR, REVISION_DATA) " +
+                "values (?,?,?)";
+        insertRevisionStmt = con.prepareStatement(insertRevisionSQL);
+    }
+}

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java?view=diff&rev=495239&r1=495238&r2=495239
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java Thu Jan 11 06:40:00 2007
@@ -16,60 +16,45 @@
  */
 package org.apache.jackrabbit.core.cluster;
 
-import org.apache.jackrabbit.core.NodeId;
-import org.apache.jackrabbit.core.nodetype.NodeTypeDef;
-import org.apache.jackrabbit.core.nodetype.compact.ParseException;
-import org.apache.jackrabbit.core.state.ChangeLog;
-import org.apache.jackrabbit.core.state.ItemState;
-import org.apache.jackrabbit.core.state.NodeState;
-import org.apache.jackrabbit.core.state.PropertyState;
-import org.apache.jackrabbit.core.observation.EventState;
-import org.apache.jackrabbit.core.observation.EventStateCollection;
-import org.apache.jackrabbit.name.Path;
-import org.apache.jackrabbit.name.QName;
 import org.apache.jackrabbit.name.NamespaceResolver;
-import org.apache.jackrabbit.name.NoPrefixDeclaredException;
-import org.apache.jackrabbit.name.NameException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collection;
-
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
-
-import javax.jcr.observation.Event;
-import javax.jcr.Session;
 
 /**
  * File-based journal implementation. A directory specified as <code>directory</code>
  * bean property will contain log files and a global revision file, containing the
- * latest revision file. When the current log file's size exceeds <code>maxSize</code>
+ * next available revision. When the current log file's size exceeds <code>maxSize</code>
  * bytes, it gets renamed to its name appended by '1'. At the same time, all log files
  * already having a version counter, get their version counter incremented by <code>1</code>.
  * <p/>
  * It is configured through the following properties:
  * <ul>
- * <li><code>directory</code>: the shared directory where journal logs and read from
- * and written to; this is a required property with no default value</li>
  * <li><code>revision</code>: the filename where the parent cluster node's revision
  * file should be written to; this is a required property with no default value</li>
+ * <li><code>directory</code>: the shared directory where journal logs are read from
+ * and written to; this is a required property with no default value</li>
  * <li><code>basename</code>: this is the basename of the journal logs created in
  * the shared directory; its default value is <code>journal</code></li>
  * <li><code>maximumSize</code>: this is the maximum size in bytes of a journal log
  * before a new log will be created; its default value is <code>1048576</code> (1MB)</li>
  * </ul>
+ * <p/>
+ * Technically, the global revision file contains the cumulated file position, i.e. if
+ * there are <code>N</code> journal files, with file lengths <code>L[1]</code>...
+ * <code>L[N]</code> (excluding the size of the file headers), then the global revision
+ * will be L[1]+...+L[N].
  *
  * todo after some iterations, old files should be automatically compressed to save space
  */
-public class FileJournal implements Journal {
+public class FileJournal extends AbstractJournal {
 
     /**
      * Global revision counter name, located in the journal directory.
@@ -97,31 +82,11 @@
     private static Logger log = LoggerFactory.getLogger(FileJournal.class);
 
     /**
-     * Journal id.
-     */
-    private String id;
-
-    /**
-     * Namespace resolver used to map prefixes to URIs and vice-versa.
-     */
-    private NamespaceResolver resolver;
-
-    /**
-     * Record processor.
-     */
-    private RecordProcessor processor;
-
-    /**
      * Directory name, bean property.
      */
     private String directory;
 
     /**
-     * Revision file name, bean property.
-     */
-    private String revision;
-
-    /**
      * Journal file base name, bean property.
      */
     private String basename;
@@ -142,39 +107,14 @@
     private File journal;
 
     /**
-     * Instance counter.
-     */
-    private FileRevision instanceRevision;
-
-    /**
-     * Global journal counter.
+     * Global revision counter.
      */
     private FileRevision globalRevision;
 
     /**
-     * Mutex used when writing journal.
-     */
-    private final Mutex writeMutex = new Mutex();
-
-    /**
-     * Current temporary journal log.
+     * Id as byte array.
      */
-    private File tempLog;
-
-    /**
-     * Current file record output.
-     */
-    private FileRecordOutput out;
-
-    /**
-     * Current file record.
-     */
-    private FileRecord record;
-
-    /**
-     * Last used session for event sources.
-     */
-    private Session lastSession;
+    private byte[] rawId;
 
     /**
      * Bean getter for journal directory.
@@ -193,22 +133,6 @@
     }
 
     /**
-     * Bean getter for revision file.
-     * @return revision file
-     */
-    public String getRevision() {
-        return revision;
-    }
-
-    /**
-     * Bean setter for journal directory.
-     * @param revision directory used for journaling
-     */
-    public void setRevision(String revision) {
-        this.revision = revision;
-    }
-
-    /**
      * Bean getter for base name.
      * @return base name
      */
@@ -243,19 +167,15 @@
     /**
      * {@inheritDoc}
      */
-    public void init(String id, RecordProcessor processor, NamespaceResolver resolver) throws JournalException {
-        this.id = id;
-        this.resolver = resolver;
-        this.processor = processor;
+    public void init(String id, RecordProcessor processor, NamespaceResolver resolver)
+            throws JournalException {
+        
+        super.init(id, processor, resolver);
 
         if (directory == null) {
             String msg = "Directory not specified.";
             throw new JournalException(msg);
         }
-        if (revision == null) {
-            String msg = "Revision not specified.";
-            throw new JournalException(msg);
-        }
         if (basename == null) {
             basename = DEFAULT_BASENAME;
         }
@@ -267,9 +187,14 @@
             String msg = "Directory specified does either not exist or is not a directory: " + directory;
             throw new JournalException(msg);
         }
-        journal = new File(root, basename + "." + LOG_EXTENSION);
+        try {
+            rawId = toRawId(id);
+        } catch (IOException e) {
+            String msg = "Unable to convert '" + id + "' to its binary representation.";
+            throw new JournalException(msg, e);
+        }
 
-        instanceRevision = new FileRevision(new File(revision));
+        journal = new File(root, basename + "." + LOG_EXTENSION);
         globalRevision = new FileRevision(new File(root, REVISION_NAME));
 
         log.info("FileJournal initialized at path: " + directory);
@@ -292,35 +217,28 @@
             }
         });
 
-        long instanceValue = instanceRevision.get();
+        long instanceValue = getLocalRevision();
         long globalValue = globalRevision.get();
 
         if (instanceValue < globalValue) {
-            FileRecordCursor cursor = new FileRecordCursor(logFiles,
-                    instanceValue, globalValue);
+            FileRecordCursor cursor = new FileRecordCursor(logFiles, instanceValue, globalValue);
             try {
                 while (cursor.hasNext()) {
                     FileRecord record = cursor.next();
-                    if (!record.getCreator().equals(id)) {
+                    if (!Arrays.equals(rawId, record.getCreator())) {
                         process(record);
                     } else {
                         log.info("Log entry matches journal id, skipped: " + record.getRevision());
                     }
-                    instanceRevision.set(record.getNextRevision());
+                    setLocalRevision(record.getNextRevision());
                 }
             } catch (IOException e) {
-                String msg = "Unable to iterate over modified records: " + e.getMessage();
+                String msg = "Unable to iterate over modified records.";
                 throw new JournalException(msg, e);
-
             } finally {
-                try {
-                    cursor.close();
-                } catch (IOException e) {
-                    String msg = "I/O error while closing record cursor: " + e.getMessage();
-                    log.warn(msg);
-                }
+                cursor.close();
             }
-            log.info("Sync finished, instance revision is: " + instanceRevision.get());
+            log.info("Sync finished, instance revision is: " + getLocalRevision());
         }
     }
 
@@ -330,89 +248,11 @@
      * @param record record to process
      * @throws JournalException if an error occurs
      */
-    void process(FileRecord record) throws JournalException {
-        log.info("Processing revision: " + record.getRevision());
-
-        FileRecordInput in = record.getInput(resolver);
-        String workspace = null;
+    private void process(FileRecord record) throws JournalException {
+        RecordInput in = record.getInput(resolver);
 
         try {
-            workspace = in.readString();
-            processor.start(workspace);
-
-            for (;;) {
-                char c = in.readChar();
-                if (c == '\0') {
-                    break;
-                }
-                if (c == 'N') {
-                    NodeOperation operation = NodeOperation.create(in.readByte());
-                    operation.setId(in.readNodeId());
-                    processor.process(operation);
-                } else if (c == 'P') {
-                    PropertyOperation operation = PropertyOperation.create(in.readByte());
-                    operation.setId(in.readPropertyId());
-                    processor.process(operation);
-                } else if (c == 'E') {
-                    int type = in.readByte();
-                    NodeId parentId = in.readNodeId();
-                    Path parentPath = in.readPath();
-                    NodeId childId = in.readNodeId();
-                    Path.PathElement childRelPath = in.readPathElement();
-                    QName ntName = in.readQName();
-
-                    Set mixins = new HashSet();
-                    int mixinCount = in.readInt();
-                    for (int i = 0; i < mixinCount; i++) {
-                        mixins.add(in.readQName());
-                    }
-                    String userId = in.readString();
-                    processor.process(createEventState(type, parentId, parentPath, childId,
-                            childRelPath, ntName, mixins, userId));
-                } else if (c == 'L') {
-                    NodeId nodeId = in.readNodeId();
-                    boolean isLock = in.readBoolean();
-                    if (isLock) {
-                        boolean isDeep = in.readBoolean();
-                        String owner = in.readString();
-                        processor.process(nodeId, isDeep, owner);
-                    } else {
-                        processor.process(nodeId);
-                    }
-                } else if (c == 'S') {
-                    String oldPrefix = in.readString();
-                    String newPrefix = in.readString();
-                    String uri = in.readString();
-                    processor.process(oldPrefix, newPrefix, uri);
-                } else if (c == 'T') {
-                    int size = in.readInt();
-                    HashSet ntDefs = new HashSet();
-                    for (int i = 0; i < size; i++) {
-                        ntDefs.add(in.readNodeTypeDef());
-                    }
-                    processor.process(ntDefs);
-                } else {
-                    throw new IllegalArgumentException("Unknown entry type: " + c);
-                }
-            }
-            processor.end();
-
-        } catch (NameException e) {
-            String msg = "Unable to read revision " + record.getRevision() +
-                    ": " + e.getMessage();
-            throw new JournalException(msg);
-        } catch (ParseException e) {
-            String msg = "Unable to read revision " + record.getRevision() +
-                    ": " + e.getMessage();
-            throw new JournalException(msg);
-        } catch (IOException e) {
-            String msg = "Unable to read revision " + record.getRevision() +
-                    ": " + e.getMessage();
-            throw new JournalException(msg);
-        } catch (IllegalArgumentException e) {
-            String msg = "Error while processing revision " +
-                    record.getRevision() + ": " + e.getMessage();
-            throw new JournalException(msg);
+            process(record.getRevision(), in);
         } finally {
             in.close();
         }
@@ -421,286 +261,37 @@
     /**
      * {@inheritDoc}
      */
-    public void begin(String workspace) throws JournalException {
-        try {
-            writeMutex.acquire();
-        } catch (InterruptedException e) {
-            String msg = "Interrupted while waiting for write lock.";
-            throw new JournalException(msg);
-        }
-
-        boolean succeeded = false;
-
-        try {
-            sync();
-
-            tempLog = File.createTempFile("journal", ".tmp", root);
-
-            record = new FileRecord(id, tempLog);
-            out = record.getOutput(resolver);
-            out.writeString(workspace);
-
-            succeeded = true;
-        } catch (IOException e) {
-            String msg = "Unable to create journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        } finally {
-            if (!succeeded) {
-                writeMutex.release();
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void log(ChangeLog changeLog, EventStateCollection esc) throws JournalException {
-        Iterator addedStates = changeLog.addedStates();
-        while (addedStates.hasNext()) {
-            ItemState state = (ItemState) addedStates.next();
-            if (state.isNode()) {
-                log(NodeAddedOperation.create((NodeState) state));
-            } else {
-                log(PropertyAddedOperation.create((PropertyState) state));
-            }
-        }
-        Iterator modifiedStates = changeLog.modifiedStates();
-        while (modifiedStates.hasNext()) {
-            ItemState state = (ItemState) modifiedStates.next();
-            if (state.isNode()) {
-                log(NodeModifiedOperation.create((NodeState) state));
-            } else {
-                log(PropertyModifiedOperation.create((PropertyState) state));
-            }
-        }
-        Iterator deletedStates = changeLog.deletedStates();
-        while (deletedStates.hasNext()) {
-            ItemState state = (ItemState) deletedStates.next();
-            if (state.isNode()) {
-                log(NodeDeletedOperation.create((NodeState) state));
-            } else {
-                log(PropertyDeletedOperation.create((PropertyState) state));
-            }
-        }
-
-        Iterator events = esc.getEvents().iterator();
-        while (events.hasNext()) {
-            EventState event = (EventState) events.next();
-            log(event);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void log(String oldPrefix, String newPrefix, String uri) throws JournalException {
-        try {
-            out.writeChar('S');
-            out.writeString(oldPrefix);
-            out.writeString(newPrefix);
-            out.writeString(uri);
-        } catch (IOException e) {
-            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void log(NodeId nodeId, boolean isDeep, String owner) throws JournalException {
-        log(nodeId, true, isDeep, owner);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void log(NodeId nodeId) throws JournalException {
-        log(nodeId, false, false, null);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void log(Collection ntDefs) throws JournalException {
-        try {
-            out.writeChar('T');
-            out.writeInt(ntDefs.size());
-
-            Iterator iter = ntDefs.iterator();
-            while (iter.hasNext()) {
-                out.writeNodeTypeDef((NodeTypeDef) iter.next());
-            }
-        } catch (IOException e) {
-            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        }
-
-    }
-
-    /**
-     * Log a property operation.
-     *
-     * @param operation property operation
-     */
-    protected void log(PropertyOperation operation) throws JournalException {
-        try {
-            out.writeChar('P');
-            out.writeByte(operation.getOperationType());
-            out.writePropertyId(operation.getId());
-        } catch (NoPrefixDeclaredException e) {
-            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        } catch (IOException e) {
-            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        }
-    }
-
-    /**
-     * Log a node operation.
-     *
-     * @param operation node operation
-     */
-    protected void log(NodeOperation operation) throws JournalException {
-        try {
-            out.writeChar('N');
-            out.writeByte(operation.getOperationType());
-            out.writeNodeId(operation.getId());
-        } catch (IOException e) {
-            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        }
-    }
-
-    /**
-     * Log an event. Subclass responsibility.
-     *
-     * @param event event to log
-     */
-    protected void log(EventState event) throws JournalException {
-        try {
-            out.writeChar('E');
-            out.writeByte(event.getType());
-            out.writeNodeId(event.getParentId());
-            out.writePath(event.getParentPath());
-            out.writeNodeId(event.getChildId());
-            out.writePathElement(event.getChildRelPath());
-            out.writeQName(event.getNodeType());
-
-            Set mixins = event.getMixinNames();
-            out.writeInt(mixins.size());
-            Iterator iter = mixins.iterator();
-            while (iter.hasNext()) {
-                out.writeQName((QName) iter.next());
-            }
-            out.writeString(event.getUserId());
-        } catch (NoPrefixDeclaredException e) {
-            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        } catch (IOException e) {
-            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        }
-    }
-
-    /**
-     * Log either a lock or an unlock operation.
-     *
-     * @param nodeId node id
-     * @param isLock <code>true</code> if this is a lock;
-     *               <code>false</code> if this is an unlock
-     * @param isDeep flag indicating whether lock is deep
-     * @param owner lock owner
-     */
-    protected void log(NodeId nodeId, boolean isLock, boolean isDeep, String owner)
-            throws JournalException {
-
-        try {
-            out.writeChar('L');
-            out.writeNodeId(nodeId);
-            out.writeBoolean(isLock);
-            if (isLock) {
-                out.writeBoolean(isDeep);
-                out.writeString(owner);
-            }
-        } catch (IOException e) {
-            String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage();
-            throw new JournalException(msg);
-        }
+    protected long lockRevision() throws JournalException {
+        globalRevision.lock(false);
+        return globalRevision.get();
     }
 
     /**
      * {@inheritDoc}
      */
-    public void prepare() throws JournalException {
-        globalRevision.lock(false);
-
-        boolean prepared = false;
-
-        try {
-            sync();
-
-            record.setRevision(globalRevision.get());
-
-            prepared = true;
-        } finally {
-            if (!prepared) {
-                globalRevision.unlock();
-                writeMutex.release();
-            }
-        }
+    protected void unlockRevision(boolean successful) {
+        globalRevision.unlock();
     }
 
     /**
      * {@inheritDoc}
      */
-    public void commit() throws JournalException {
+    protected void append(long revision, File record) throws JournalException {
         try {
-            out.writeChar('\0');
-            out.close();
-
-            long nextRevision = record.getNextRevision();
-
             FileRecordLog recordLog = new FileRecordLog(journal);
             if (!recordLog.isNew()) {
-                if (nextRevision - recordLog.getFirstRevision() > maximumSize) {
+                if (revision - recordLog.getFirstRevision() > maximumSize) {
                     switchLogs();
                     recordLog = new FileRecordLog(journal);
                 }
             }
-            recordLog.append(record);
-
-            tempLog.delete();
+            long nextRevision = recordLog.append(revision, rawId, record);
             globalRevision.set(nextRevision);
-            instanceRevision.set(nextRevision);
+            setLocalRevision(nextRevision);
 
         } catch (IOException e) {
-            String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage();
+            String msg = "Unable to append new record to journal " + journal + ": " + e.getMessage();
             throw new JournalException(msg);
-        } finally {
-            out = null;
-            globalRevision.unlock();
-            writeMutex.release();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void cancel() {
-        if (out != null) {
-            try {
-                out.close();
-                tempLog.delete();
-            } catch (IOException e) {
-                String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage();
-                log.warn(msg);
-            } finally {
-                out = null;
-                globalRevision.unlock();
-                writeMutex.release();
-            }
         }
     }
 
@@ -710,57 +301,6 @@
     public void close() {}
 
     /**
-     * Create an event state.
-     *
-     * @param type event type
-     * @param parentId parent id
-     * @param parentPath parent path
-     * @param childId child id
-     * @param childRelPath child relative path
-     * @param ntName ndoe type name
-     * @param userId user id
-     * @return event
-     */
-    protected EventState createEventState(int type, NodeId parentId, Path parentPath,
-                                          NodeId childId, Path.PathElement childRelPath,
-                                          QName ntName, Set mixins, String userId) {
-        switch (type) {
-            case Event.NODE_ADDED:
-                return EventState.childNodeAdded(parentId, parentPath, childId, childRelPath,
-                        ntName, mixins, getOrCreateSession(userId));
-            case Event.NODE_REMOVED:
-                return EventState.childNodeRemoved(parentId, parentPath, childId, childRelPath,
-                        ntName, mixins, getOrCreateSession(userId));
-            case Event.PROPERTY_ADDED:
-                return EventState.propertyAdded(parentId, parentPath, childRelPath,
-                        ntName, mixins, getOrCreateSession(userId));
-            case Event.PROPERTY_CHANGED:
-                return EventState.propertyChanged(parentId, parentPath, childRelPath,
-                        ntName, mixins, getOrCreateSession(userId));
-            case Event.PROPERTY_REMOVED:
-                return EventState.propertyRemoved(parentId, parentPath, childRelPath,
-                        ntName, mixins, getOrCreateSession(userId));
-            default:
-                String msg = "Unexpected event type: " + type;
-                throw new IllegalArgumentException(msg);
-        }
-    }
-
-
-    /**
-     * Return a session matching a certain user id.
-     *
-     * @param userId user id
-     * @return session
-     */
-    protected Session getOrCreateSession(String userId) {
-        if (lastSession == null || !lastSession.getUserID().equals(userId)) {
-            lastSession = new ClusterSession(userId);
-        }
-        return lastSession;
-    }
-
-    /**
      * Move away current journal file (and all other files), incrementing their
      * version counter. A file named <code>journal.N.log</code> gets renamed to
      * <code>journal.(N+1).log</code>, whereas the main journal file gets renamed
@@ -800,5 +340,23 @@
                 }
             }
         }
+    }
+
+    /**
+     * Convert an id given as a string to its raw form, i.e. the bytes of its modified
+     * UTF-8 encoding as written by DataOutputStream.writeUTF, minus the length prefix.
+     *
+     * @throws IOException if an I/O error occurs, which is very unlikely.
+     */
+    private static byte[] toRawId(String id) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        dos.writeUTF(id);
+        dos.close();
+
+        byte[] b = bos.toByteArray();
+        byte[] rawId = new byte[b.length - 2];
+        System.arraycopy(b, 2, rawId, 0, rawId.length);
+        return rawId;
     }
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java?view=diff&rev=495239&r1=495238&r2=495239
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java Thu Jan 11 06:40:00 2007
@@ -19,108 +19,52 @@
 import org.apache.jackrabbit.name.NamespaceResolver;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.DataInput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
 
 /**
- * Represents a file-based record. Physically, a file record contains its length in the
- * first 4 bytes, immediately followed by its creator in a length-prefixed, UTF-encoded
- * string. All further fields are record-specific.
+ * Represents a file-based record. Physically, a file record starts with its creator
+ * in a length-prefixed, UTF-encoded string, followed by a 4-byte field indicating the
+ * length of the data. All further fields are record-specific.
  */
 class FileRecord {
 
     /**
-     * Indicator for a literal UUID.
+     * Record creator.
      */
-    static final byte UUID_LITERAL = 'L';
-
-    /**
-     * Indicator for a UUID index.
-     */
-    static final byte UUID_INDEX = 'I';
-
-    /**
-     * Revision.
-     */
-    private long revision;
-
-    /**
-     * Underlying input stream.
-     */
-    private DataInputStream in;
-
-    /**
-     * File use when creating a new record.
-     */
-    private File file;
-
-    /**
-     * Underlying output stream.
-     */
-    private DataOutputStream out;
+    //private final String creator;
+    private final byte[] creator;
 
     /**
      * Record length.
      */
-    private int length;
+    private final int length;
 
     /**
-     * Creator of a record.
+     * Input stream associated with record data.
      */
-    private String creator;
+    private final DataInputStream dataIn;
 
     /**
-     * Bytes used by creator when written in UTF encoding and length-prefixed.
+     * Revision.
      */
-    private int creatorLength;
+    private long revision;
 
     /**
-     * Flag indicating whether bytes need to be skipped at the end.
+     * Flag indicating whether the data associated with this record has been consumed.
      */
     private boolean consumed;
 
     /**
-     * Creates a new file record. Used when opening an existing record.
-     *
-     * @param revision revision this record represents
-     * @param in underlying input stream
-     * @throws IOException if reading the creator fails
-     */
-    public FileRecord(long revision, InputStream in)
-            throws IOException {
-
-        this.revision = revision;
-        if (in instanceof DataInputStream) {
-            this.in = (DataInputStream) in;
-        } else {
-            this.in = new DataInputStream(in);
-        }
-        this.length = this.in.readInt();
-
-        readCreator();
-    }
-
-    /**
-     * Creates a new file record. Used when creating a new record.
+     * Creates a new instance of this class. Used when opening an existing record.
      *
      * @param creator creator of this record
-     * @param file underlying (temporary) file
-     * @throws IOException if writing the creator fails
+     * @param length record length
+     * @param dataIn input stream containing record data
      */
-    public FileRecord(String creator, File file) throws IOException {
-
+    public FileRecord(byte[] creator, int length, DataInputStream dataIn) {
         this.creator = creator;
-        this.file = file;
-
-        this.out = new DataOutputStream(new FileOutputStream(file));
-
-        writeCreator();
+        this.length = length;
+        this.dataIn = dataIn;
     }
 
     /**
@@ -133,21 +77,24 @@
     }
 
     /**
-     * Set the journal revision associated with this record.
+     * Set the journal revision associated with this record. Called after creation
+     * of the file record.
      *
-     * @param revision journal revision
+     * @param revision revision
      */
-    public void setRevision(long revision) {
+    void setRevision(long revision) {
         this.revision = revision;
     }
 
     /**
-     * Return the journal counter associated with the next record.
+     * Return the journal counter associated with the next record. A file record's
+     * size is the size of the length-prefixed creator string plus the size of
+     * the length-prefixed data.
      *
      * @return next revision
      */
     public long getNextRevision() {
-        return revision + length + 4;
+        return revision + FileRecordLog.getRecordSize(creator, length);
     }
 
     /**
@@ -155,7 +102,7 @@
      *
      * @return creator
      */
-    public String getCreator() {
+    public byte[] getCreator() {
         return creator;
     }
 
@@ -165,41 +112,9 @@
      * @param resolver resolver to use when mapping prefixes to full names
      * @return record input
      */
-    public FileRecordInput getInput(NamespaceResolver resolver) {
+    public RecordInput getInput(NamespaceResolver resolver) {
         consumed = true;
-        return new FileRecordInput(in, resolver);
-    }
-
-    /**
-     * Return an output on this record.
-     *
-     * @param resolver resolver to use when mapping full names to prefixes
-     * @return record output
-     */
-    public FileRecordOutput getOutput(NamespaceResolver resolver) {
-        return new FileRecordOutput(this, out, resolver);
-    }
-
-    /**
-     * Append this record to some output stream.
-     *
-     * @param out outputstream to append to
-     */
-    void append(DataOutputStream out) throws IOException {
-        out.writeInt(length);
-
-        byte[] buffer = new byte[8192];
-        int len;
-
-        InputStream in = new BufferedInputStream(new FileInputStream(file));
-        try {
-            while ((len = in.read(buffer)) > 0) {
-                out.write(buffer, 0, len);
-            }
-            out.flush();
-        } finally {
-            in.close();
-        }
+        return new RecordInput(dataIn, resolver);
     }
 
     /**
@@ -208,11 +123,11 @@
      *
      * @throws IOException if an I/O error occurs
      */
-    void skip() throws IOException {
+    public void skip() throws IOException {
         if (!consumed) {
-            long skiplen = length - creatorLength;
+            long skiplen = length;
             while (skiplen > 0) {
-                long skipped = in.skip(skiplen);
+                long skipped = dataIn.skip(skiplen);
                 if (skipped <= 0) {
                     break;
                 }
@@ -222,158 +137,6 @@
                 String msg = "Unable to skip remaining bytes.";
                 throw new IOException(msg);
             }
-        }
-    }
-
-    /**
-     * Invoked when output has been closed.
-     */
-    void closed() {
-        length = (int) file.length();
-    }
-
-    /**
-     * Read creator from the underlying data input stream.
-     *
-     * @throws IOException if an I/O error occurs
-     */
-    private void readCreator() throws IOException {
-        UTFByteCounter counter = new UTFByteCounter(in);
-        creator = DataInputStream.readUTF(counter);
-        creatorLength = counter.getBytes();
-    }
-
-    /**
-     * Write creator to the underlying data output stream.
-     *
-     * @throws IOException if an I/O error occurs
-     */
-    private void writeCreator() throws IOException {
-        out.writeUTF(creator);
-    }
-
-    /**
-     * UTF byte counter. Counts the bytes actually read from a given
-     * <code>DataInputStream</code> that make up a UTF-encoded string.
-     */
-    static class UTFByteCounter implements DataInput {
-
-        /**
-         * Underlying input stream.
-         */
-        private final DataInputStream in;
-
-        /**
-         * UTF length.
-         */
-        private int bytes;
-
-        /**
-         * Create a new instance of this class.
-         *
-         * @param in underlying data input stream
-         */
-        public UTFByteCounter(DataInputStream in) {
-            this.in = in;
-        }
-
-        /**
-         * Return the number of bytes read from the underlying input stream.
-         *
-         * @return number of bytes
-         */
-        public int getBytes() {
-            return bytes;
-        }
-
-        /**
-         * @see java.io.DataInputStream#readUnsignedShort()
-         *
-         * Remember number of bytes read.
-         */
-        public int readUnsignedShort() throws IOException {
-            try {
-                return in.readUnsignedShort();
-            } finally {
-                bytes += 2;
-            }
-        }
-
-        /**
-         * @see java.io.DataInputStream#readUnsignedShort()
-         *
-         * Remember number of bytes read.
-         */
-        public void readFully(byte b[]) throws IOException {
-            try {
-                in.readFully(b);
-            } finally {
-                bytes += b.length;
-            }
-        }
-
-        /**
-         * @see java.io.DataInputStream#readUnsignedShort()
-         *
-         * Remember number of bytes read.
-         */
-        public void readFully(byte b[], int off, int len) throws IOException {
-            try {
-                in.readFully(b, off, len);
-            } finally {
-                bytes += b.length;
-            }
-        }
-
-        /**
-         * Methods not implemented.
-         */
-        public byte readByte() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public char readChar() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public double readDouble() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public float readFloat() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public int readInt() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-       }
-
-        public int readUnsignedByte() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public long readLong() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public short readShort() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public boolean readBoolean() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public int skipBytes(int n) throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public String readLine() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
-        }
-
-        public String readUTF() throws IOException {
-            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
         }
     }
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java?view=diff&rev=495239&r1=495238&r2=495239
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java Thu Jan 11 06:40:00 2007
@@ -96,9 +96,9 @@
         }
         if (recordLog == null) {
             recordLog = getRecordLog(nextRevision);
-            recordLog.seek(nextRevision);
         }
-        record = new FileRecord(nextRevision, recordLog.getInputStream());
+        record = recordLog.read();
+        record.setRevision(nextRevision);
         nextRevision = record.getNextRevision();
         return record;
     }
@@ -114,6 +114,7 @@
         for (int i = 0; i < logFiles.length; i++) {
             FileRecordLog recordLog = new FileRecordLog(logFiles[i]);
             if (recordLog.contains(revision)) {
+                recordLog.seek(revision);
                 return recordLog;
             }
         }
@@ -123,10 +124,8 @@
 
     /**
      * Close this cursor, releasing its resources.
-     *
-     * @throws IOException if an I/O error occurs
      */
-    public void close() throws IOException {
+    public void close() {
         if (recordLog != null) {
             recordLog.close();
         }



Mime
View raw message