From commits-return-3079-apmail-jackrabbit-commits-archive=jackrabbit.apache.org@jackrabbit.apache.org Fri Nov 10 16:27:30 2006 Return-Path: Delivered-To: apmail-jackrabbit-commits-archive@www.apache.org Received: (qmail 36156 invoked from network); 10 Nov 2006 16:27:29 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Nov 2006 16:27:29 -0000 Received: (qmail 86869 invoked by uid 500); 10 Nov 2006 16:27:40 -0000 Delivered-To: apmail-jackrabbit-commits-archive@jackrabbit.apache.org Received: (qmail 86844 invoked by uid 500); 10 Nov 2006 16:27:40 -0000 Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@jackrabbit.apache.org Delivered-To: mailing list commits@jackrabbit.apache.org Received: (qmail 86834 invoked by uid 99); 10 Nov 2006 16:27:40 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Nov 2006 08:27:40 -0800 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Nov 2006 08:27:27 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id F2C881A9846; Fri, 10 Nov 2006 08:26:58 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r473380 - in /jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core: cluster/ state/ Date: Fri, 10 Nov 2006 16:26:58 -0000 To: commits@jackrabbit.apache.org From: dpfister@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061110162658.F2C881A9846@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dpfister Date: Fri Nov 10 08:26:57 2006 New Revision: 473380 URL: 
http://svn.apache.org/viewvc?view=rev&rev=473380 Log: JCR-623 Clustering Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?view=diff&rev=473380&r1=473379&r2=473380 ============================================================================== --- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original) +++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Fri Nov 10 08:26:57 2006 @@ -342,7 +342,7 @@ String msg = "Unable to create log entry: " + e.getMessage(); log.error(msg); } catch (Throwable e) { - String msg = "Unexpected error while creating log entry."; + String msg = "Unexpected error while preparing log entry."; log.error(msg, e); } } @@ -357,7 +357,7 @@ String msg = "Unable to create log entry: " + e.getMessage(); log.error(msg); } catch (Throwable e) { - String msg = "Unexpected error while creating log entry."; + String msg = "Unexpected error while committing log entry."; log.error(msg, e); } } @@ -372,7 
+372,7 @@ String msg = "Unable to create log entry: " + e.getMessage(); log.error(msg); } catch (Throwable e) { - String msg = "Unexpected error while creating log entry."; + String msg = "Unexpected error while cancelling log entry."; log.error(msg, e); } } Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java?view=diff&rev=473380&r1=473379&r2=473380 ============================================================================== --- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java (original) +++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java Fri Nov 10 08:26:57 2006 @@ -34,8 +34,6 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; @@ -45,7 +43,25 @@ import EDU.oswego.cs.dl.util.concurrent.Mutex; /** - * File-based journal implementation. + * File-based journal implementation. A directory specified as directory + * bean property will contain log files and a global revision file, containing the + * latest revision file. When the current log file's size exceeds maxSize + * bytes, it gets renamed to its name appended by '1'. At the same time, all log files + * already having a version counter, get their version counter incremented by 1. + *

+ * It is configured through the following properties: + *

    + *
  • directory: the shared directory where journal logs are read from + * and written to; this is a required property with no default value
  • + *
  • revision: the filename where the parent cluster node's revision + * file should be written to; this is a required property with no default value
  • + *
  • basename: this is the basename of the journal logs created in + * the shared directory; its default value is journal
  • + *
  • maximumSize: this is the maximum size in bytes of a journal log + * before a new log will be created; its default value is 1048576 (1MB)
  • + *
+ * + * todo after some iterations, old files should be automatically compressed to save space */ public class FileJournal implements Journal { @@ -55,6 +71,21 @@ private static final String REVISION_NAME = "revision"; /** + * Log extension. + */ + private static final String LOG_EXTENSION = ".log"; + + /** + * Default base name for journal files. + */ + private static final String DEFAULT_BASENAME = "journal"; + + /** + * Default max size of a journal file (1MB). + */ + private static final int DEFAULT_MAXSIZE = 1048576; + + /** * Logger. */ private static Logger log = LoggerFactory.getLogger(FileJournal.class); @@ -70,7 +101,7 @@ private NamespaceResolver resolver; /** - * Callback. + * Record processor. */ private RecordProcessor processor; @@ -85,6 +116,16 @@ private String revision; /** + * Journal file base name, bean property. + */ + private String basename; + + /** + * Maximum size of a journal file before a rotation takes place, bean property. + */ + private int maximumSize; + + /** * Journal root directory. */ private File root; @@ -152,6 +193,38 @@ } /** + * Bean getter for base name. + * @return base name + */ + public String getBasename() { + return basename; + } + + /** + * Bean setter for basename. + * @param basename base name + */ + public void setBasename(String basename) { + this.basename = basename; + } + + /** + * Bean getter for maximum size. + * @return maximum size + */ + public int getMaximumSize() { + return maximumSize; + } + + /** + * Bean setter for maximum size. 
+ * @param maximumSize maximum size + */ + public void setMaximumSize(int maximumSize) { + this.maximumSize = maximumSize; + } + + /** * {@inheritDoc} */ public void init(String id, RecordProcessor processor, NamespaceResolver resolver) throws JournalException { @@ -167,6 +240,12 @@ String msg = "Revision not specified."; throw new JournalException(msg); } + if (basename == null) { + basename = DEFAULT_BASENAME; + } + if (maximumSize == 0) { + maximumSize = DEFAULT_MAXSIZE; + } root = new File(directory); if (!root.exists() || !root.isDirectory()) { String msg = "Directory specified does either not exist or is not a directory: " + directory; @@ -182,49 +261,48 @@ * {@inheritDoc} */ public void sync() throws JournalException { - final long instanceValue = instanceRevision.get(); - final long globalValue = globalRevision.get(); - - File[] files = root.listFiles(new FilenameFilter() { + File[] logFiles = root.listFiles(new FilenameFilter() { public boolean accept(File dir, String name) { - if (name.endsWith(FileRecord.EXTENSION)) { - int sep = name.indexOf('.'); - if (sep > 0) { - try { - long counter = Long.parseLong(name.substring(0, sep), 16); - return counter > instanceValue && counter <= globalValue; - } catch (NumberFormatException e) { - String msg = "Skipping bogusly named journal file '" + name + "': " + e.getMessage(); - log.warn(msg); - } - } - } - return false; + return name.startsWith(basename + "."); } }); - Arrays.sort(files, new Comparator() { + Arrays.sort(logFiles, new Comparator() { public int compare(Object o1, Object o2) { File f1 = (File) o1; File f2 = (File) o2; - return f1.getName().compareTo(f2.getName()); + return f1.compareTo(f2); } }); - if (files.length > 0) { - for (int i = 0; i < files.length; i++) { - try { - FileRecord record = new FileRecord(files[i]); - if (!record.getJournalId().equals(id)) { + + long instanceValue = instanceRevision.get(); + long globalValue = globalRevision.get(); + + if (instanceValue < globalValue) { + 
FileRecordCursor cursor = new FileRecordCursor(logFiles, + instanceValue, globalValue); + try { + while (cursor.hasNext()) { + FileRecord record = cursor.next(); + if (!record.getCreator().equals(id)) { process(record); } else { - log.info("Log entry matches journal id, skipped: " + files[i]); + log.info("Log entry matches journal id, skipped: " + record.getRevision()); } - instanceRevision.set(record.getCounter()); - } catch (IllegalArgumentException e) { - String msg = "Skipping bogusly named journal file '" + files[i] + ": " + e.getMessage(); + instanceRevision.set(record.getNextRevision()); + } + } catch (IOException e) { + String msg = "Unable to iterate over modified records: " + e.getMessage(); + throw new JournalException(msg); + + } finally { + try { + cursor.close(); + } catch (IOException e) { + String msg = "I/O error while closing record cursor: " + e.getMessage(); log.warn(msg); } } - log.info("Sync finished, instance revision is: " + FileRecord.toHexString(instanceRevision.get())); + log.info("Sync finished, instance revision is: " + instanceRevision.get()); } } @@ -235,16 +313,12 @@ * @throws JournalException if an error occurs */ void process(FileRecord record) throws JournalException { - File file = record.getFile(); - - log.info("Processing: " + file); + log.info("Processing revision: " + record.getRevision()); - FileRecordInput in = null; + FileRecordInput in = record.getInput(resolver); String workspace = null; try { - in = new FileRecordInput(new FileInputStream(file), resolver); - workspace = in.readString(); if (workspace.equals("")) { workspace = null; @@ -296,23 +370,19 @@ processor.end(); } catch (NameException e) { - String msg = "Unable to read journal entry " + file + ": " + e.getMessage(); + String msg = "Unable to read revision " + record.getRevision() + + ": " + e.getMessage(); throw new JournalException(msg); } catch (IOException e) { - String msg = "Unable to read journal entry " + file + ": " + e.getMessage(); + String msg = 
"Unable to read revision " + record.getRevision() + + ": " + e.getMessage(); throw new JournalException(msg); } catch (IllegalArgumentException e) { - String msg = "Error while processing journal file " + file + ": " + e.getMessage(); + String msg = "Error while processing revision " + + record.getRevision() + ": " + e.getMessage(); throw new JournalException(msg); } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - String msg = "I/O error while closing " + file + ": " + e.getMessage(); - log.warn(msg); - } - } + in.close(); } } @@ -333,7 +403,9 @@ sync(); tempLog = File.createTempFile("journal", ".tmp", root); - out = new FileRecordOutput(new FileOutputStream(tempLog), resolver); + + record = new FileRecord(id, tempLog); + out = record.getOutput(resolver); out.writeString(workspace != null ? workspace : ""); succeeded = true; @@ -490,7 +562,8 @@ try { sync(); - record = new FileRecord(root, globalRevision.get() + 1, id); + + record.setRevision(globalRevision.get()); prepared = true; } finally { @@ -508,10 +581,22 @@ out.writeChar('\0'); out.close(); - if (!tempLog.renameTo(record.getFile())) { - throw new JournalException("Unable to rename " + tempLog + " to " + record.getFile()); + long nextRevision = record.getNextRevision(); + + File journalFile = new File(root, basename + LOG_EXTENSION); + + FileRecordLog recordLog = new FileRecordLog(journalFile); + if (!recordLog.isNew()) { + if (nextRevision - recordLog.getFirstRevision() > maximumSize) { + switchLogs(); + recordLog = new FileRecordLog(journalFile); + } } - globalRevision.set(record.getCounter()); + recordLog.append(record); + + tempLog.delete(); + globalRevision.set(nextRevision); + instanceRevision.set(nextRevision); } catch (IOException e) { String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage(); @@ -535,6 +620,48 @@ } finally { globalRevision.unlock(); writeMutex.release(); + } + } + + /** + * Move away current journal file (and all other files), 
incrementing their + * version counter. A file named journal.N.log gets renamed to + * journal.(N+1).log, whereas the main journal file gets renamed + * to journal.1.log. + */ + private void switchLogs() { + FilenameFilter filter = new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.startsWith(basename + "."); + } + }; + File[] files = root.listFiles(filter); + Arrays.sort(files, new Comparator() { + public int compare(Object o1, Object o2) { + File f1 = (File) o1; + File f2 = (File) o2; + return f2.compareTo(f1); + } + }); + for (int i = 0; i < files.length; i++) { + File file = files[i]; + String name = file.getName(); + int sep = name.lastIndexOf('.'); + if (sep != -1) { + String ext = name.substring(sep + 1); + if (ext.equals(LOG_EXTENSION)) { + file.renameTo(new File(root, name + ".1")); + } else { + try { + int version = Integer.parseInt(ext); + String newName = name.substring(0, sep + 1) + + String.valueOf(version + 1); + file.renameTo(new File(newName)); + } catch (NumberFormatException e) { + log.warn("Bogusly named journal file, skipped: " + file); + } + } + } } } } Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java?view=diff&rev=473380&r1=473379&r2=473380 ============================================================================== --- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java (original) +++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java Fri Nov 10 08:26:57 2006 @@ -16,134 +16,364 @@ */ package org.apache.jackrabbit.core.cluster; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; +import org.apache.jackrabbit.name.NamespaceResolver; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import 
java.io.InputStream; +import java.io.DataInput; import java.io.File; +import java.io.FileOutputStream; +import java.io.BufferedInputStream; +import java.io.FileInputStream; /** - * Represents a file-based record. + * Represents a file-based record. Physically, a file record contains its length in the + * first 4 bytes, immediately followed by its creator in a length-prefixed, UTF-encoded + * string. All further fields are record-specific. */ class FileRecord { /** - * File record extension. + * Indicator for a literal UUID. */ - static final String EXTENSION = ".log"; + static final byte UUID_LITERAL = 'L'; /** - * Indicator for a literal UUID. + * Indicator for a UUID index. */ - static final byte UUID_LITERAL = 0x00; + static final byte UUID_INDEX = 'I'; /** - * Indicator for a UUID index. + * Revision. + */ + private long revision; + + /** + * Underlying input stream. + */ + private DataInputStream in; + + /** + * File use when creating a new record. + */ + private File file; + + /** + * Underlying output stream. */ - static final byte UUID_INDEX = 0x01; + private DataOutputStream out; /** - * Used for padding long string representations. + * Record length. */ - private static final String LONG_PADDING = "0000000000000000"; + private int length; /** - * Underlying file. + * Creator of a record. */ - private final File file; + private String creator; /** - * Counter. + * Bytes used by creator when written in UTF encoding and length-prefixed. */ - private final long counter; + private int creatorLength; /** - * Journal id. + * Flag indicating whether bytes need to be skipped at the end. */ - private final String journalId; + private boolean consumed; + + /** + * Creates a new file record. Used when opening an existing record. 
+ * + * @param revision revision this record represents + * @param in underlying input stream + * @throws IOException if reading the creator fails + */ + public FileRecord(long revision, InputStream in) + throws IOException { + + this.revision = revision; + if (in instanceof DataInputStream) { + this.in = (DataInputStream) in; + } else { + this.in = new DataInputStream(in); + } + this.length = this.in.readInt(); + + readCreator(); + } /** - * Creates a new file record from an existing file. Retrieves meta data by parsing the file's name. + * Creates a new file record. Used when creating a new record. * - * @param file file to use as record - * @throws IllegalArgumentException if file name is bogus + * @param creator creator of this record + * @param file underlying (temporary) file + * @throws IOException if writing the creator fails */ - public FileRecord(File file) throws IllegalArgumentException { + public FileRecord(String creator, File file) throws IOException { + + this.creator = creator; this.file = file; - String name = file.getName(); + this.out = new DataOutputStream(new FileOutputStream(file)); - int sep1 = name.indexOf('.'); - if (sep1 == -1) { - throw new IllegalArgumentException("Missing first . separator."); - } - try { - counter = Long.parseLong(name.substring(0, sep1), 16); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Unable to decompose long: " + e.getMessage()); - } - int sep2 = name.lastIndexOf('.'); - if (sep2 == -1) { - throw new IllegalArgumentException("Missing second . separator."); - } - journalId = name.substring(sep1 + 1, sep2); + writeCreator(); } /** - * Creates a new file record from a counter and instance ID. + * Return the journal revision associated with this record. 
* - * @param parent parent directory - * @param counter counter to use - * @param journalId journal id to use + * @return revision */ - public FileRecord(File parent, long counter, String journalId) { - StringBuffer name = new StringBuffer(); - name.append(toHexString(counter)); - name.append('.'); - name.append(journalId); + public long getRevision() { + return revision; + } - name.append(EXTENSION); + /** + * Set the journal revision associated with this record. + * + * @param revision journal revision + */ + public void setRevision(long revision) { + this.revision = revision; + } - this.file = new File(parent, name.toString()); - this.counter = counter; - this.journalId = journalId; + /** + * Return the journal counter associated with the next record. + * + * @return next revision + */ + public long getNextRevision() { + return revision + length + 4; } /** - * Return the journal counter associated with this record. + * Return the creator of this record. * - * @return counter + * @return creator */ - public long getCounter() { - return counter; + public String getCreator() { + return creator; } /** - * Return the id of the journal that created this record. + * Return an input on this record. * - * @return journal id + * @param resolver resolver to use when mapping prefixes to full names + * @return record input */ - public String getJournalId() { - return journalId; + public FileRecordInput getInput(NamespaceResolver resolver) { + consumed = true; + return new FileRecordInput(in, resolver); } /** - * Return this record's file. + * Return an output on this record. * - * @return file + * @param resolver resolver to use when mapping full names to prefixes + * @return record output */ - public File getFile() { - return file; + public FileRecordOutput getOutput(NamespaceResolver resolver) { + return new FileRecordOutput(this, out, resolver); + } + + /** + * Append this record to some output stream. 
+ * + * @param out outputstream to append to + */ + void append(DataOutputStream out) throws IOException { + out.writeInt(length); + + byte[] buffer = new byte[8192]; + int len; + + InputStream in = new BufferedInputStream(new FileInputStream(file)); + try { + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + out.flush(); + } finally { + in.close(); + } + } + + /** + * Skip over this record, positioning the underlying input stream + * on the next available record. + * + * @throws IOException if an I/O error occurs + */ + void skip() throws IOException { + if (!consumed) { + long skiplen = length - creatorLength; + while (skiplen > 0) { + long skipped = in.skip(skiplen); + if (skipped <= 0) { + break; + } + skiplen -= skipped; + } + if (skiplen != 0) { + String msg = "Unable to skip remaining bytes."; + throw new IOException(msg); + } + } } /** - * Return a zero-padded long string representation. + * Invoked when output has been closed. */ - public static String toHexString(long l) { - String s = Long.toHexString(l); - int padlen = LONG_PADDING.length() - s.length(); - if (padlen > 0) { - s = LONG_PADDING.substring(0, padlen) + s; + void closed() { + length = (int) file.length(); + } + + /** + * Read creator from the underlying data input stream. + * + * @throws IOException if an I/O error occurs + */ + private void readCreator() throws IOException { + UTFByteCounter counter = new UTFByteCounter(in); + creator = DataInputStream.readUTF(counter); + creatorLength = counter.getBytes(); + } + + /** + * Write creator to the underlying data output stream. + * + * @throws IOException if an I/O error occurs + */ + private void writeCreator() throws IOException { + out.writeUTF(creator); + } + + /** + * UTF byte counter. Counts the bytes actually read from a given + * DataInputStream that make up a UTF-encoded string. + */ + static class UTFByteCounter implements DataInput { + + /** + * Underlying input stream. 
+ */ + private final DataInputStream in; + + /** + * UTF length. + */ + private int bytes; + + /** + * Create a new instance of this class. + * + * @param in underlying data input stream + */ + public UTFByteCounter(DataInputStream in) { + this.in = in; + } + + /** + * Return the number of bytes read from the underlying input stream. + * + * @return number of bytes + */ + public int getBytes() { + return bytes; + } + + /** + * @see java.io.DataInputStream#readUnsignedShort() + * + * Remember number of bytes read. + */ + public int readUnsignedShort() throws IOException { + try { + return in.readUnsignedShort(); + } finally { + bytes += 2; + } + } + + /** + * @see java.io.DataInputStream#readUnsignedShort() + * + * Remember number of bytes read. + */ + public void readFully(byte b[]) throws IOException { + try { + in.readFully(b); + } finally { + bytes += b.length; + } + } + + /** + * @see java.io.DataInputStream#readUnsignedShort() + * + * Remember number of bytes read. + */ + public void readFully(byte b[], int off, int len) throws IOException { + try { + in.readFully(b, off, len); + } finally { + bytes += b.length; + } + } + + /** + * Methods not implemented. 
+ */ + public byte readByte() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public char readChar() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public double readDouble() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public float readFloat() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public int readInt() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public int readUnsignedByte() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public long readLong() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public short readShort() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public boolean readBoolean() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public int skipBytes(int n) throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public String readLine() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); + } + + public String readUTF() throws IOException { + throw new IllegalStateException("Unexpected call, deliberately not implemented."); } - return s; } } Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java?view=auto&rev=473380 
============================================================================== --- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java (added) +++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java Fri Nov 10 08:26:57 2006 @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.cluster; + +import java.io.File; +import java.io.IOException; + +/** + * Record cursor that returns unseen revisions in ascending order on every + * iteration. When iterating, a record must either be completely processed + * or its {@link FileRecord#skip()} method must be invoked to guarantee + * that this cursor is pointing at the next record. + */ +class FileRecordCursor { + + /** + * Log files to scan for revisions. + */ + private File[] logFiles; + + /** + * Next revision to visit. + */ + private long nextRevision; + + /** + * Last revision to visit. + */ + private long lastRevision; + + /** + * Current record log, containing file records. + */ + private FileRecordLog recordLog; + + /** + * Current record. + */ + private FileRecord record; + + /** + * Creates a new instance of this class. 
+ * + * @param logFiles available log files, sorted ascending by age + * @param firstRevision first revision to return + * @param lastRevision last revision to return + */ + public FileRecordCursor(File[] logFiles, long firstRevision, long lastRevision) { + this.logFiles = logFiles; + this.nextRevision = firstRevision; + this.lastRevision = lastRevision; + } + + + /** + * Return a flag indicating whether there are next records. + */ + public boolean hasNext() { + return nextRevision < lastRevision; + } + + /** + * Returns the next record. + * + * @throws IllegalStateException if no next revision exists + * @throws IOException if an I/O error occurs + */ + public FileRecord next() throws IOException { + if (!hasNext()) { + String msg = "No next revision."; + throw new IllegalStateException(msg); + } + if (record != null) { + record.skip(); + record = null; + } + if (recordLog != null) { + if (!recordLog.contains(nextRevision)) { + recordLog.close(); + recordLog = null; + } + } + if (recordLog == null) { + recordLog = getRecordLog(nextRevision); + recordLog.seek(nextRevision); + } + record = new FileRecord(nextRevision, recordLog.getInputStream()); + nextRevision = record.getNextRevision(); + return record; + } + + /** + * Return record log containing a given revision. + * + * @param revision revision to locate + * @return record log containing that revision + * @throws IOException if an I/O error occurs + */ + private FileRecordLog getRecordLog(long revision) throws IOException { + for (int i = 0; i < logFiles.length; i++) { + FileRecordLog recordLog = new FileRecordLog(logFiles[i]); + if (recordLog.contains(revision)) { + return recordLog; + } + } + String msg = "No log file found containing revision: " + revision; + throw new IOException(msg); + } + + /** + * Close this cursor, releasing its resources. 
+ * + * @throws IOException if an I/O error occurs + */ + public void close() throws IOException { + if (recordLog != null) { + recordLog.close(); + } + } +} \ No newline at end of file Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java?view=diff&rev=473380&r1=473379&r2=473380 ============================================================================== --- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java (original) +++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java Fri Nov 10 08:26:57 2006 @@ -26,16 +26,15 @@ import org.apache.jackrabbit.name.Path; import org.apache.jackrabbit.name.PathFormat; import org.apache.jackrabbit.name.MalformedPathException; +import org.apache.jackrabbit.uuid.Constants; +import org.apache.jackrabbit.uuid.UUID; -import java.io.File; -import java.io.RandomAccessFile; import java.io.IOException; -import java.io.InputStream; import java.io.DataInputStream; import java.util.ArrayList; /** - * Defines methods to read members out of a file record. + * Allows reading data from a FileRecord. 
*/ class FileRecordInput { @@ -65,8 +64,8 @@ * @param in underlying input stream * @param resolver namespace resolver */ - public FileRecordInput(InputStream in, NamespaceResolver resolver) { - this.in = new DataInputStream(in); + public FileRecordInput(DataInputStream in, NamespaceResolver resolver) { + this.in = in; this.resolver = resolver; } @@ -186,20 +185,22 @@ public NodeId readNodeId() throws IOException { checkOpen(); - byte b = readByte(); - if (b == FileRecord.UUID_INDEX) { + byte uuidType = readByte(); + if (uuidType == FileRecord.UUID_INDEX) { int index = readInt(); if (index == -1) { return null; } else { return (NodeId) uuidIndex.get(index); } - } else if (b == FileRecord.UUID_LITERAL) { - NodeId nodeId = NodeId.valueOf(readString()); + } else if (uuidType == FileRecord.UUID_LITERAL) { + byte[] b = new byte[Constants.UUID_BYTE_LENGTH]; + in.readFully(b); + NodeId nodeId = new NodeId(new UUID(b)); uuidIndex.add(nodeId); return nodeId; } else { - String msg = "UUID indicator unknown: " + b; + String msg = "UUID type unknown: " + uuidType; throw new IOException(msg); } } @@ -219,18 +220,12 @@ } /** - * Close this input. - * - * @throws IOException if an I/O error occurs + * Close this input. Does not close underlying stream as this is a shared resource. 
*/ - public void close() throws IOException { + public void close() { checkOpen(); - try { - in.close(); - } finally { - closed = true; - } + closed = true; } /** Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java?view=auto&rev=473380 ============================================================================== --- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java (added) +++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java Fri Nov 10 08:26:57 2006 @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.cluster; + +import java.io.File; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.FileInputStream; +import java.io.BufferedInputStream; +import java.io.DataOutputStream; +import java.io.FileOutputStream; + +/** + * A file record log is a file containing {@link FileRecord}s. Internally, + * the first 8 bytes contain the revision this log starts with. 
+ */ +class FileRecordLog { + + /** + * Underlying file. + */ + private File file; + + /** + * Flag indicating whether this is a new log. + */ + private boolean isNew; + + /** + * Input stream used when seeking a specific record. + */ + private DataInputStream in; + + /** + * First revision available in this log. + */ + private long minRevision; + + /** + * First revision that is not available in this, but in the next log. + */ + private long maxRevision; + + /** + * Create a new instance of this class. + * + * @param file file containing record log + * @throws IOException if an I/O error occurs + */ + public FileRecordLog(File file) throws IOException { + this.file = file; + + if (file.exists()) { + DataInputStream in = new DataInputStream(new FileInputStream(file)); + + try { + minRevision = in.readLong(); + maxRevision = minRevision + file.length() - 8; + } finally { + in.close(); + } + } else { + isNew = true; + } + } + + /** + * Return the first revision. + * + * @return first revision + */ + public long getFirstRevision() { + return minRevision; + } + + /** + * Return a flag indicating whether this record log contains a certain revision. + * + * @param revision revision to look for + * @return true if this record log contains a certain revision; + * false otherwise + */ + public boolean contains(long revision) { + return (revision >= minRevision && revision < maxRevision); + } + + /** + * Return a flag indicating whether this record log is new. + * + * @return true if this record log is new; + * false otherwise + */ + public boolean isNew() { + return isNew; + } + + /** + * Seek an entry. This is an operation that allows the underlying input stream + * to be sequentially scanned and must therefore not be called twice. 
+ * + * @param revision revision to seek + * @throws IOException if an I/O error occurs + */ + public void seek(long revision) throws IOException { + if (in != null) { + String msg = "Seek allowed exactly once."; + throw new IllegalStateException(msg); + } + open(); + + long skiplen = revision - minRevision + 8; + while (skiplen > 0) { + long skipped = in.skip(skiplen); + if (skipped <= 0) { + break; + } + skiplen -= skipped; + } + if (skiplen != 0) { + String msg = "Unable to skip remaining bytes."; + throw new IOException(msg); + } + } + + /** + * Append a record to this log. + * + * @param record record to add + * @throws IOException if an I/O error occurs + */ + public void append(FileRecord record) throws IOException { + DataOutputStream out = new DataOutputStream(new FileOutputStream(file, true)); + try { + if (isNew) { + out.writeLong(record.getRevision()); + } + record.append(out); + } finally { + out.close(); + } + } + + /** + * Open this log. + * + * @throws IOException if an I/O error occurs + */ + private void open() throws IOException { + in = new DataInputStream(new BufferedInputStream( + new FileInputStream(file))); + } + + /** + * Return the underlying input stream. + * + * @return underlying input stream + */ + protected DataInputStream getInputStream() { + if (in == null) { + String msg = "Input stream not open."; + throw new IllegalStateException(msg); + } + return in; + } + + /** + * Close this log. 
+ * + * @throws IOException if an I/O error occurs + */ + public void close() throws IOException { + if (in != null) { + in.close(); + } + } +} \ No newline at end of file Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java?view=diff&rev=473380&r1=473379&r2=473380 ============================================================================== --- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java (original) +++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java Fri Nov 10 08:26:57 2006 @@ -27,15 +27,19 @@ import java.io.IOException; import java.io.DataOutputStream; -import java.io.OutputStream; import java.util.ArrayList; /** - * Defines methods to write members to a file record. + * Allows writing data to a FileRecord. */ class FileRecordOutput { /** + * File record. + */ + private final FileRecord record; + + /** * Underlying output stream. */ private final DataOutputStream out; @@ -58,11 +62,13 @@ /** * Create a new file record. 
* + * @param record file record * @param out outputstream to write to * @param resolver namespace resolver */ - public FileRecordOutput(OutputStream out, NamespaceResolver resolver) { - this.out = new DataOutputStream(out); + public FileRecordOutput(FileRecord record, DataOutputStream out, NamespaceResolver resolver) { + this.record = record; + this.out = out; this.resolver = resolver; } @@ -187,7 +193,7 @@ writeInt(index); } else { writeByte(FileRecord.UUID_LITERAL); - writeString(nodeId.toString()); + out.write(nodeId.getUUID().getRawBytes()); } } } @@ -218,6 +224,7 @@ out.close(); } finally { closed = true; + record.closed(); } } Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java?view=diff&rev=473380&r1=473379&r2=473380 ============================================================================== --- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java (original) +++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java Fri Nov 10 08:26:57 2006 @@ -834,7 +834,7 @@ shared.modified(state); } catch (ItemStateException e) { String msg = "Unable to retrieve state: " + state.getId(); - log.warn(msg, e); + log.warn(msg); state.discard(); } }