jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dpfis...@apache.org
Subject svn commit: r473380 - in /jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core: cluster/ state/
Date Fri, 10 Nov 2006 16:26:58 GMT
Author: dpfister
Date: Fri Nov 10 08:26:57 2006
New Revision: 473380

URL: http://svn.apache.org/viewvc?view=rev&rev=473380
Log:
JCR-623 Clustering

Added:
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java
Modified:
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
    jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java

Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
(original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
Fri Nov 10 08:26:57 2006
@@ -342,7 +342,7 @@
             String msg = "Unable to create log entry: " + e.getMessage();
             log.error(msg);
         } catch (Throwable e) {
-            String msg = "Unexpected error while creating log entry.";
+            String msg = "Unexpected error while preparing log entry.";
             log.error(msg, e);
         }
     }
@@ -357,7 +357,7 @@
             String msg = "Unable to create log entry: " + e.getMessage();
             log.error(msg);
         } catch (Throwable e) {
-            String msg = "Unexpected error while creating log entry.";
+            String msg = "Unexpected error while committing log entry.";
             log.error(msg, e);
         }
     }
@@ -372,7 +372,7 @@
             String msg = "Unable to create log entry: " + e.getMessage();
             log.error(msg);
         } catch (Throwable e) {
-            String msg = "Unexpected error while creating log entry.";
+            String msg = "Unexpected error while cancelling log entry.";
             log.error(msg, e);
         }
     }

Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
(original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
Fri Nov 10 08:26:57 2006
@@ -34,8 +34,6 @@
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -45,7 +43,25 @@
 import EDU.oswego.cs.dl.util.concurrent.Mutex;
 
 /**
- * File-based journal implementation.
+ * File-based journal implementation. A directory specified as <code>directory</code>
+ * bean property will contain log files and a global revision file, containing the
+ * latest revision. When the current log file's size exceeds <code>maximumSize</code>
+ * bytes, it gets renamed to its name appended by '1'. At the same time, all log files
+ * already having a version counter, get their version counter incremented by <code>1</code>.
+ * <p/>
+ * It is configured through the following properties:
+ * <ul>
+ * <li><code>directory</code>: the shared directory where journal logs are read from
+ * and written to; this is a required property with no default value</li>
+ * <li><code>revision</code>: the filename where the parent cluster node's revision
+ * file should be written to; this is a required property with no default value</li>
+ * <li><code>basename</code>: this is the basename of the journal logs created in
+ * the shared directory; its default value is <code>journal</code></li>
+ * <li><code>maximumSize</code>: this is the maximum size in bytes of a journal log
+ * before a new log will be created; its default value is <code>1048576</code> (1MB)</li>
+ * </ul>
+ *
+ * todo after some iterations, old files should be automatically compressed to save space
  */
 public class FileJournal implements Journal {
 
@@ -55,6 +71,21 @@
     private static final String REVISION_NAME = "revision";
 
     /**
+     * Log extension.
+     */
+    private static final String LOG_EXTENSION = ".log";
+
+    /**
+     * Default base name for journal files.
+     */
+    private static final String DEFAULT_BASENAME = "journal";
+
+    /**
+     * Default max size of a journal file (1MB).
+     */
+    private static final int DEFAULT_MAXSIZE = 1048576;
+
+    /**
      * Logger.
      */
     private static Logger log = LoggerFactory.getLogger(FileJournal.class);
@@ -70,7 +101,7 @@
     private NamespaceResolver resolver;
 
     /**
-     * Callback.
+     * Record processor.
      */
     private RecordProcessor processor;
 
@@ -85,6 +116,16 @@
     private String revision;
 
     /**
+     * Journal file base name, bean property.
+     */
+    private String basename;
+
+    /**
+     * Maximum size of a journal file before a rotation takes place, bean property.
+     */
+    private int maximumSize;
+
+    /**
      * Journal root directory.
      */
     private File root;
@@ -152,6 +193,38 @@
     }
 
     /**
+     * Bean getter for base name.
+     * @return base name
+     */
+    public String getBasename() {
+        return basename;
+    }
+
+    /**
+     * Bean setter for basename.
+     * @param basename base name
+     */
+    public void setBasename(String basename) {
+        this.basename = basename;
+    }
+
+    /**
+     * Bean getter for maximum size.
+     * @return maximum size
+     */
+    public int getMaximumSize() {
+        return maximumSize;
+    }
+
+    /**
+     * Bean setter for maximum size.
+     * @param maximumSize maximum size
+     */
+    public void setMaximumSize(int maximumSize) {
+        this.maximumSize = maximumSize;
+    }
+
+    /**
      * {@inheritDoc}
      */
     public void init(String id, RecordProcessor processor, NamespaceResolver resolver) throws
JournalException {
@@ -167,6 +240,12 @@
             String msg = "Revision not specified.";
             throw new JournalException(msg);
         }
+        if (basename == null) {
+            basename = DEFAULT_BASENAME;
+        }
+        if (maximumSize == 0) {
+            maximumSize = DEFAULT_MAXSIZE;
+        }
         root = new File(directory);
         if (!root.exists() || !root.isDirectory()) {
             String msg = "Directory specified does either not exist or is not a directory:
" + directory;
@@ -182,49 +261,48 @@
      * {@inheritDoc}
      */
     public void sync() throws JournalException {
-        final long instanceValue = instanceRevision.get();
-        final long globalValue = globalRevision.get();
-
-        File[] files = root.listFiles(new FilenameFilter() {
+        File[] logFiles = root.listFiles(new FilenameFilter() {
             public boolean accept(File dir, String name) {
-                if (name.endsWith(FileRecord.EXTENSION)) {
-                    int sep = name.indexOf('.');
-                    if (sep > 0) {
-                        try {
-                            long counter = Long.parseLong(name.substring(0, sep), 16);
-                            return counter > instanceValue && counter <= globalValue;
-                        } catch (NumberFormatException e) {
-                            String msg = "Skipping bogusly named journal file '" + name +
"': " + e.getMessage();
-                            log.warn(msg);
-                        }
-                    }
-                }
-                return false;
+                return name.startsWith(basename + ".");
             }
         });
-        Arrays.sort(files, new Comparator() {
+        Arrays.sort(logFiles, new Comparator() {
             public int compare(Object o1, Object o2) {
                 File f1 = (File) o1;
                 File f2 = (File) o2;
-                return f1.getName().compareTo(f2.getName());
+                return f1.compareTo(f2);
             }
         });
-        if (files.length > 0) {
-            for (int i = 0; i < files.length; i++) {
-                try {
-                    FileRecord record = new FileRecord(files[i]);
-                    if (!record.getJournalId().equals(id)) {
+
+        long instanceValue = instanceRevision.get();
+        long globalValue = globalRevision.get();
+
+        if (instanceValue < globalValue) {
+            FileRecordCursor cursor = new FileRecordCursor(logFiles,
+                    instanceValue, globalValue);
+            try {
+                while (cursor.hasNext()) {
+                    FileRecord record = cursor.next();
+                    if (!record.getCreator().equals(id)) {
                         process(record);
                     } else {
-                        log.info("Log entry matches journal id, skipped: " + files[i]);
+                        log.info("Log entry matches journal id, skipped: " + record.getRevision());
                     }
-                    instanceRevision.set(record.getCounter());
-                } catch (IllegalArgumentException e) {
-                    String msg = "Skipping bogusly named journal file '" + files[i] + ":
" + e.getMessage();
+                    instanceRevision.set(record.getNextRevision());
+                }
+            } catch (IOException e) {
+                String msg = "Unable to iterate over modified records: " + e.getMessage();
+                throw new JournalException(msg);
+
+            } finally {
+                try {
+                    cursor.close();
+                } catch (IOException e) {
+                    String msg = "I/O error while closing record cursor: " + e.getMessage();
                     log.warn(msg);
                 }
             }
-            log.info("Sync finished, instance revision is: " + FileRecord.toHexString(instanceRevision.get()));
+            log.info("Sync finished, instance revision is: " + instanceRevision.get());
         }
     }
 
@@ -235,16 +313,12 @@
      * @throws JournalException if an error occurs
      */
     void process(FileRecord record) throws JournalException {
-        File file = record.getFile();
-
-        log.info("Processing: " + file);
+        log.info("Processing revision: " + record.getRevision());
 
-        FileRecordInput in = null;
+        FileRecordInput in = record.getInput(resolver);
         String workspace = null;
 
         try {
-            in = new FileRecordInput(new FileInputStream(file), resolver);
-
             workspace = in.readString();
             if (workspace.equals("")) {
                 workspace = null;
@@ -296,23 +370,19 @@
             processor.end();
 
         } catch (NameException e) {
-            String msg = "Unable to read journal entry " + file + ": " + e.getMessage();
+            String msg = "Unable to read revision " + record.getRevision() +
+                    ": " + e.getMessage();
             throw new JournalException(msg);
         } catch (IOException e) {
-            String msg = "Unable to read journal entry " + file + ": " + e.getMessage();
+            String msg = "Unable to read revision " + record.getRevision() +
+                    ": " + e.getMessage();
             throw new JournalException(msg);
         } catch (IllegalArgumentException e) {
-            String msg = "Error while processing journal file " + file + ": " + e.getMessage();
+            String msg = "Error while processing revision " +
+                    record.getRevision() + ": " + e.getMessage();
             throw new JournalException(msg);
         } finally {
-            if (in != null) {
-                try {
-                    in.close();
-                } catch (IOException e) {
-                    String msg = "I/O error while closing " + file + ": " + e.getMessage();
-                    log.warn(msg);
-                }
-            }
+            in.close();
         }
     }
 
@@ -333,7 +403,9 @@
             sync();
 
             tempLog = File.createTempFile("journal", ".tmp", root);
-            out = new FileRecordOutput(new FileOutputStream(tempLog), resolver);
+
+            record = new FileRecord(id, tempLog);
+            out = record.getOutput(resolver);
             out.writeString(workspace != null ? workspace : "");
 
             succeeded = true;
@@ -490,7 +562,8 @@
 
         try {
             sync();
-            record = new FileRecord(root, globalRevision.get() + 1, id);
+
+            record.setRevision(globalRevision.get());
 
             prepared = true;
         } finally {
@@ -508,10 +581,22 @@
             out.writeChar('\0');
             out.close();
 
-            if (!tempLog.renameTo(record.getFile())) {
-                throw new JournalException("Unable to rename " + tempLog + " to " + record.getFile());
+            long nextRevision = record.getNextRevision();
+
+            File journalFile = new File(root, basename + LOG_EXTENSION);
+
+            FileRecordLog recordLog = new FileRecordLog(journalFile);
+            if (!recordLog.isNew()) {
+                if (nextRevision - recordLog.getFirstRevision() > maximumSize) {
+                    switchLogs();
+                    recordLog = new FileRecordLog(journalFile);
+                }
             }
-            globalRevision.set(record.getCounter());
+            recordLog.append(record);
+
+            tempLog.delete();
+            globalRevision.set(nextRevision);
+            instanceRevision.set(nextRevision);
 
         } catch (IOException e) {
             String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage();
@@ -535,6 +620,48 @@
         } finally {
             globalRevision.unlock();
             writeMutex.release();
+        }
+    }
+
+    /**
+     * Move away current journal file (and all other files), incrementing their
+     * version counter. A file named <code>journal.N.log</code> gets renamed to
+     * <code>journal.(N+1).log</code>, whereas the main journal file gets renamed
+     * to <code>journal.1.log</code>.
+     */
+    private void switchLogs() {
+        FilenameFilter filter = new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.startsWith(basename + ".");
+            }
+        };
+        File[] files = root.listFiles(filter);
+        Arrays.sort(files, new Comparator() {
+            public int compare(Object o1, Object o2) {
+                File f1 = (File) o1;
+                File f2 = (File) o2;
+                return f2.compareTo(f1);
+            }
+        });
+        for (int i = 0; i < files.length; i++) {
+            File file = files[i];
+            String name = file.getName();
+            int sep = name.lastIndexOf('.');
+            if (sep != -1) {
+                String ext = name.substring(sep + 1);
+                if (ext.equals(LOG_EXTENSION)) {
+                    file.renameTo(new File(root, name + ".1"));
+                } else {
+                    try {
+                        int version = Integer.parseInt(ext);
+                        String newName = name.substring(0, sep + 1) +
+                                String.valueOf(version + 1);
+                        file.renameTo(new File(newName));
+                    } catch (NumberFormatException e) {
+                        log.warn("Bogusly named journal file, skipped: " + file);
+                    }
+                }
+            }
         }
     }
 }

Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
(original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
Fri Nov 10 08:26:57 2006
@@ -16,134 +16,364 @@
  */
 package org.apache.jackrabbit.core.cluster;
 
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
+import org.apache.jackrabbit.name.NamespaceResolver;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInput;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
 
 /**
- * Represents a file-based record.
+ * Represents a file-based record. Physically, a file record contains its length in the
+ * first 4 bytes, immediately followed by its creator in a length-prefixed, UTF-encoded
+ * string. All further fields are record-specific.
  */
 class FileRecord {
 
     /**
-     * File record extension.
+     * Indicator for a literal UUID.
      */
-    static final String EXTENSION = ".log";
+    static final byte UUID_LITERAL = 'L';
 
     /**
-     * Indicator for a literal UUID.
+     * Indicator for a UUID index.
      */
-    static final byte UUID_LITERAL = 0x00;
+    static final byte UUID_INDEX = 'I';
 
     /**
-     * Indicator for a UUID index.
+     * Revision.
+     */
+    private long revision;
+
+    /**
+     * Underlying input stream.
+     */
+    private DataInputStream in;
+
+    /**
+     * File used when creating a new record.
+     */
+    private File file;
+
+    /**
+     * Underlying output stream.
      */
-    static final byte UUID_INDEX = 0x01;
+    private DataOutputStream out;
 
     /**
-     * Used for padding long string representations.
+     * Record length.
      */
-    private static final String LONG_PADDING = "0000000000000000";
+    private int length;
 
     /**
-     * Underlying file.
+     * Creator of a record.
      */
-    private final File file;
+    private String creator;
 
     /**
-     * Counter.
+     * Bytes used by creator when written in UTF encoding and length-prefixed.
      */
-    private final long counter;
+    private int creatorLength;
 
     /**
-     * Journal id.
+     * Flag indicating whether bytes need to be skipped at the end.
      */
-    private final String journalId;
+    private boolean consumed;
+
+    /**
+     * Creates a new file record. Used when opening an existing record.
+     *
+     * @param revision revision this record represents
+     * @param in underlying input stream
+     * @throws IOException if reading the creator fails
+     */
+    public FileRecord(long revision, InputStream in)
+            throws IOException {
+
+        this.revision = revision;
+        if (in instanceof DataInputStream) {
+            this.in = (DataInputStream) in;
+        } else {
+            this.in = new DataInputStream(in);
+        }
+        this.length = this.in.readInt();
+
+        readCreator();
+    }
 
     /**
-     * Creates a new file record from an existing file. Retrieves meta data by parsing the file's name.
+     * Creates a new file record. Used when creating a new record.
      *
-     * @param file file to use as record
-     * @throws IllegalArgumentException if file name is bogus
+     * @param creator creator of this record
+     * @param file underlying (temporary) file
+     * @throws IOException if writing the creator fails
      */
-    public FileRecord(File file) throws IllegalArgumentException {
+    public FileRecord(String creator, File file) throws IOException {
+
+        this.creator = creator;
         this.file = file;
 
-        String name = file.getName();
+        this.out = new DataOutputStream(new FileOutputStream(file));
 
-        int sep1 = name.indexOf('.');
-        if (sep1 == -1) {
-            throw new IllegalArgumentException("Missing first . separator.");
-        }
-        try {
-            counter = Long.parseLong(name.substring(0, sep1), 16);
-        } catch (NumberFormatException e) {
-            throw new IllegalArgumentException("Unable to decompose long: " + e.getMessage());
-        }
-        int sep2 = name.lastIndexOf('.');
-        if (sep2 == -1) {
-            throw new IllegalArgumentException("Missing second . separator.");
-        }
-        journalId = name.substring(sep1 + 1, sep2);
+        writeCreator();
     }
 
     /**
-     * Creates a new file record from a counter and instance ID.
+     * Return the journal revision associated with this record.
      *
-     * @param parent parent directory
-     * @param counter counter to use
-     * @param journalId journal id to use
+     * @return revision
      */
-    public FileRecord(File parent, long counter, String journalId) {
-        StringBuffer name = new StringBuffer();
-        name.append(toHexString(counter));
-        name.append('.');
-        name.append(journalId);
+    public long getRevision() {
+        return revision;
+    }
 
-        name.append(EXTENSION);
+    /**
+     * Set the journal revision associated with this record.
+     *
+     * @param revision journal revision
+     */
+    public void setRevision(long revision) {
+        this.revision = revision;
+    }
 
-        this.file = new File(parent, name.toString());
-        this.counter = counter;
-        this.journalId = journalId;
+    /**
+     * Return the journal counter associated with the next record.
+     *
+     * @return next revision
+     */
+    public long getNextRevision() {
+        return revision + length + 4;
     }
 
     /**
-     * Return the journal counter associated with this record.
+     * Return the creator of this record.
      *
-     * @return counter
+     * @return creator
      */
-    public long getCounter() {
-        return counter;
+    public String getCreator() {
+        return creator;
     }
 
     /**
-     * Return the id of the journal that created this record.
+     * Return an input on this record.
      *
-     * @return journal id
+     * @param resolver resolver to use when mapping prefixes to full names
+     * @return record input
      */
-    public String getJournalId() {
-        return journalId;
+    public FileRecordInput getInput(NamespaceResolver resolver) {
+        consumed = true;
+        return new FileRecordInput(in, resolver);
     }
 
     /**
-     * Return this record's file.
+     * Return an output on this record.
      *
-     * @return file
+     * @param resolver resolver to use when mapping full names to prefixes
+     * @return record output
      */
-    public File getFile() {
-        return file;
+    public FileRecordOutput getOutput(NamespaceResolver resolver) {
+        return new FileRecordOutput(this, out, resolver);
+    }
+
+    /**
+     * Append this record to some output stream.
+     *
+     * @param out outputstream to append to
+     */
+    void append(DataOutputStream out) throws IOException {
+        out.writeInt(length);
+
+        byte[] buffer = new byte[8192];
+        int len;
+
+        InputStream in = new BufferedInputStream(new FileInputStream(file));
+        try {
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            out.flush();
+        } finally {
+            in.close();
+        }
+    }
+
+    /**
+     * Skip over this record, positioning the underlying input stream
+     * on the next available record.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    void skip() throws IOException {
+        if (!consumed) {
+            long skiplen = length - creatorLength;
+            while (skiplen > 0) {
+                long skipped = in.skip(skiplen);
+                if (skipped <= 0) {
+                    break;
+                }
+                skiplen -= skipped;
+            }
+            if (skiplen != 0) {
+                String msg = "Unable to skip remaining bytes.";
+                throw new IOException(msg);
+            }
+        }
     }
 
     /**
-     * Return a zero-padded long string representation.
+     * Invoked when output has been closed.
      */
-    public static String toHexString(long l) {
-        String s = Long.toHexString(l);
-        int padlen = LONG_PADDING.length() - s.length();
-        if (padlen > 0) {
-            s = LONG_PADDING.substring(0, padlen) + s;
+    void closed() {
+        length = (int) file.length();
+    }
+
+    /**
+     * Read creator from the underlying data input stream.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    private void readCreator() throws IOException {
+        UTFByteCounter counter = new UTFByteCounter(in);
+        creator = DataInputStream.readUTF(counter);
+        creatorLength = counter.getBytes();
+    }
+
+    /**
+     * Write creator to the underlying data output stream.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    private void writeCreator() throws IOException {
+        out.writeUTF(creator);
+    }
+
+    /**
+     * UTF byte counter. Counts the bytes actually read from a given
+     * <code>DataInputStream</code> that make up a UTF-encoded string.
+     */
+    static class UTFByteCounter implements DataInput {
+
+        /**
+         * Underlying input stream.
+         */
+        private final DataInputStream in;
+
+        /**
+         * UTF length.
+         */
+        private int bytes;
+
+        /**
+         * Create a new instance of this class.
+         *
+         * @param in underlying data input stream
+         */
+        public UTFByteCounter(DataInputStream in) {
+            this.in = in;
+        }
+
+        /**
+         * Return the number of bytes read from the underlying input stream.
+         *
+         * @return number of bytes
+         */
+        public int getBytes() {
+            return bytes;
+        }
+
+        /**
+         * @see java.io.DataInputStream#readUnsignedShort()
+         *
+         * Remember number of bytes read.
+         */
+        public int readUnsignedShort() throws IOException {
+            try {
+                return in.readUnsignedShort();
+            } finally {
+                bytes += 2;
+            }
+        }
+
+        /**
+         * @see java.io.DataInputStream#readFully(byte[])
+         *
+         * Remember number of bytes read.
+         */
+        public void readFully(byte b[]) throws IOException {
+            try {
+                in.readFully(b);
+            } finally {
+                bytes += b.length;
+            }
+        }
+
+        /**
+         * @see java.io.DataInputStream#readFully(byte[], int, int)
+         *
+         * Remember number of bytes read.
+         */
+        public void readFully(byte b[], int off, int len) throws IOException {
+            try {
+                in.readFully(b, off, len);
+            } finally {
+                bytes += b.length;
+            }
+        }
+
+        /**
+         * Methods not implemented.
+         */
+        public byte readByte() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public char readChar() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public double readDouble() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public float readFloat() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public int readInt() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+       }
+
+        public int readUnsignedByte() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public long readLong() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public short readShort() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public boolean readBoolean() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public int skipBytes(int n) throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public String readLine() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+        }
+
+        public String readUTF() throws IOException {
+            throw new IllegalStateException("Unexpected call, deliberately not implemented.");
         }
-        return s;
     }
 }

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java?view=auto&rev=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
(added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
Fri Nov 10 08:26:57 2006
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Record cursor that returns unseen revisions in ascending order on every
+ * iteration. When iterating, a record must either be completely processed
+ * or its {@link FileRecord#skip()} method must be invoked to guarantee
+ * that this cursor is pointing at the next record.
+ */
+class FileRecordCursor {
+
+    /**
+     * Log files to scan for revisions.
+     */
+    private File[] logFiles;
+
+    /**
+     * Next revision to visit.
+     */
+    private long nextRevision;
+
+    /**
+     * Last revision to visit.
+     */
+    private long lastRevision;
+
+    /**
+     * Current record log, containing file records.
+     */
+    private FileRecordLog recordLog;
+
+    /**
+     * Current record.
+     */
+    private FileRecord record;
+
+    /**
+     * Creates a new instance of this class.
+     *
+     * @param logFiles available log files, sorted ascending by age
+     * @param firstRevision first revision to return
+     * @param lastRevision last revision to return
+     */
+    public FileRecordCursor(File[] logFiles, long firstRevision, long lastRevision) {
+        this.logFiles = logFiles;
+        this.nextRevision = firstRevision;
+        this.lastRevision = lastRevision;
+    }
+
+
+    /**
+     * Return a flag indicating whether there are more records to visit.
+     */
+    public boolean hasNext() {
+        return nextRevision < lastRevision;
+    }
+
+    /**
+     * Returns the next record.
+     *
+     * @throws IllegalStateException if no next revision exists
+     * @throws IOException if an I/O error occurs
+     */
+    public FileRecord next() throws IOException {
+        if (!hasNext()) {
+            String msg = "No next revision.";
+            throw new IllegalStateException(msg);
+        }
+        if (record != null) {
+            record.skip();
+            record = null;
+        }
+        if (recordLog != null) {
+            if (!recordLog.contains(nextRevision)) {
+                recordLog.close();
+                recordLog = null;
+            }
+        }
+        if (recordLog == null) {
+            recordLog = getRecordLog(nextRevision);
+            recordLog.seek(nextRevision);
+        }
+        record = new FileRecord(nextRevision, recordLog.getInputStream());
+        nextRevision = record.getNextRevision();
+        return record;
+    }
+
+    /**
+     * Return record log containing a given revision.
+     *
+     * @param revision revision to locate
+     * @return record log containing that revision
+     * @throws IOException if an I/O error occurs
+     */
+    private FileRecordLog getRecordLog(long revision) throws IOException {
+        for (int i = 0; i < logFiles.length; i++) {
+            FileRecordLog recordLog = new FileRecordLog(logFiles[i]);
+            if (recordLog.contains(revision)) {
+                return recordLog;
+            }
+        }
+        String msg = "No log file found containing revision: " + revision;
+        throw new IOException(msg);
+    }
+
+    /**
+     * Close this cursor, releasing its resources.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    public void close() throws IOException {
+        if (recordLog != null) {
+            recordLog.close();
+        }
+    }
+}
\ No newline at end of file

Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
(original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
Fri Nov 10 08:26:57 2006
@@ -26,16 +26,15 @@
 import org.apache.jackrabbit.name.Path;
 import org.apache.jackrabbit.name.PathFormat;
 import org.apache.jackrabbit.name.MalformedPathException;
+import org.apache.jackrabbit.uuid.Constants;
+import org.apache.jackrabbit.uuid.UUID;
 
-import java.io.File;
-import java.io.RandomAccessFile;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.DataInputStream;
 import java.util.ArrayList;
 
 /**
- * Defines methods to read members out of a file record.
+ * Allows reading data from a <code>FileRecord</code>.
  */
 class FileRecordInput {
 
@@ -65,8 +64,8 @@
      * @param in       underlying input stream
      * @param resolver namespace resolver
      */
-    public FileRecordInput(InputStream in, NamespaceResolver resolver) {
-        this.in = new DataInputStream(in);
+    public FileRecordInput(DataInputStream in, NamespaceResolver resolver) {
+        this.in = in;
         this.resolver = resolver;
     }
 
@@ -186,20 +185,22 @@
     public NodeId readNodeId() throws IOException {
         checkOpen();
 
-        byte b = readByte();
-        if (b == FileRecord.UUID_INDEX) {
+        byte uuidType = readByte();
+        if (uuidType == FileRecord.UUID_INDEX) {
             int index = readInt();
             if (index == -1) {
                 return null;
             } else {
                 return (NodeId) uuidIndex.get(index);
             }
-        } else if (b == FileRecord.UUID_LITERAL) {
-            NodeId nodeId = NodeId.valueOf(readString());
+        } else if (uuidType == FileRecord.UUID_LITERAL) {
+            byte[] b = new byte[Constants.UUID_BYTE_LENGTH];
+            in.readFully(b);
+            NodeId nodeId = new NodeId(new UUID(b));
             uuidIndex.add(nodeId);
             return nodeId;
         } else {
-            String msg = "UUID indicator unknown: " + b;
+            String msg = "UUID type unknown: " + uuidType;
             throw new IOException(msg);
         }
     }
@@ -219,18 +220,12 @@
     }
 
     /**
-     * Close this input.
-     *
-     * @throws IOException if an I/O error occurs
+     * Close this input. Does not close the underlying stream, as that is a shared resource.
      */
-    public void close() throws IOException {
+    public void close() {
         checkOpen();
 
-        try {
-            in.close();
-        } finally {
-            closed = true;
-        }
+        closed = true;
     }
 
     /**

Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java?view=auto&rev=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java
(added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java
Fri Nov 10 08:26:57 2006
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import java.io.File;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+
+/**
+ * A file record log is a file containing {@link FileRecord}s. Internally,
+ * the first 8 bytes contain the revision this log starts with.
+ */
+class FileRecordLog {
+
+    /**
+     * Underlying file.
+     */
+    private File file;
+
+    /**
+     * Flag indicating whether this is a new log.
+     */
+    private boolean isNew;
+
+    /**
+     * Input stream used when seeking a specific record.
+     */
+    private DataInputStream in;
+
+    /**
+     * First revision available in this log.
+     */
+    private long minRevision;
+
+    /**
+     * First revision that is not available in this log, but in the next one.
+     */
+    private long maxRevision;
+
+    /**
+     * Create a new instance of this class.
+     *
+     * @param file file containing record log
+     * @throws IOException if an I/O error occurs
+     */
+    public FileRecordLog(File file) throws IOException {
+        this.file = file;
+
+        if (file.exists()) {
+            DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+            try {
+                minRevision = in.readLong();
+                maxRevision = minRevision + file.length() - 8;
+            } finally {
+                in.close();
+            }
+        } else {
+            isNew = true;
+        }
+    }
+
+    /**
+     * Return the first revision.
+     *
+     * @return first revision
+     */
+    public long getFirstRevision() {
+        return minRevision;
+    }
+
+    /**
+     * Return a flag indicating whether this record log contains a certain revision.
+     *
+     * @param revision revision to look for
+     * @return <code>true</code> if this record log contains the given revision;
+     *         <code>false</code> otherwise
+     */
+    public boolean contains(long revision) {
+        return (revision >= minRevision && revision < maxRevision);
+    }
+
+    /**
+     * Return a flag indicating whether this record log is new.
+     *
+     * @return <code>true</code> if this record log is new;
+     *         <code>false</code> otherwise
+     */
+    public boolean isNew() {
+        return isNew;
+    }
+
+    /**
+     * Seek an entry. This operation opens the underlying input stream and positions
+     * it for sequential scanning; it must therefore not be called twice.
+     *
+     * @param revision revision to seek
+     * @throws IOException if an I/O error occurs
+     */
+    public void seek(long revision) throws IOException {
+        if (in != null) {
+            String msg = "Seek allowed exactly once.";
+            throw new IllegalStateException(msg);
+        }
+        open();
+
+        long skiplen = revision - minRevision + 8;
+        while (skiplen > 0) {
+            long skipped = in.skip(skiplen);
+            if (skipped <= 0) {
+                break;
+            }
+            skiplen -= skipped;
+        }
+        if (skiplen != 0) {
+            String msg = "Unable to skip remaining bytes.";
+            throw new IOException(msg);
+        }
+    }
+
+    /**
+     * Append a record to this log.
+     *
+     * @param record record to add
+     * @throws IOException if an I/O error occurs
+     */
+    public void append(FileRecord record) throws IOException {
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file, true));
+        try {
+            if (isNew) {
+                out.writeLong(record.getRevision());
+            }
+            record.append(out);
+        } finally {
+            out.close();
+        }
+    }
+
+    /**
+     * Open this log.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    private void open() throws IOException {
+        in = new DataInputStream(new BufferedInputStream(
+                new FileInputStream(file)));
+    }
+
+    /**
+     * Return the underlying input stream.
+     *
+     * @return underlying input stream
+     */
+    protected DataInputStream getInputStream() {
+        if (in == null) {
+            String msg = "Input stream not open.";
+            throw new IllegalStateException(msg);
+        }
+        return in;
+    }
+
+    /**
+     * Close this log.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    public void close() throws IOException {
+        if (in != null) {
+            in.close();
+        }
+    }
+}
\ No newline at end of file

Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
(original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
Fri Nov 10 08:26:57 2006
@@ -27,15 +27,19 @@
 
 import java.io.IOException;
 import java.io.DataOutputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 
 /**
- * Defines methods to write members to a file record.
+ * Allows writing data to a <code>FileRecord</code>.
  */
 class FileRecordOutput {
 
     /**
+     * File record.
+     */
+    private final FileRecord record;
+
+    /**
      * Underlying output stream.
      */
     private final DataOutputStream out;
@@ -58,11 +62,13 @@
     /**
      * Create a new file record.
      *
+     * @param record   file record
      * @param out      outputstream to write to
      * @param resolver namespace resolver
      */
-    public FileRecordOutput(OutputStream out, NamespaceResolver resolver) {
-        this.out = new DataOutputStream(out);
+    public FileRecordOutput(FileRecord record, DataOutputStream out, NamespaceResolver resolver) {
+        this.record = record;
+        this.out = out;
         this.resolver = resolver;
     }
 
@@ -187,7 +193,7 @@
                 writeInt(index);
             } else {
                 writeByte(FileRecord.UUID_LITERAL);
-                writeString(nodeId.toString());
+                out.write(nodeId.getUUID().getRawBytes());
             }
         }
     }
@@ -218,6 +224,7 @@
             out.close();
         } finally {
             closed = true;
+            record.closed();
         }
     }
 

Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
(original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
Fri Nov 10 08:26:57 2006
@@ -834,7 +834,7 @@
                         shared.modified(state);
                     } catch (ItemStateException e) {
                         String msg = "Unable to retrieve state: " + state.getId();
-                        log.warn(msg, e);
+                        log.warn(msg);
                         state.discard();
                     }
                 }



Mime
View raw message